osu: Asyncify announcer

This commit is contained in:
Natsu Kagami 2020-09-07 16:19:00 -04:00
parent bfe12d3946
commit c18d8f48d0
Signed by: nki
GPG key ID: 73376E117CD20735
2 changed files with 87 additions and 55 deletions

View file

@ -9,14 +9,12 @@ use crate::{
Client as Osu, Client as Osu,
}; };
use announcer::MemberToChannels; use announcer::MemberToChannels;
use rayon::prelude::*;
use serenity::{ use serenity::{
framework::standard::{CommandError as Error, CommandResult},
http::CacheHttp, http::CacheHttp,
model::id::{ChannelId, UserId}, model::id::{ChannelId, UserId},
CacheAndHttp, CacheAndHttp,
}; };
use std::sync::Arc; use std::{collections::HashMap, sync::Arc};
use youmubot_prelude::*; use youmubot_prelude::*;
/// osu! announcer's unique announcer key. /// osu! announcer's unique announcer key.
@ -33,19 +31,25 @@ impl youmubot_prelude::Announcer for Announcer {
d: AppData, d: AppData,
channels: MemberToChannels, channels: MemberToChannels,
) -> Result<()> { ) -> Result<()> {
// For each user...
let data = OsuSavedUsers::open(&*d.read().await).borrow()?.clone();
let data = data
.into_iter()
.map(|(user_id, osu_user)| {
let d = d.clone();
let channels = &channels;
let c = c.clone();
async move {
let d = d.read().await; let d = d.read().await;
let osu = d.get::<OsuClient>().unwrap(); let osu = d.get::<OsuClient>().unwrap();
let cache = d.get::<BeatmapMetaCache>().unwrap(); let cache = d.get::<BeatmapMetaCache>().unwrap();
let oppai = d.get::<BeatmapCache>().unwrap(); let oppai = d.get::<BeatmapCache>().unwrap();
// For each user... let channels = channels.channels_of(c.clone(), user_id).await;
let mut data = OsuSavedUsers::open(&*d).borrow()?.clone();
for (user_id, osu_user) in data.iter_mut() {
let channels = channels.channels_of(c.clone(), *user_id).await;
if channels.is_empty() { if channels.is_empty() {
continue; // We don't wanna update an user without any active server return (user_id, osu_user); // We don't wanna update an user without any active server
} }
osu_user.pp = match (&[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania]) let pp = match (&[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania])
.par_iter() .into_iter()
.map(|m| { .map(|m| {
handle_user_mode( handle_user_mode(
c.clone(), c.clone(),
@ -53,24 +57,38 @@ impl youmubot_prelude::Announcer for Announcer {
&cache, &cache,
&oppai, &oppai,
&osu_user, &osu_user,
*user_id, user_id,
&channels[..], &channels[..],
*m, *m,
&*d, &*d,
) )
}) })
.collect::<Result<_, _>>() .collect::<stream::FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.await
{ {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
eprintln!("osu: Cannot update {}: {}", osu_user.id, e.0); eprintln!("osu: Cannot update {}: {}", osu_user.id, e);
continue; return (user_id, osu_user);
} }
}; };
osu_user.last_update = chrono::Utc::now(); let last_update = chrono::Utc::now();
(
user_id,
OsuUser {
pp,
last_update,
..osu_user
},
)
} }
})
.collect::<stream::FuturesUnordered<_>>()
.collect::<HashMap<_, _>>()
.await;
// Update users // Update users
*OsuSavedUsers::open(&*d.read()).borrow_mut()? = data; *OsuSavedUsers::open(&*d.read().await).borrow_mut()? = data;
Ok(()) Ok(())
} }
} }
@ -87,35 +105,49 @@ async fn handle_user_mode(
mode: Mode, mode: Mode,
d: &TypeMap, d: &TypeMap,
) -> Result<Option<f64>, Error> { ) -> Result<Option<f64>, Error> {
let scores = scan_user(osu, osu_user, mode)?; let scores = scan_user(osu, osu_user, mode).await?;
let user = osu let user = osu
.user(UserID::ID(osu_user.id), |f| f.mode(mode)) .user(UserID::ID(osu_user.id), |f| f.mode(mode))
.await? .await?
.ok_or(Error::from("user not found"))?; .ok_or(Error::msg("user not found"))?;
scores scores
.into_iter() .into_iter()
.map(|(rank, score)| -> Result<_> { .map(|(rank, score)| async move {
let beatmap = cache.get_beatmap_default(score.beatmap_id)?; let beatmap = cache.get_beatmap_default(score.beatmap_id).await?;
let content = oppai.get_beatmap(beatmap.beatmap_id)?; let content = oppai.get_beatmap(beatmap.beatmap_id).await?;
Ok((rank, score, BeatmapWithMode(beatmap, mode), content)) let r: Result<_> = Ok((rank, score, BeatmapWithMode(beatmap, mode), content));
r
}) })
.filter_map(|v| v.ok()) .collect::<stream::FuturesOrdered<_>>()
.filter_map(|v| future::ready(v.ok()))
.for_each(|(rank, score, beatmap, content)| { .for_each(|(rank, score, beatmap, content)| {
let c = c.clone();
let user = &user;
async move {
for channel in (&channels).iter() { for channel in (&channels).iter() {
if let Err(e) = channel.send_message(c.http(), |c| { if let Err(e) = channel
.send_message(c.http(), |c| {
c.content(format!("New top record from {}!", user_id.mention())) c.content(format!("New top record from {}!", user_id.mention()))
.embed(|e| score_embed(&score, &beatmap, &content, &user, Some(rank), e)) .embed(|e| {
}) { score_embed(&score, &beatmap, &content, &user, Some(rank), e)
})
})
.await
{
dbg!(e); dbg!(e);
} }
save_beatmap(&*d.read(), *channel, &beatmap).ok(); save_beatmap(d, *channel, &beatmap).ok();
} }
}); }
})
.await;
Ok(user.pp) Ok(user.pp)
} }
fn scan_user(osu: &Osu, u: &OsuUser, mode: Mode) -> Result<Vec<(u8, Score)>, Error> { async fn scan_user(osu: &Osu, u: &OsuUser, mode: Mode) -> Result<Vec<(u8, Score)>, Error> {
let scores = osu.user_best(UserID::ID(u.id), |f| f.mode(mode).limit(25))?; let scores = osu
.user_best(UserID::ID(u.id), |f| f.mode(mode).limit(25))
.await?;
let scores = scores let scores = scores
.into_iter() .into_iter()
.enumerate() .enumerate()

View file

@ -89,7 +89,7 @@ impl BeatmapCache {
pub async fn get_beatmap<'a>( pub async fn get_beatmap<'a>(
&'a self, &'a self,
id: u64, id: u64,
) -> Result<impl std::ops::Deref<Target = BeatmapContent> + 'a, CommandError> { ) -> Result<impl std::ops::Deref<Target = BeatmapContent> + 'a> {
if !self.cache.contains_key(&id) { if !self.cache.contains_key(&id) {
self.cache.insert(id, self.download_beatmap(id).await?); self.cache.insert(id, self.download_beatmap(id).await?);
} }