Codeforces: asyncify live

This commit is contained in:
Natsu Kagami 2020-09-07 22:48:01 -04:00
parent 1fc6d09910
commit 4a2a4ed688
Signed by: nki
GPG key ID: 73376E117CD20735
3 changed files with 57 additions and 62 deletions

13
Cargo.lock generated
View file

@ -235,15 +235,6 @@ dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.3"
@ -1933,14 +1924,13 @@ dependencies = [
"Inflector 0.11.4 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"codeforces 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"dashmap 3.11.4 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.10.4 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)",
"serenity 0.9.0-rc.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
"youmubot-db 0.1.0",
"youmubot-prelude 0.1.0",
]
@ -2037,7 +2027,6 @@ dependencies = [
"checksum core-foundation 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171"
"checksum core-foundation-sys 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
"checksum crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db"

View file

@ -7,17 +7,15 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde = { version = "1", features = ["derive"] }
tokio = { version = "0.2", features = ["time"] }
reqwest = "0.10.1"
serenity = "0.9.0-rc.0"
Inflector = "0.11"
codeforces = "0.2"
regex = "1"
lazy_static = "1"
rayon = "1"
chrono = { version = "0.4", features = ["serde"] }
crossbeam-channel = "0.4"
dashmap = "3.11.4"
youmubot-prelude = { path = "../youmubot-prelude" }
youmubot-db = { path = "../youmubot-db" }

View file

@ -1,8 +1,6 @@
use crate::db::CfSavedUsers;
use crate::{db::CfSavedUsers, CFClient};
use codeforces::{Contest, ContestPhase, Problem, ProblemResult, ProblemResultType, RanklistRow};
use rayon::prelude::*;
use serenity::{
framework::standard::{CommandError, CommandResult},
model::{
guild::Member,
id::{ChannelId, GuildId, UserId},
@ -21,56 +19,64 @@ struct MemberResult {
/// Watch and commentate a contest.
///
/// Does the thing on a channel, block until the contest ends.
pub fn watch_contest(
ctx: &mut Context,
pub async fn watch_contest(
ctx: &Context,
guild: GuildId,
channel: ChannelId,
contest_id: u64,
) -> CommandResult {
let db = CfSavedUsers::open(&*ctx.data.read()).borrow()?.clone();
) -> Result<()> {
let data = ctx.data.read().await;
let db = CfSavedUsers::open(&*data).borrow()?.clone();
let http = ctx.http.clone();
// Collect an initial member list.
// This never changes during the scan.
let mut member_results: HashMap<UserId, MemberResult> = db
.into_par_iter()
.filter_map(|(user_id, cfu)| {
let member = guild.member(http.clone().as_ref(), user_id).ok();
match member {
Some(m) => Some((
user_id,
MemberResult {
member: m,
handle: cfu.handle,
row: None,
},
)),
None => None,
.into_iter()
.map(|(user_id, cfu)| {
let http = http.clone();
async move {
guild.member(http, user_id).await.map(|m| {
(
user_id,
MemberResult {
member: m,
handle: cfu.handle,
row: None,
},
)
})
}
})
.collect();
.collect::<stream::FuturesUnordered<_>>()
.filter_map(|v| future::ready(v.ok()))
.collect()
.await;
let http = ctx.data.get_cloned::<HTTPClient>();
let (mut contest, _, _) = Contest::standings(&http, contest_id, |f| f.limit(1, 1))?;
let http = data.get::<CFClient>().unwrap();
let (mut contest, _, _) = Contest::standings(&http, contest_id, |f| f.limit(1, 1)).await?;
channel.send_message(&ctx, |e| {
e.content(format!(
"Youmu is watching contest **{}**, with the following members:\n{}",
contest.name,
member_results
.iter()
.map(|(_, m)| format!("- {} as **{}**", m.member.distinct(), m.handle))
.collect::<Vec<_>>()
.join("\n"),
))
})?;
channel
.send_message(&ctx, |e| {
e.content(format!(
"Youmu is watching contest **{}**, with the following members:\n{}",
contest.name,
member_results
.iter()
.map(|(_, m)| format!("- {} as **{}**", m.member.distinct(), m.handle))
.collect::<Vec<_>>()
.join("\n"),
))
})
.await?;
loop {
if let Ok(messages) = scan_changes(http.clone(), &mut member_results, &mut contest) {
if let Ok(messages) = scan_changes(&*http, &mut member_results, &mut contest).await {
for message in messages {
channel
.send_message(&ctx, |e| {
e.content(format!("**{}**: {}", contest.name, message))
})
.await
.ok();
}
}
@ -78,7 +84,7 @@ pub fn watch_contest(
break;
}
// Sleep for a minute
std::thread::sleep(std::time::Duration::from_secs(60));
tokio::time::delay_for(std::time::Duration::from_secs(60)).await;
}
// Announce the final results
@ -93,12 +99,14 @@ pub fn watch_contest(
ranks.sort_by(|(_, a), (_, b)| a.rank.cmp(&b.rank));
if ranks.is_empty() {
channel.send_message(&ctx, |e| {
e.content(format!(
"**{}** has ended, but I can't find anyone in this server on the scoreboard...",
contest.name
))
})?;
channel
.send_message(&ctx, |e| {
e.content(format!(
"**{}** has ended, but I can't find anyone in this server on the scoreboard...",
contest.name
))
})
.await?;
return Ok(());
}
@ -115,23 +123,23 @@ pub fn watch_contest(
row.problem_results.iter().map(|p| format!("{:.0}", p.points)).collect::<Vec<_>>().join("/"),
row.successful_hack_count,
row.unsuccessful_hack_count,
)).collect::<Vec<_>>().join("\n"))))?;
)).collect::<Vec<_>>().join("\n")))).await?;
Ok(())
}
fn scan_changes(
http: <HTTPClient as TypeMapKey>::Value,
async fn scan_changes(
http: &codeforces::Client,
members: &mut HashMap<UserId, MemberResult>,
contest: &mut Contest,
) -> Result<Vec<String>, CommandError> {
) -> Result<Vec<String>> {
let mut messages: Vec<String> = vec![];
let (updated_contest, problems, ranks) = {
let handles = members
.iter()
.map(|(_, h)| h.handle.clone())
.collect::<Vec<_>>();
Contest::standings(&http, contest.id, |f| f.handles(handles))?
Contest::standings(&http, contest.id, |f| f.handles(handles)).await?
};
// Change of phase.
if contest.phase != updated_contest.phase {