WIP: osu: asyncify caches

This commit is contained in:
Natsu Kagami 2020-09-06 22:09:56 -04:00
parent bd5f4f0fd2
commit 07a8bc5579
No known key found for this signature in database
GPG key ID: F17543D4B9424B94
2 changed files with 83 additions and 71 deletions

View file

@ -22,50 +22,61 @@ use youmubot_prelude::*;
/// osu! announcer's unique announcer key. /// osu! announcer's unique announcer key.
pub const ANNOUNCER_KEY: &'static str = "osu"; pub const ANNOUNCER_KEY: &'static str = "osu";
/// Announce osu! top scores. /// The announcer struct implementing youmubot_prelude::Announcer
pub fn updates(c: Arc<CacheAndHttp>, d: AppData, channels: MemberToChannels) -> CommandResult { pub struct Announcer;
let osu = d.get_cloned::<OsuClient>();
let cache = d.get_cloned::<BeatmapMetaCache>(); #[async_trait]
let oppai = d.get_cloned::<BeatmapCache>(); impl youmubot_prelude::Announcer for Announcer {
// For each user... async fn updates(
let mut data = OsuSavedUsers::open(&*d.read()).borrow()?.clone(); &mut self,
for (user_id, osu_user) in data.iter_mut() { c: Arc<CacheAndHttp>,
let channels = channels.channels_of(c.clone(), *user_id); d: AppData,
if channels.is_empty() { channels: MemberToChannels,
continue; // We don't wanna update an user without any active server ) -> Result<()> {
} let d = d.read().await;
osu_user.pp = match (&[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania]) let osu = d.get::<OsuClient>().unwrap();
.par_iter() let cache = d.get::<BeatmapMetaCache>().unwrap();
.map(|m| { let oppai = d.get::<BeatmapCache>().unwrap();
handle_user_mode( // For each user...
c.clone(), let mut data = OsuSavedUsers::open(&*d).borrow()?.clone();
&osu, for (user_id, osu_user) in data.iter_mut() {
&cache, let channels = channels.channels_of(c.clone(), *user_id).await;
&oppai, if channels.is_empty() {
&osu_user, continue; // We don't wanna update an user without any active server
*user_id,
&channels[..],
*m,
d.clone(),
)
})
.collect::<Result<_, _>>()
{
Ok(v) => v,
Err(e) => {
eprintln!("osu: Cannot update {}: {}", osu_user.id, e.0);
continue;
} }
}; osu_user.pp = match (&[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania])
osu_user.last_update = chrono::Utc::now(); .par_iter()
.map(|m| {
handle_user_mode(
c.clone(),
&osu,
&cache,
&oppai,
&osu_user,
*user_id,
&channels[..],
*m,
&*d,
)
})
.collect::<Result<_, _>>()
{
Ok(v) => v,
Err(e) => {
eprintln!("osu: Cannot update {}: {}", osu_user.id, e.0);
continue;
}
};
osu_user.last_update = chrono::Utc::now();
}
// Update users
*OsuSavedUsers::open(&*d.read()).borrow_mut()? = data;
Ok(())
} }
// Update users
*OsuSavedUsers::open(&*d.read()).borrow_mut()? = data;
Ok(())
} }
/// Handles an user/mode scan, announces all possible new scores, return the new pp value. /// Handles an user/mode scan, announces all possible new scores, return the new pp value.
fn handle_user_mode( async fn handle_user_mode(
c: Arc<CacheAndHttp>, c: Arc<CacheAndHttp>,
osu: &Osu, osu: &Osu,
cache: &BeatmapMetaCache, cache: &BeatmapMetaCache,
@ -74,15 +85,16 @@ fn handle_user_mode(
user_id: UserId, user_id: UserId,
channels: &[ChannelId], channels: &[ChannelId],
mode: Mode, mode: Mode,
d: AppData, d: &TypeMap,
) -> Result<Option<f64>, Error> { ) -> Result<Option<f64>, Error> {
let scores = scan_user(osu, osu_user, mode)?; let scores = scan_user(osu, osu_user, mode)?;
let user = osu let user = osu
.user(UserID::ID(osu_user.id), |f| f.mode(mode))? .user(UserID::ID(osu_user.id), |f| f.mode(mode))
.await?
.ok_or(Error::from("user not found"))?; .ok_or(Error::from("user not found"))?;
scores scores
.into_par_iter() .into_iter()
.map(|(rank, score)| -> Result<_, Error> { .map(|(rank, score)| -> Result<_> {
let beatmap = cache.get_beatmap_default(score.beatmap_id)?; let beatmap = cache.get_beatmap_default(score.beatmap_id)?;
let content = oppai.get_beatmap(beatmap.beatmap_id)?; let content = oppai.get_beatmap(beatmap.beatmap_id)?;
Ok((rank, score, BeatmapWithMode(beatmap, mode), content)) Ok((rank, score, BeatmapWithMode(beatmap, mode), content))

View file

@ -3,16 +3,14 @@ use crate::{
Client, Client,
}; };
use dashmap::DashMap; use dashmap::DashMap;
use serenity::framework::standard::CommandError; use youmubot_prelude::*;
use std::sync::Arc;
use youmubot_prelude::TypeMapKey;
/// BeatmapMetaCache intercepts beatmap-by-id requests and caches them for later recalling. /// BeatmapMetaCache intercepts beatmap-by-id requests and caches them for later recalling.
/// Does not cache non-Ranked beatmaps. /// Does not cache non-Ranked beatmaps.
#[derive(Clone, Debug)] #[derive(Debug)]
pub struct BeatmapMetaCache { pub struct BeatmapMetaCache {
client: Client, client: Client,
cache: Arc<DashMap<(u64, Mode), Beatmap>>, cache: DashMap<(u64, Mode), Beatmap>,
} }
impl TypeMapKey for BeatmapMetaCache { impl TypeMapKey for BeatmapMetaCache {
@ -24,10 +22,10 @@ impl BeatmapMetaCache {
pub fn new(client: Client) -> Self { pub fn new(client: Client) -> Self {
BeatmapMetaCache { BeatmapMetaCache {
client, client,
cache: Arc::new(DashMap::new()), cache: DashMap::new(),
} }
} }
fn insert_if_possible(&self, id: u64, mode: Option<Mode>) -> Result<Beatmap, CommandError> { async fn insert_if_possible(&self, id: u64, mode: Option<Mode>) -> Result<Beatmap> {
let beatmap = self let beatmap = self
.client .client
.beatmaps(crate::BeatmapRequestKind::Beatmap(id), |f| { .beatmaps(crate::BeatmapRequestKind::Beatmap(id), |f| {
@ -36,35 +34,37 @@ impl BeatmapMetaCache {
} }
f f
}) })
.and_then(|v| { .await
v.into_iter() .and_then(|v| v.into_iter().next().ok_or(Error::msg("beatmap not found")))?;
.next()
.ok_or(CommandError::from("beatmap not found"))
})?;
if let ApprovalStatus::Ranked(_) = beatmap.approval { if let ApprovalStatus::Ranked(_) = beatmap.approval {
self.cache.insert((id, beatmap.mode), beatmap.clone()); self.cache.insert((id, beatmap.mode), beatmap.clone());
}; };
Ok(beatmap) Ok(beatmap)
} }
/// Get the given beatmap /// Get the given beatmap
pub fn get_beatmap(&self, id: u64, mode: Mode) -> Result<Beatmap, CommandError> { pub async fn get_beatmap(&self, id: u64, mode: Mode) -> Result<Beatmap> {
self.cache match self.cache.get(&(id, mode)).map(|v| v.clone()) {
.get(&(id, mode)) Some(v) => Ok(v),
.map(|b| Ok(b.clone())) None => self.insert_if_possible(id, Some(mode)).await,
.unwrap_or_else(|| self.insert_if_possible(id, Some(mode))) }
} }
/// Get a beatmap without a mode... /// Get a beatmap without a mode...
pub fn get_beatmap_default(&self, id: u64) -> Result<Beatmap, CommandError> { pub async fn get_beatmap_default(&self, id: u64) -> Result<Beatmap> {
(&[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania]) Ok(
.iter() match (&[Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania])
.filter_map(|&mode| { .iter()
self.cache .filter_map(|&mode| {
.get(&(id, mode)) self.cache
.filter(|b| b.mode == mode) .get(&(id, mode))
.map(|b| Ok(b.clone())) .filter(|b| b.mode == mode)
}) .map(|b| b.clone())
.next() })
.unwrap_or_else(|| self.insert_if_possible(id, None)) .next()
{
Some(v) => v,
None => self.insert_if_possible(id, None).await?,
},
)
} }
} }