Parallelize and modularize user scanning

This commit is contained in:
Natsu Kagami 2020-06-06 21:53:23 -04:00
parent 33637c9a7b
commit eec58fc6e4
Signed by: nki
GPG key ID: 73376E117CD20735

View file

@ -10,6 +10,7 @@ use rayon::prelude::*;
use serenity::{ use serenity::{
framework::standard::{CommandError as Error, CommandResult}, framework::standard::{CommandError as Error, CommandResult},
http::CacheHttp, http::CacheHttp,
model::id::{ChannelId, UserId},
CacheAndHttp, CacheAndHttp,
}; };
use std::sync::Arc; use std::sync::Arc;
@ -23,36 +24,58 @@ pub fn updates(c: Arc<CacheAndHttp>, d: AppData, channels: MemberToChannels) ->
let osu = d.get_cloned::<OsuClient>(); let osu = d.get_cloned::<OsuClient>();
// For each user... // For each user...
let mut data = OsuSavedUsers::open(&*d.read()).borrow()?.clone(); let mut data = OsuSavedUsers::open(&*d.read()).borrow()?.clone();
'user_loop: for (user_id, osu_user) in data.iter_mut() { for (user_id, osu_user) in data.iter_mut() {
let mut pp_values = vec![]; // Store the pp values here... let channels = channels.channels_of(c.clone(), *user_id);
for mode in &[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania] { if channels.is_empty() {
let scores = scan_user(&osu, osu_user, *mode)?; continue; // We don't wanna update an user without any active server
let user = match osu.user(UserID::ID(osu_user.id), |f| f.mode(*mode)) { }
Ok(Some(u)) => u, osu_user.pp = match (&[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania])
_ => continue 'user_loop, .par_iter()
}; .map(|m| handle_user_mode(c.clone(), &osu, &osu_user, *user_id, &channels[..], *m))
pp_values.push(user.pp); .collect::<Result<_, _>>()
if scores.is_empty() && !osu_user.pp.is_empty() { {
// Nothing to update: no new scores and pp is there. Ok(v) => v,
Err(e) => {
eprintln!("osu: Cannot update {}: {}", osu_user.id, e.0);
continue; continue;
} }
};
osu_user.last_update = chrono::Utc::now();
}
// Update users
*OsuSavedUsers::open(&*d.read()).borrow_mut()? = data;
Ok(())
}
/// Handles an user/mode scan, announces all possible new scores, return the new pp value.
fn handle_user_mode(
c: Arc<CacheAndHttp>,
osu: &Osu,
osu_user: &OsuUser,
user_id: UserId,
channels: &[ChannelId],
mode: Mode,
) -> Result<Option<f64>, Error> {
let scores = scan_user(osu, osu_user, mode)?;
let user = osu
.user(UserID::ID(osu_user.id), |f| f.mode(mode))?
.ok_or(Error::from("user not found"))?;
scores scores
.into_par_iter() .into_par_iter()
.filter_map(|(rank, score)| { .filter_map(|(rank, score)| {
let beatmap = osu let beatmap = osu
.beatmaps(BeatmapRequestKind::Beatmap(score.beatmap_id), |f| f) .beatmaps(BeatmapRequestKind::Beatmap(score.beatmap_id), |f| f)
.map(|v| BeatmapWithMode(v.into_iter().next().unwrap(), *mode)); .map(|v| BeatmapWithMode(v.into_iter().next().unwrap(), mode));
let channels = channels.channels_of(c.clone(), *user_id);
match beatmap { match beatmap {
Ok(v) => Some((rank, score, v, channels)), Ok(v) => Some((rank, score, v)),
Err(e) => { Err(e) => {
dbg!(e); dbg!(e);
None None
} }
} }
}) })
.for_each(|(rank, score, beatmap, channels)| { .for_each(|(rank, score, beatmap)| {
for channel in channels { for channel in (&channels).iter() {
if let Err(e) = channel.send_message(c.http(), |c| { if let Err(e) = channel.send_message(c.http(), |c| {
c.content(format!("New top record from {}!", user_id.mention())) c.content(format!("New top record from {}!", user_id.mention()))
.embed(|e| score_embed(&score, &beatmap, &user, Some(rank), e)) .embed(|e| score_embed(&score, &beatmap, &user, Some(rank), e))
@ -61,13 +84,7 @@ pub fn updates(c: Arc<CacheAndHttp>, d: AppData, channels: MemberToChannels) ->
} }
} }
}); });
} Ok(user.pp)
osu_user.last_update = chrono::Utc::now();
osu_user.pp = pp_values;
}
// Update users
*OsuSavedUsers::open(&*d.read()).borrow_mut()? = data;
Ok(())
} }
fn scan_user(osu: &Osu, u: &OsuUser, mode: Mode) -> Result<Vec<(u8, Score)>, Error> { fn scan_user(osu: &Osu, u: &OsuUser, mode: Mode) -> Result<Vec<(u8, Score)>, Error> {