From c18d8f48d057f0a381b4259b3368d944b679a98a Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Mon, 7 Sep 2020 16:19:00 -0400 Subject: [PATCH] osu: Asyncify announcer --- youmubot-osu/src/discord/announcer.rs | 140 +++++++++++++++--------- youmubot-osu/src/discord/oppai_cache.rs | 2 +- 2 files changed, 87 insertions(+), 55 deletions(-) diff --git a/youmubot-osu/src/discord/announcer.rs b/youmubot-osu/src/discord/announcer.rs index 4ce0c13..a495ac3 100644 --- a/youmubot-osu/src/discord/announcer.rs +++ b/youmubot-osu/src/discord/announcer.rs @@ -9,14 +9,12 @@ use crate::{ Client as Osu, }; use announcer::MemberToChannels; -use rayon::prelude::*; use serenity::{ - framework::standard::{CommandError as Error, CommandResult}, http::CacheHttp, model::id::{ChannelId, UserId}, CacheAndHttp, }; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use youmubot_prelude::*; /// osu! announcer's unique announcer key. @@ -33,44 +31,64 @@ impl youmubot_prelude::Announcer for Announcer { d: AppData, channels: MemberToChannels, ) -> Result<()> { - let d = d.read().await; - let osu = d.get::().unwrap(); - let cache = d.get::().unwrap(); - let oppai = d.get::().unwrap(); // For each user... - let mut data = OsuSavedUsers::open(&*d).borrow()?.clone(); - for (user_id, osu_user) in data.iter_mut() { - let channels = channels.channels_of(c.clone(), *user_id).await; - if channels.is_empty() { - continue; // We don't wanna update an user without any active server - } - osu_user.pp = match (&[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania]) - .par_iter() - .map(|m| { - handle_user_mode( - c.clone(), - &osu, - &cache, - &oppai, - &osu_user, - *user_id, - &channels[..], - *m, - &*d, + let data = OsuSavedUsers::open(&*d.read().await).borrow()?.clone(); + let data = data + .into_iter() + .map(|(user_id, osu_user)| { + let d = d.clone(); + let channels = &channels; + let c = c.clone(); + async move { + let d = d.read().await; + let osu = d.get::().unwrap(); + let cache = d.get::().unwrap(); + let oppai = d.get::().unwrap(); + let channels = channels.channels_of(c.clone(), user_id).await; + if channels.is_empty() { + return (user_id, osu_user); // We don't wanna update an user without any active server + } + let pp = match (&[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania]) + .into_iter() + .map(|m| { + handle_user_mode( + c.clone(), + &osu, + &cache, + &oppai, + &osu_user, + user_id, + &channels[..], + *m, + &*d, + ) + }) + .collect::>() + .try_collect::>() + .await + { + Ok(v) => v, + Err(e) => { + eprintln!("osu: Cannot update {}: {}", osu_user.id, e); + return (user_id, osu_user); + } + }; + let last_update = chrono::Utc::now(); + ( + user_id, + OsuUser { + pp, + last_update, + ..osu_user + }, ) - }) - .collect::>() - { - Ok(v) => v, - Err(e) => { - eprintln!("osu: Cannot update {}: {}", osu_user.id, e.0); - continue; } - }; - osu_user.last_update = chrono::Utc::now(); - } + }) + .collect::>() + .collect::>() + .await; // Update users - *OsuSavedUsers::open(&*d.read()).borrow_mut()? = data; + *OsuSavedUsers::open(&*d.read().await).borrow_mut()? = data; Ok(()) } } @@ -87,35 +105,49 @@ async fn handle_user_mode( mode: Mode, d: &TypeMap, ) -> Result, Error> { - let scores = scan_user(osu, osu_user, mode)?; + let scores = scan_user(osu, osu_user, mode).await?; let user = osu .user(UserID::ID(osu_user.id), |f| f.mode(mode)) .await? - .ok_or(Error::from("user not found"))?; + .ok_or(Error::msg("user not found"))?; scores .into_iter() - .map(|(rank, score)| -> Result<_> { - let beatmap = cache.get_beatmap_default(score.beatmap_id)?; - let content = oppai.get_beatmap(beatmap.beatmap_id)?; - Ok((rank, score, BeatmapWithMode(beatmap, mode), content)) + .map(|(rank, score)| async move { + let beatmap = cache.get_beatmap_default(score.beatmap_id).await?; + let content = oppai.get_beatmap(beatmap.beatmap_id).await?; + let r: Result<_> = Ok((rank, score, BeatmapWithMode(beatmap, mode), content)); + r }) - .filter_map(|v| v.ok()) + .collect::>() + .filter_map(|v| future::ready(v.ok())) .for_each(|(rank, score, beatmap, content)| { - for channel in (&channels).iter() { - if let Err(e) = channel.send_message(c.http(), |c| { - c.content(format!("New top record from {}!", user_id.mention())) - .embed(|e| score_embed(&score, &beatmap, &content, &user, Some(rank), e)) - }) { - dbg!(e); + let c = c.clone(); + let user = &user; + async move { + for channel in (&channels).iter() { + if let Err(e) = channel + .send_message(c.http(), |c| { + c.content(format!("New top record from {}!", user_id.mention())) + .embed(|e| { + score_embed(&score, &beatmap, &content, &user, Some(rank), e) + }) + }) + .await + { + dbg!(e); + } + save_beatmap(d, *channel, &beatmap).ok(); } - save_beatmap(&*d.read(), *channel, &beatmap).ok(); } - }); + }) + .await; Ok(user.pp) } -fn scan_user(osu: &Osu, u: &OsuUser, mode: Mode) -> Result, Error> { - let scores = osu.user_best(UserID::ID(u.id), |f| f.mode(mode).limit(25))?; +async fn scan_user(osu: &Osu, u: &OsuUser, mode: Mode) -> Result, Error> { + let scores = osu + .user_best(UserID::ID(u.id), |f| f.mode(mode).limit(25)) + .await?; let scores = scores .into_iter() .enumerate() diff --git a/youmubot-osu/src/discord/oppai_cache.rs b/youmubot-osu/src/discord/oppai_cache.rs index c45ed3a..fbb6228 100644 --- a/youmubot-osu/src/discord/oppai_cache.rs +++ b/youmubot-osu/src/discord/oppai_cache.rs @@ -89,7 +89,7 @@ impl BeatmapCache { pub async fn get_beatmap<'a>( &'a self, id: u64, - ) -> Result + 'a, CommandError> { + ) -> Result + 'a> { if !self.cache.contains_key(&id) { self.cache.insert(id, self.download_beatmap(id).await?); }