Ratelimit properly, live works again!!

This commit is contained in:
Natsu Kagami 2021-02-08 03:04:57 +09:00
parent edaa461d78
commit 7efc4e8021
Signed by: nki
GPG key ID: 7306B3D3C3AD6E51
5 changed files with 125 additions and 52 deletions

View file

@ -9,6 +9,8 @@ use serenity::{http::CacheHttp, model::id::UserId, CacheAndHttp};
use std::sync::Arc; use std::sync::Arc;
use youmubot_prelude::*; use youmubot_prelude::*;
type Client = <CFClient as TypeMapKey>::Value;
/// Updates the rating and rating changes of the users. /// Updates the rating and rating changes of the users.
pub struct Announcer; pub struct Announcer;
@ -23,14 +25,44 @@ impl youmubot_prelude::Announcer for Announcer {
let data = data.read().await; let data = data.read().await;
let client = data.get::<CFClient>().unwrap(); let client = data.get::<CFClient>().unwrap();
let mut users = CfSavedUsers::open(&*data).borrow()?.clone(); let mut users = CfSavedUsers::open(&*data).borrow()?.clone();
users users
.iter_mut() .iter_mut()
.map(|(user_id, cfu)| update_user(http.clone(), &channels, &client, *user_id, cfu)) .map(|(user_id, cfu)| {
let http = http.clone();
let channels = &channels;
async move {
if let Err(e) = update_user(http, &channels, &client, *user_id, cfu).await {
cfu.failures += 1;
eprintln!(
"Codeforces: cannot update user {}: {} [{} failures]",
cfu.handle, e, cfu.failures
);
} else {
cfu.failures = 0;
}
}
})
.collect::<stream::FuturesUnordered<_>>() .collect::<stream::FuturesUnordered<_>>()
.try_collect::<()>() .collect::<()>()
.await?; .await;
*CfSavedUsers::open(&*data).borrow_mut()? = users; let mut db = CfSavedUsers::open(&*data);
let mut db = db.borrow_mut()?;
for (key, user) in users {
match db.get(&key).map(|v| v.last_update) {
Some(u) if u > user.last_update => (),
_ => {
if user.failures >= 5 {
eprintln!(
"Codeforces: Removing user {} - {}: failures count too high",
key, user.handle,
);
db.remove(&key);
} else {
db.insert(key, user);
}
}
}
}
Ok(()) Ok(())
} }
} }
@ -38,17 +70,17 @@ impl youmubot_prelude::Announcer for Announcer {
async fn update_user( async fn update_user(
http: Arc<CacheAndHttp>, http: Arc<CacheAndHttp>,
channels: &MemberToChannels, channels: &MemberToChannels,
client: &codeforces::Client, client: &Client,
user_id: UserId, user_id: UserId,
cfu: &mut CfUser, cfu: &mut CfUser,
) -> Result<()> { ) -> Result<()> {
let info = User::info(client, &[cfu.handle.as_str()]) let info = User::info(&*client.borrow().await?, &[cfu.handle.as_str()])
.await? .await?
.into_iter() .into_iter()
.next() .next()
.ok_or(Error::msg("Not found"))?; .ok_or(Error::msg("Not found"))?;
let rating_changes = info.rating_changes(client).await?; let rating_changes = info.rating_changes(&*client.borrow().await?).await?;
let channels_list = channels.channels_of(&http, user_id).await; let channels_list = channels.channels_of(&http, user_id).await;
cfu.last_update = Utc::now(); cfu.last_update = Utc::now();
@ -87,8 +119,10 @@ async fn update_user(
return Ok(()); return Ok(());
} }
let (contest, _, _) = let (contest, _, _) =
codeforces::Contest::standings(client, rc.contest_id, |f| f.limit(1, 1)) codeforces::Contest::standings(&*client.borrow().await?, rc.contest_id, |f| {
.await?; f.limit(1, 1)
})
.await?;
channels channels
.iter() .iter()
.map(|channel| { .map(|channel| {

View file

@ -15,6 +15,8 @@ pub struct CfUser {
#[serde(default)] #[serde(default)]
pub last_contest_id: Option<u64>, pub last_contest_id: Option<u64>,
pub rating: Option<i64>, pub rating: Option<i64>,
#[serde(default)]
pub failures: u8,
} }
impl CfUser { impl CfUser {
@ -26,6 +28,7 @@ impl CfUser {
last_update: Utc::now(), last_update: Utc::now(),
last_contest_id: rc.into_iter().last().map(|v| v.contest_id), last_contest_id: rc.into_iter().last().map(|v| v.contest_id),
rating: u.rating, rating: u.rating,
failures: 0,
} }
} }
} }

View file

@ -1,5 +1,5 @@
use chrono::{TimeZone, Utc}; use chrono::{TimeZone, Utc};
use codeforces::{Client, Contest, Problem}; use codeforces::{Contest, Problem};
use dashmap::DashMap as HashMap; use dashmap::DashMap as HashMap;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::{Captures, Regex}; use regex::{Captures, Regex};
@ -7,9 +7,11 @@ use serenity::{
builder::CreateEmbed, framework::standard::CommandError, model::channel::Message, builder::CreateEmbed, framework::standard::CommandError, model::channel::Message,
utils::MessageBuilder, utils::MessageBuilder,
}; };
use std::{sync::Arc, time::Instant}; use std::time::Instant;
use youmubot_prelude::*; use youmubot_prelude::*;
type Client = <crate::CFClient as TypeMapKey>::Value;
lazy_static! { lazy_static! {
static ref CONTEST_LINK: Regex = Regex::new( static ref CONTEST_LINK: Regex = Regex::new(
r"https?://codeforces\.com/(contest|gym)s?/(?P<contest>\d+)(?:/problem/(?P<problem>\w+))?" r"https?://codeforces\.com/(contest|gym)s?/(?P<contest>\d+)(?:/problem/(?P<problem>\w+))?"
@ -30,7 +32,7 @@ enum ContestOrProblem {
pub struct ContestCache { pub struct ContestCache {
contests: HashMap<u64, (Contest, Option<Vec<Problem>>)>, contests: HashMap<u64, (Contest, Option<Vec<Problem>>)>,
all_list: RwLock<(Vec<Contest>, Instant)>, all_list: RwLock<(Vec<Contest>, Instant)>,
http: Arc<Client>, http: Client,
} }
impl TypeMapKey for ContestCache { impl TypeMapKey for ContestCache {
@ -39,8 +41,8 @@ impl TypeMapKey for ContestCache {
impl ContestCache { impl ContestCache {
/// Creates a new, empty cache. /// Creates a new, empty cache.
pub async fn new(http: Arc<Client>) -> Result<Self> { pub(crate) async fn new(http: Client) -> Result<Self> {
let contests_list = Contest::list(&*http, true).await?; let contests_list = Contest::list(&*http.borrow().await?, true).await?;
Ok(Self { Ok(Self {
contests: HashMap::new(), contests: HashMap::new(),
all_list: RwLock::new((contests_list, Instant::now())), all_list: RwLock::new((contests_list, Instant::now())),
@ -62,14 +64,17 @@ impl ContestCache {
&self, &self,
contest_id: u64, contest_id: u64,
) -> Result<(Contest, Option<Vec<Problem>>)> { ) -> Result<(Contest, Option<Vec<Problem>>)> {
let (c, p) = match Contest::standings(&*self.http, contest_id, |f| f.limit(1, 1)).await { let (c, p) =
Ok((c, p, _)) => (c, Some(p)), match Contest::standings(&*self.http.borrow().await?, contest_id, |f| f.limit(1, 1))
Err(codeforces::Error::Codeforces(s)) if s.ends_with("has not started") => { .await
let c = self.get_from_list(contest_id).await?; {
(c, None) Ok((c, p, _)) => (c, Some(p)),
} Err(codeforces::Error::Codeforces(s)) if s.ends_with("has not started") => {
Err(v) => return Err(Error::from(v)), let c = self.get_from_list(contest_id).await?;
}; (c, None)
}
Err(v) => return Err(Error::from(v)),
};
self.contests.insert(contest_id, (c, p)); self.contests.insert(contest_id, (c, p));
Ok(self.contests.get(&contest_id).unwrap().clone()) Ok(self.contests.get(&contest_id).unwrap().clone())
} }
@ -78,8 +83,10 @@ impl ContestCache {
let last_updated = self.all_list.read().await.1.clone(); let last_updated = self.all_list.read().await.1.clone();
if Instant::now() - last_updated > std::time::Duration::from_secs(60 * 60) { if Instant::now() - last_updated > std::time::Duration::from_secs(60 * 60) {
// We update at most once an hour. // We update at most once an hour.
*self.all_list.write().await = *self.all_list.write().await = (
(Contest::list(&*self.http, true).await?, Instant::now()); Contest::list(&*self.http.borrow().await?, true).await?,
Instant::now(),
);
} }
self.all_list self.all_list
.read() .read()

View file

@ -22,7 +22,7 @@ mod live;
struct CFClient; struct CFClient;
impl TypeMapKey for CFClient { impl TypeMapKey for CFClient {
type Value = Arc<codeforces::Client>; type Value = Arc<ratelimit::Ratelimit<codeforces::Client>>;
} }
use db::{CfSavedUsers, CfUser}; use db::{CfSavedUsers, CfUser};
@ -33,7 +33,11 @@ pub use hook::InfoHook;
pub async fn setup(path: &std::path::Path, data: &mut TypeMap, announcers: &mut AnnouncerHandler) { pub async fn setup(path: &std::path::Path, data: &mut TypeMap, announcers: &mut AnnouncerHandler) {
CfSavedUsers::insert_into(data, path.join("cf_saved_users.yaml")) CfSavedUsers::insert_into(data, path.join("cf_saved_users.yaml"))
.expect("Must be able to set up DB"); .expect("Must be able to set up DB");
let client = Arc::new(codeforces::Client::new()); let client = Arc::new(ratelimit::Ratelimit::new(
codeforces::Client::new(),
4,
std::time::Duration::from_secs(1),
));
data.insert::<hook::ContestCache>(hook::ContestCache::new(client.clone()).await.unwrap()); data.insert::<hook::ContestCache>(hook::ContestCache::new(client.clone()).await.unwrap());
data.insert::<CFClient>(client); data.insert::<CFClient>(client);
announcers.add("codeforces", announcer::Announcer); announcers.add("codeforces", announcer::Announcer);
@ -74,7 +78,7 @@ pub async fn profile(ctx: &Context, m: &Message, mut args: Args) -> CommandResul
} }
}; };
let account = codeforces::User::info(&http, &[&handle[..]]) let account = codeforces::User::info(&*http.borrow().await?, &[&handle[..]])
.await? .await?
.into_iter() .into_iter()
.next(); .next();
@ -106,7 +110,7 @@ pub async fn save(ctx: &Context, m: &Message, mut args: Args) -> CommandResult {
let handle = args.single::<String>()?; let handle = args.single::<String>()?;
let http = data.get::<CFClient>().unwrap(); let http = data.get::<CFClient>().unwrap();
let account = codeforces::User::info(&http, &[&handle[..]]) let account = codeforces::User::info(&*http.borrow().await?, &[&handle[..]])
.await? .await?
.into_iter() .into_iter()
.next(); .next();
@ -118,7 +122,7 @@ pub async fn save(ctx: &Context, m: &Message, mut args: Args) -> CommandResult {
} }
Some(acc) => { Some(acc) => {
// Collect rating changes data. // Collect rating changes data.
let rating_changes = acc.rating_changes(&http).await?; let rating_changes = acc.rating_changes(&*http.borrow().await?).await?;
let mut db = CfSavedUsers::open(&*data); let mut db = CfSavedUsers::open(&*data);
m.reply( m.reply(
&ctx, &ctx,
@ -268,7 +272,7 @@ pub async fn contestranks(ctx: &Context, m: &Message, mut args: Args) -> Command
.collect::<HashMap<_, _>>() .collect::<HashMap<_, _>>()
.await; .await;
let http = data.get::<CFClient>().unwrap(); let http = data.get::<CFClient>().unwrap();
let (contest, problems, ranks) = Contest::standings(http, contest_id, |f| { let (contest, problems, ranks) = Contest::standings(&*http.borrow().await?, contest_id, |f| {
f.handles(members.iter().map(|(k, _)| k.clone()).collect()) f.handles(members.iter().map(|(k, _)| k.clone()).collect())
}) })
.await?; .await?;

View file

@ -27,15 +27,18 @@ pub async fn watch_contest(
) -> Result<()> { ) -> Result<()> {
let data = ctx.data.read().await; let data = ctx.data.read().await;
let db = CfSavedUsers::open(&*data).borrow()?.clone(); let db = CfSavedUsers::open(&*data).borrow()?.clone();
let http = ctx.http.clone(); let member_cache = data.get::<member_cache::MemberCache>().unwrap().clone();
let mut msg = channel
.send_message(&ctx, |e| e.content("Youmu is building the member list..."))
.await?;
// Collect an initial member list. // Collect an initial member list.
// This never changes during the scan. // This never changes during the scan.
let mut member_results: HashMap<UserId, MemberResult> = db let mut member_results: HashMap<UserId, MemberResult> = db
.into_iter() .into_iter()
.map(|(user_id, cfu)| { .map(|(user_id, cfu)| {
let http = http.clone(); let member_cache = &member_cache;
async move { async move {
guild.member(http, user_id).await.map(|m| { member_cache.query(ctx, user_id, guild).await.map(|m| {
( (
user_id, user_id,
MemberResult { MemberResult {
@ -48,29 +51,36 @@ pub async fn watch_contest(
} }
}) })
.collect::<stream::FuturesUnordered<_>>() .collect::<stream::FuturesUnordered<_>>()
.filter_map(|v| future::ready(v.ok())) .filter_map(|v| future::ready(v))
.collect() .collect()
.await; .await;
let http = data.get::<CFClient>().unwrap(); let http = data.get::<CFClient>().unwrap();
let (mut contest, _, _) = Contest::standings(&http, contest_id, |f| f.limit(1, 1)).await?; let (mut contest, _, _) =
Contest::standings(&*http.borrow().await?, contest_id, |f| f.limit(1, 1)).await?;
channel msg.edit(&ctx, |e| {
.send_message(&ctx, |e| { e.content(format!(
e.content(format!( "Youmu is watching contest **{}**, with the following members: {}",
"Youmu is watching contest **{}**, with the following members:\n{}", contest.name,
contest.name, member_results
member_results .iter()
.iter() .map(|(_, m)| serenity::utils::MessageBuilder::new()
.map(|(_, m)| format!("- {} as **{}**", m.member.distinct(), m.handle)) .push_safe(m.member.distinct())
.collect::<Vec<_>>() .push(" (")
.join("\n"), .push_mono_safe(&m.handle)
)) .push(")")
}) .build())
.await?; .collect::<Vec<_>>()
.join(", "),
))
})
.await?;
loop { loop {
if let Ok(messages) = scan_changes(&*http, &mut member_results, &mut contest).await { if let Ok(messages) =
scan_changes(&*http.borrow().await?, &mut member_results, &mut contest).await
{
for message in messages { for message in messages {
channel channel
.send_message(&ctx, |e| { .send_message(&ctx, |e| {
@ -128,6 +138,17 @@ pub async fn watch_contest(
Ok(()) Ok(())
} }
fn mention(phase: ContestPhase, m: &Member) -> String {
match phase {
ContestPhase::Before | ContestPhase::Coding =>
// Don't mention directly, avoid spamming in contest
{
MessageBuilder::new().push_safe(m.distinct()).build()
}
_ => m.mention().to_string(),
}
}
async fn scan_changes( async fn scan_changes(
http: &codeforces::Client, http: &codeforces::Client,
members: &mut HashMap<UserId, MemberResult>, members: &mut HashMap<UserId, MemberResult>,
@ -179,6 +200,7 @@ async fn scan_changes(
last_submission_time_seconds: None, last_submission_time_seconds: None,
}); });
messages.extend(translate_overall_result( messages.extend(translate_overall_result(
contest.phase,
member_result.handle.as_str(), member_result.handle.as_str(),
&old_row, &old_row,
&row, &row,
@ -192,6 +214,7 @@ async fn scan_changes(
) { ) {
if let Some(message) = analyze_change(&contest, old, new).map(|c| { if let Some(message) = analyze_change(&contest, old, new).map(|c| {
translate_change( translate_change(
contest.phase,
member_result.handle.as_str(), member_result.handle.as_str(),
&row, &row,
&member_result.member, &member_result.member,
@ -219,6 +242,7 @@ async fn scan_changes(
} }
fn translate_overall_result( fn translate_overall_result(
phase: ContestPhase,
handle: &str, handle: &str,
old_row: &RanklistRow, old_row: &RanklistRow,
new_row: &RanklistRow, new_row: &RanklistRow,
@ -228,7 +252,7 @@ fn translate_overall_result(
let mut m = MessageBuilder::new(); let mut m = MessageBuilder::new();
m.push_bold_safe(handle) m.push_bold_safe(handle)
.push(" (") .push(" (")
.push_safe(member.distinct()) .push(mention(phase, member))
.push(")"); .push(")");
m m
}; };
@ -260,6 +284,7 @@ fn translate_overall_result(
} }
fn translate_change( fn translate_change(
phase: ContestPhase,
handle: &str, handle: &str,
row: &RanklistRow, row: &RanklistRow,
member: &Member, member: &Member,
@ -270,7 +295,7 @@ fn translate_change(
let mut m = MessageBuilder::new(); let mut m = MessageBuilder::new();
m.push_bold_safe(handle) m.push_bold_safe(handle)
.push(" (") .push(" (")
.push_safe(member.distinct()) .push_safe(mention(phase, member))
.push(")"); .push(")");
use Change::*; use Change::*;