Avoid updating users when the user id itself changed

This commit is contained in:
Natsu Kagami 2024-08-04 21:23:35 +02:00 committed by Natsu Kagami
parent fad84b9420
commit 5c523009e1
5 changed files with 73 additions and 22 deletions

View file

@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT id as \"id: i64\" FROM osu_users WHERE user_id = ?",
"describe": {
"columns": [
{
"name": "id: i64",
"ordinal": 0,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false
]
},
"hash": "9c620436cfe463780fca2dc5497c3b787fa284a74b3e3d491755c88a693810e0"
}

View file

@ -182,7 +182,22 @@ impl OsuUser {
impl OsuUser {
/// Stores the user.
pub async fn store<'a>(&self, conn: &mut Transaction<'a, Database>) -> Result<()> {
pub async fn store<'a>(&self, conn: &mut Transaction<'a, Database>) -> Result<bool> {
let old_user_id = {
query!(
r#"SELECT id as "id: i64" FROM osu_users WHERE user_id = ?"#,
self.user_id
)
.fetch_optional(&mut **conn)
.await?
.map(|v| v.id)
};
if old_user_id.is_some_and(|v| v != self.id) {
// There's another update that changed the user_id
return Ok(false);
}
query!(
r#"INSERT
INTO osu_users(user_id, username, id, failures)
@ -221,7 +236,7 @@ impl OsuUser {
.execute(&mut **conn)
.await?;
}
Ok(())
Ok(true)
}
pub async fn delete(user_id: i64, conn: impl Executor<'_, Database = Database>) -> Result<()> {

View file

@ -1,5 +1,7 @@
use chrono::{DateTime, Utc};
use future::Future;
use futures_util::try_join;
use std::pin::Pin;
use std::sync::Arc;
use stream::FuturesUnordered;
@ -57,17 +59,11 @@ impl youmubot_prelude::Announcer for Announcer {
let users = env.saved_users.all().await?;
users
.into_iter()
.map(
|osu_user| {
channels
.channels_of(ctx.clone(), osu_user.user_id)
.then(|chs| self.update_user(ctx.clone(), &env, osu_user, chs))
.then(|new_user| env.saved_users.save(new_user))
.map(|r| {
r.pls_ok();
})
}, // self.update_user()
)
.map(|osu_user| {
channels
.channels_of(ctx.clone(), osu_user.user_id)
.then(|chs| self.update_user(ctx.clone(), &env, osu_user, chs))
})
.collect::<stream::FuturesUnordered<_>>()
.collect::<()>()
.await;
@ -82,13 +78,17 @@ impl Announcer {
env: &OsuEnv,
mut user: OsuUser,
broadcast_to: Vec<ChannelId>,
) -> OsuUser {
) {
if broadcast_to.is_empty() {
return; // Skip update if there are no broadcasting channels
}
if user.failures == MAX_FAILURES {
return user;
return;
}
const MODES: [Mode; 4] = [Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania];
let now = chrono::Utc::now();
let broadcast_to = Arc::new(broadcast_to);
let mut to_announce = Vec::<Pin<Box<dyn Future<Output = ()> + Send>>>::new();
for mode in MODES {
let (u, top, events) = match self.fetch_user_data(env, now, &user, mode).await {
Ok(v) => v,
@ -125,7 +125,7 @@ impl Announcer {
let ctx = ctx.clone();
let env = env.clone();
if let Some(last) = last {
spawn_future(async move {
to_announce.push(Box::pin(async move {
let top = top
.into_iter()
.enumerate()
@ -147,11 +147,18 @@ impl Announcer {
.filter_map(|v| future::ready(v.pls_ok().map(|_| ())))
.collect::<()>()
.await
});
}));
}
}
user.failures = 0;
user
let user_id = user.user_id;
if let Some(true) = env.saved_users.save(user).await.pls_ok() {
to_announce.into_iter().for_each(|v| {
spawn_future(v);
});
} else {
eprintln!("[osu] Skipping user {} due to raced update", user_id)
}
}
fn is_announceable_date(

View file

@ -46,18 +46,21 @@ impl OsuSavedUsers {
}
/// Save the given user.
pub async fn save(&self, u: OsuUser) -> Result<()> {
pub async fn save(&self, u: OsuUser) -> Result<bool> {
let mut tx = self.pool.begin().await?;
model::OsuUser::from(u).store(&mut tx).await?;
let updated = model::OsuUser::from(u).store(&mut tx).await?;
tx.commit().await?;
Ok(())
Ok(updated)
}
/// Save the given user as a completely new user.
pub async fn new_user(&self, u: OsuUser) -> Result<()> {
let mut t = self.pool.begin().await?;
model::OsuUser::delete(u.user_id.get() as i64, &mut *t).await?;
model::OsuUser::from(u).store(&mut t).await?;
assert!(
model::OsuUser::from(u).store(&mut t).await?,
"Should be updated"
);
t.commit().await?;
Ok(())
}

View file

@ -88,6 +88,8 @@ pub mod prelude_commands {
}
mod debugging_ok {
use std::backtrace::{Backtrace, BacktraceStatus};
pub trait OkPrint {
type Output;
fn pls_ok(self) -> Option<Self::Output>;
@ -101,6 +103,10 @@ mod debugging_ok {
Ok(v) => Some(v),
Err(e) => {
eprintln!("Error: {:?}", e);
let captures = Backtrace::capture();
if captures.status() == BacktraceStatus::Captured {
eprintln!("{}", captures);
}
None
}
}