osu updatelb command (#8)

* Make `paginate` take a Paginate trait impl, while `paginate_fn` takes a function

* Add `updatelb` command

* Implement a member cache

* Update member queries to use member cache

* Allow everyone to updatelb
This commit is contained in:
Natsu Kagami 2020-11-23 02:26:18 -05:00 committed by GitHub
parent b8471152d3
commit 6bf2779d61
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 241 additions and 47 deletions

1
Cargo.lock generated
View file

@ -1900,6 +1900,7 @@ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"chrono", "chrono",
"dashmap",
"flume", "flume",
"futures-util", "futures-util",
"reqwest", "reqwest",

View file

@ -172,7 +172,7 @@ pub async fn ranks(ctx: &Context, m: &Message) -> CommandResult {
let total_pages = (ranks.len() + ITEMS_PER_PAGE - 1) / ITEMS_PER_PAGE; let total_pages = (ranks.len() + ITEMS_PER_PAGE - 1) / ITEMS_PER_PAGE;
let last_updated = ranks.iter().map(|(_, cfu)| cfu.last_update).min().unwrap(); let last_updated = ranks.iter().map(|(_, cfu)| cfu.last_update).min().unwrap();
paginate( paginate_fn(
move |page, ctx, msg| { move |page, ctx, msg| {
let ranks = ranks.clone(); let ranks = ranks.clone();
Box::pin(async move { Box::pin(async move {
@ -255,16 +255,17 @@ pub async fn contestranks(ctx: &Context, m: &Message, mut args: Args) -> Command
let data = ctx.data.read().await; let data = ctx.data.read().await;
let contest_id: u64 = args.single()?; let contest_id: u64 = args.single()?;
let guild = m.guild_id.unwrap(); // Guild-only command let guild = m.guild_id.unwrap(); // Guild-only command
let member_cache = data.get::<MemberCache>().unwrap();
let members = CfSavedUsers::open(&*data).borrow()?.clone(); let members = CfSavedUsers::open(&*data).borrow()?.clone();
let members = members let members = members
.into_iter() .into_iter()
.map(|(user_id, cf_user)| { .map(|(user_id, cf_user)| {
guild member_cache
.member(&ctx, user_id) .query(&ctx, user_id, guild)
.map(|v| v.map(|v| (cf_user.handle, v))) .map(|v| v.map(|v| (cf_user.handle, v)))
}) })
.collect::<stream::FuturesUnordered<_>>() .collect::<stream::FuturesUnordered<_>>()
.filter_map(|v| future::ready(v.ok())) .filter_map(|v| future::ready(v))
.collect::<HashMap<_, _>>() .collect::<HashMap<_, _>>()
.await; .await;
let http = data.get::<CFClient>().unwrap(); let http = data.get::<CFClient>().unwrap();
@ -301,7 +302,7 @@ pub async fn contestranks(ctx: &Context, m: &Message, mut args: Args) -> Command
const ITEMS_PER_PAGE: usize = 10; const ITEMS_PER_PAGE: usize = 10;
let total_pages = (ranks.len() + ITEMS_PER_PAGE - 1) / ITEMS_PER_PAGE; let total_pages = (ranks.len() + ITEMS_PER_PAGE - 1) / ITEMS_PER_PAGE;
paginate( paginate_fn(
move |page, ctx, msg| { move |page, ctx, msg| {
let contest = contest.clone(); let contest = contest.clone();
let problems = problems.clone(); let problems = problems.clone();

View file

@ -33,7 +33,7 @@ async fn list(ctx: &Context, m: &Message, _: Args) -> CommandResult {
const ROLES_PER_PAGE: usize = 8; const ROLES_PER_PAGE: usize = 8;
let pages = (roles.len() + ROLES_PER_PAGE - 1) / ROLES_PER_PAGE; let pages = (roles.len() + ROLES_PER_PAGE - 1) / ROLES_PER_PAGE;
paginate( paginate_fn(
|page, ctx, msg| { |page, ctx, msg| {
let roles = roles.clone(); let roles = roles.clone();
Box::pin(async move { Box::pin(async move {

View file

@ -29,7 +29,7 @@ use db::OsuUser;
use db::{OsuLastBeatmap, OsuSavedUsers, OsuUserBests}; use db::{OsuLastBeatmap, OsuSavedUsers, OsuUserBests};
use embeds::{beatmap_embed, score_embed, user_embed}; use embeds::{beatmap_embed, score_embed, user_embed};
pub use hook::hook; pub use hook::hook;
use server_rank::{LEADERBOARD_COMMAND, SERVER_RANK_COMMAND}; use server_rank::{LEADERBOARD_COMMAND, SERVER_RANK_COMMAND, UPDATE_LEADERBOARD_COMMAND};
/// The osu! client. /// The osu! client.
pub(crate) struct OsuClient; pub(crate) struct OsuClient;
@ -59,6 +59,11 @@ pub fn setup(
OsuLastBeatmap::insert_into(&mut *data, &path.join("last_beatmaps.yaml"))?; OsuLastBeatmap::insert_into(&mut *data, &path.join("last_beatmaps.yaml"))?;
OsuUserBests::insert_into(&mut *data, &path.join("osu_user_bests.yaml"))?; OsuUserBests::insert_into(&mut *data, &path.join("osu_user_bests.yaml"))?;
// Locks
data.insert::<server_rank::update_lock::UpdateLock>(
server_rank::update_lock::UpdateLock::default(),
);
// API client // API client
let http_client = data.get::<HTTPClient>().unwrap().clone(); let http_client = data.get::<HTTPClient>().unwrap().clone();
let osu_client = Arc::new(OsuHttpClient::new( let osu_client = Arc::new(OsuHttpClient::new(
@ -89,7 +94,8 @@ pub fn setup(
check, check,
top, top,
server_rank, server_rank,
leaderboard leaderboard,
update_leaderboard
)] )]
#[default_command(std)] #[default_command(std)]
struct Osu; struct Osu;
@ -247,7 +253,7 @@ async fn list_plays<'a>(
const ITEMS_PER_PAGE: usize = 5; const ITEMS_PER_PAGE: usize = 5;
let total_pages = (plays.len() + ITEMS_PER_PAGE - 1) / ITEMS_PER_PAGE; let total_pages = (plays.len() + ITEMS_PER_PAGE - 1) / ITEMS_PER_PAGE;
paginate( paginate_fn(
move |page, ctx, msg| { move |page, ctx, msg| {
let plays = plays.clone(); let plays = plays.clone();
Box::pin(async move { Box::pin(async move {

View file

@ -4,12 +4,13 @@ use super::{
ModeArg, OsuClient, ModeArg, OsuClient,
}; };
use crate::{ use crate::{
discord::BeatmapWithMode,
models::{Mode, Score}, models::{Mode, Score},
request::UserID, request::UserID,
}; };
use serenity::{ use serenity::{
framework::standard::{macros::command, Args, CommandResult}, framework::standard::{macros::command, Args, CommandResult},
model::channel::Message, model::{channel::Message, id::UserId},
utils::MessageBuilder, utils::MessageBuilder,
}; };
use youmubot_prelude::*; use youmubot_prelude::*;
@ -23,11 +24,15 @@ pub async fn server_rank(ctx: &Context, m: &Message, mut args: Args) -> CommandR
let data = ctx.data.read().await; let data = ctx.data.read().await;
let mode = args.single::<ModeArg>().map(|v| v.0).unwrap_or(Mode::Std); let mode = args.single::<ModeArg>().map(|v| v.0).unwrap_or(Mode::Std);
let guild = m.guild_id.expect("Guild-only command"); let guild = m.guild_id.expect("Guild-only command");
let member_cache = data.get::<MemberCache>().unwrap();
let users = OsuSavedUsers::open(&*data).borrow()?.clone(); let users = OsuSavedUsers::open(&*data).borrow()?.clone();
let users = users let users = users
.into_iter() .into_iter()
.map(|(user_id, osu_user)| async move { .map(|(user_id, osu_user)| async move {
guild.member(&ctx, user_id).await.ok().and_then(|member| { member_cache
.query(&ctx, user_id, guild)
.await
.and_then(|member| {
osu_user osu_user
.pp .pp
.get(mode as usize) .get(mode as usize)
@ -55,7 +60,7 @@ pub async fn server_rank(ctx: &Context, m: &Message, mut args: Args) -> CommandR
let users = std::sync::Arc::new(users); let users = std::sync::Arc::new(users);
let last_update = last_update.unwrap(); let last_update = last_update.unwrap();
paginate( paginate_fn(
move |page: u8, ctx: &Context, m: &mut Message| { move |page: u8, ctx: &Context, m: &mut Message| {
const ITEMS_PER_PAGE: usize = 10; const ITEMS_PER_PAGE: usize = 10;
let users = users.clone(); let users = users.clone();
@ -101,14 +106,54 @@ pub async fn server_rank(ctx: &Context, m: &Message, mut args: Args) -> CommandR
Ok(()) Ok(())
} }
#[command("leaderboard")] pub(crate) mod update_lock {
#[aliases("lb", "bmranks", "br", "cc")] use serenity::{model::id::GuildId, prelude::TypeMapKey};
#[description = "See the server's ranks on the last seen beatmap"] use std::collections::HashSet;
use std::sync::Mutex;
#[derive(Debug, Default)]
pub struct UpdateLock(Mutex<HashSet<GuildId>>);
pub struct UpdateLockGuard<'a>(&'a UpdateLock, GuildId);
impl TypeMapKey for UpdateLock {
type Value = UpdateLock;
}
impl UpdateLock {
pub fn get(&self, guild: GuildId) -> Option<UpdateLockGuard> {
let mut set = self.0.lock().unwrap();
if set.contains(&guild) {
None
} else {
set.insert(guild);
Some(UpdateLockGuard(self, guild))
}
}
}
impl<'a> Drop for UpdateLockGuard<'a> {
fn drop(&mut self) {
let mut set = self.0 .0.lock().unwrap();
set.remove(&self.1);
}
}
}
#[command("updatelb")]
#[description = "Update the leaderboard on the last seen beatmap"]
#[max_args(0)] #[max_args(0)]
#[only_in(guilds)] #[only_in(guilds)]
pub async fn leaderboard(ctx: &Context, m: &Message, mut _args: Args) -> CommandResult { pub async fn update_leaderboard(ctx: &Context, m: &Message, mut _args: Args) -> CommandResult {
let guild = m.guild_id.unwrap();
let data = ctx.data.read().await; let data = ctx.data.read().await;
let mut osu_user_bests = OsuUserBests::open(&*data); let update_lock = data.get::<update_lock::UpdateLock>().unwrap();
let update_lock = match update_lock.get(guild) {
None => {
m.reply(&ctx, "Another update is running.").await?;
return Ok(());
}
Some(v) => v,
};
let bm = match get_beatmap(&*data, m.channel_id)? { let bm = match get_beatmap(&*data, m.channel_id)? {
Some(bm) => bm, Some(bm) => bm,
None => { None => {
@ -116,6 +161,86 @@ pub async fn leaderboard(ctx: &Context, m: &Message, mut _args: Args) -> Command
return Ok(()); return Ok(());
} }
}; };
let member_cache = data.get::<MemberCache>().unwrap();
// Signal that we are running.
let running_reaction = m.react(&ctx, '⌛').await?;
// Run a check on everyone in the server basically.
let all_server_users: Vec<(UserId, Vec<Score>)> = {
let osu = data.get::<OsuClient>().unwrap();
let osu_users = OsuSavedUsers::open(&*data);
let osu_users = osu_users
.borrow()?
.iter()
.map(|(&user_id, osu_user)| (user_id, osu_user.id))
.collect::<Vec<_>>();
let beatmap_id = bm.0.beatmap_id;
osu_users
.into_iter()
.map(|(user_id, osu_id)| {
member_cache
.query(&ctx, user_id, guild)
.map(move |t| t.map(|_| (user_id, osu_id)))
})
.collect::<stream::FuturesUnordered<_>>()
.filter_map(future::ready)
.filter_map(|(member, osu_id)| async move {
let scores = osu
.scores(beatmap_id, |f| f.user(UserID::ID(osu_id)))
.await
.ok();
scores
.filter(|s| !s.is_empty())
.map(|scores| (member, scores))
})
.collect::<Vec<_>>()
.await
};
let updated_users = all_server_users.len();
// Update everything.
{
let mut osu_user_bests = OsuUserBests::open(&*data);
let mut osu_user_bests = osu_user_bests.borrow_mut()?;
let user_bests = osu_user_bests.entry((bm.0.beatmap_id, bm.1)).or_default();
all_server_users.into_iter().for_each(|(member, scores)| {
user_bests.insert(member, scores);
})
}
// Signal update complete.
running_reaction.delete(&ctx).await.ok();
m.reply(
&ctx,
format!(
"update for beatmap ({}, {}) complete, {} users updated.",
bm.0.beatmap_id, bm.1, updated_users
),
)
.await
.ok();
drop(update_lock);
show_leaderboard(ctx, m, bm).await
}
#[command("leaderboard")]
#[aliases("lb", "bmranks", "br", "cc")]
#[description = "See the server's ranks on the last seen beatmap"]
#[max_args(0)]
#[only_in(guilds)]
pub async fn leaderboard(ctx: &Context, m: &Message, mut _args: Args) -> CommandResult {
let data = ctx.data.read().await;
let bm = match get_beatmap(&*data, m.channel_id)? {
Some(bm) => bm,
None => {
m.reply(&ctx, "No beatmap queried on this channel.").await?;
return Ok(());
}
};
show_leaderboard(ctx, m, bm).await
}
async fn show_leaderboard(ctx: &Context, m: &Message, bm: BeatmapWithMode) -> CommandResult {
let data = ctx.data.read().await;
let mut osu_user_bests = OsuUserBests::open(&*data);
// Run a check on the user once too! // Run a check on the user once too!
{ {
@ -139,6 +264,7 @@ pub async fn leaderboard(ctx: &Context, m: &Message, mut _args: Args) -> Command
} }
let guild = m.guild_id.expect("Guild-only command"); let guild = m.guild_id.expect("Guild-only command");
let member_cache = data.get::<MemberCache>().unwrap();
let scores = { let scores = {
const NO_SCORES: &'static str = const NO_SCORES: &'static str =
"No scores have been recorded for this beatmap. Run `osu check` to scan for yours!"; "No scores have been recorded for this beatmap. Run `osu check` to scan for yours!";
@ -161,12 +287,10 @@ pub async fn leaderboard(ctx: &Context, m: &Message, mut _args: Args) -> Command
let mut scores: Vec<(f64, String, Score)> = users let mut scores: Vec<(f64, String, Score)> = users
.into_iter() .into_iter()
.map(|(user_id, scores)| async move { .map(|(user_id, scores)| {
guild member_cache
.member(&ctx, user_id) .query(&ctx, user_id, guild)
.await .map(|m| m.map(move |m| (m.distinct(), scores)))
.ok()
.and_then(|m| Some((m.distinct(), scores)))
}) })
.collect::<stream::FuturesUnordered<_>>() .collect::<stream::FuturesUnordered<_>>()
.filter_map(|v| future::ready(v)) .filter_map(|v| future::ready(v))
@ -192,7 +316,7 @@ pub async fn leaderboard(ctx: &Context, m: &Message, mut _args: Args) -> Command
.await?; .await?;
return Ok(()); return Ok(());
} }
paginate( paginate_fn(
move |page: u8, ctx: &Context, m: &mut Message| { move |page: u8, ctx: &Context, m: &mut Message| {
const ITEMS_PER_PAGE: usize = 5; const ITEMS_PER_PAGE: usize = 5;
let start = (page as usize) * ITEMS_PER_PAGE; let start = (page as usize) * ITEMS_PER_PAGE;

View file

@ -15,6 +15,7 @@ youmubot-db = { path = "../youmubot-db" }
reqwest = "0.10" reqwest = "0.10"
chrono = "0.4" chrono = "0.4"
flume = "0.9" flume = "0.9"
dashmap = "3"
[dependencies.serenity] [dependencies.serenity]
version = "0.9" version = "0.9"

View file

@ -1,4 +1,4 @@
use crate::{AppData, Result}; use crate::{AppData, MemberCache, Result};
use async_trait::async_trait; use async_trait::async_trait;
use futures_util::{ use futures_util::{
future::{join_all, ready, FutureExt}, future::{join_all, ready, FutureExt},
@ -46,7 +46,7 @@ pub trait Announcer: Send {
} }
/// A simple struct that allows looking up the relevant channels to an user. /// A simple struct that allows looking up the relevant channels to an user.
pub struct MemberToChannels(Vec<(GuildId, ChannelId)>); pub struct MemberToChannels(Vec<(GuildId, ChannelId)>, AppData);
impl MemberToChannels { impl MemberToChannels {
/// Gets the channel list of an user related to that channel. /// Gets the channel list of an user related to that channel.
@ -56,13 +56,14 @@ impl MemberToChannels {
u: impl Into<UserId>, u: impl Into<UserId>,
) -> Vec<ChannelId> { ) -> Vec<ChannelId> {
let u: UserId = u.into(); let u: UserId = u.into();
let member_cache = self.1.read().await.get::<MemberCache>().unwrap().clone();
self.0 self.0
.clone() .clone()
.into_iter() .into_iter()
.map(|(guild, channel): (GuildId, ChannelId)| { .map(|(guild, channel)| {
guild member_cache
.member(http.clone(), u) .query(http.clone(), u.into(), guild)
.map(move |v| v.ok().map(|_| channel.clone())) .map(move |t| t.map(|_| channel))
}) })
.collect::<FuturesUnordered<_>>() .collect::<FuturesUnordered<_>>()
.filter_map(|v| ready(v)) .filter_map(|v| ready(v))
@ -137,7 +138,7 @@ impl AnnouncerHandler {
key: &'static str, key: &'static str,
announcer: &'_ RwLock<Box<dyn Announcer + Send + Sync>>, announcer: &'_ RwLock<Box<dyn Announcer + Send + Sync>>,
) -> Result<()> { ) -> Result<()> {
let channels = MemberToChannels(Self::get_guilds(&data, key).await?); let channels = MemberToChannels(Self::get_guilds(&data, key).await?, data.clone());
announcer announcer
.write() .write()
.await .await

View file

@ -6,6 +6,7 @@ use std::sync::Arc;
pub mod announcer; pub mod announcer;
pub mod args; pub mod args;
pub mod hook; pub mod hook;
pub mod member_cache;
pub mod pagination; pub mod pagination;
pub mod ratelimit; pub mod ratelimit;
pub mod setup; pub mod setup;
@ -13,7 +14,8 @@ pub mod setup;
pub use announcer::{Announcer, AnnouncerHandler}; pub use announcer::{Announcer, AnnouncerHandler};
pub use args::{Duration, UsernameArg}; pub use args::{Duration, UsernameArg};
pub use hook::Hook; pub use hook::Hook;
pub use pagination::paginate; pub use member_cache::MemberCache;
pub use pagination::{paginate, paginate_fn};
/// Re-exporting async_trait helps with implementing Announcer. /// Re-exporting async_trait helps with implementing Announcer.
pub use async_trait::async_trait; pub use async_trait::async_trait;

View file

@ -0,0 +1,45 @@
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serenity::model::{
guild::Member,
id::{GuildId, UserId},
};
use serenity::{http::CacheHttp, prelude::*};
use std::sync::Arc;
const VALID_CACHE_SECONDS: i64 = 15 * 60; // 15 minutes
/// MemberCache resolves `does User belong to Guild` requests, and store them in a cache.
#[derive(Debug, Default)]
pub struct MemberCache(DashMap<(UserId, GuildId), (Option<Member>, DateTime<Utc>)>);
impl TypeMapKey for MemberCache {
type Value = Arc<MemberCache>;
}
impl MemberCache {
pub async fn query(
&self,
cache_http: impl CacheHttp,
user_id: UserId,
guild_id: GuildId,
) -> Option<Member> {
let now = Utc::now();
// Check cache
if let Some(r) = self.0.get(&(user_id, guild_id)) {
if &r.1 > &now {
return r.0.clone();
}
}
// Query
let t = guild_id.member(&cache_http, user_id).await.ok();
self.0.insert(
(user_id, guild_id),
(
t.clone(),
now + chrono::Duration::seconds(VALID_CACHE_SECONDS),
),
);
t
}
}

View file

@ -14,7 +14,7 @@ const ARROW_RIGHT: &'static str = "➡️";
const ARROW_LEFT: &'static str = "⬅️"; const ARROW_LEFT: &'static str = "⬅️";
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Paginate { pub trait Paginate: Send {
async fn render(&mut self, page: u8, ctx: &Context, m: &mut Message) -> Result<bool>; async fn render(&mut self, page: u8, ctx: &Context, m: &mut Message) -> Result<bool>;
} }
@ -36,12 +36,7 @@ where
// Paginate! with a pager function. // Paginate! with a pager function.
/// If awaited, will block until everything is done. /// If awaited, will block until everything is done.
pub async fn paginate( pub async fn paginate(
mut pager: impl for<'m> FnMut( mut pager: impl Paginate,
u8,
&'m Context,
&'m mut Message,
) -> std::pin::Pin<Box<dyn Future<Output = Result<bool>> + Send + 'm>>
+ Send,
ctx: &Context, ctx: &Context,
channel: ChannelId, channel: ChannelId,
timeout: std::time::Duration, timeout: std::time::Duration,
@ -56,7 +51,7 @@ pub async fn paginate(
message message
.react(&ctx, ReactionType::try_from(ARROW_RIGHT)?) .react(&ctx, ReactionType::try_from(ARROW_RIGHT)?)
.await?; .await?;
pager(0, ctx, &mut message).await?; pager.render(0, ctx, &mut message).await?;
// Build a reaction collector // Build a reaction collector
let mut reaction_collector = message.await_reactions(&ctx).removed(true).await; let mut reaction_collector = message.await_reactions(&ctx).removed(true).await;
let mut page = 0; let mut page = 0;
@ -80,6 +75,21 @@ pub async fn paginate(
res res
} }
/// Same as `paginate`, but for function inputs, especially anonymous functions.
pub async fn paginate_fn(
pager: impl for<'m> FnMut(
u8,
&'m Context,
&'m mut Message,
) -> std::pin::Pin<Box<dyn Future<Output = Result<bool>> + Send + 'm>>
+ Send,
ctx: &Context,
channel: ChannelId,
timeout: std::time::Duration,
) -> Result<()> {
paginate(pager, ctx, channel, timeout).await
}
// Handle the reaction and return a new page number. // Handle the reaction and return a new page number.
async fn handle_reaction( async fn handle_reaction(
page: u8, page: u8,

View file

@ -11,4 +11,7 @@ pub fn setup_prelude(db_path: &Path, data: &mut TypeMap) {
// Set up the HTTP client. // Set up the HTTP client.
data.insert::<crate::HTTPClient>(reqwest::Client::new()); data.insert::<crate::HTTPClient>(reqwest::Client::new());
// Set up the member cache.
data.insert::<crate::MemberCache>(std::sync::Arc::new(crate::MemberCache::default()));
} }