Patches to get deadlocking in osu updates fixed

This commit is contained in:
Natsu Kagami 2023-11-09 01:36:58 +01:00
parent 1b02993e98
commit 220dbc21ea
Signed by: nki
GPG key ID: 55A032EB38B49ADB
7 changed files with 60 additions and 41 deletions

6
Cargo.lock generated
View file

@ -1698,13 +1698,13 @@ dependencies = [
[[package]] [[package]]
name = "serenity" name = "serenity"
version = "0.11.6" version = "0.11.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d007dc45584ecc47e791f2a9a7cf17bf98ac386728106f111159c846d624be3f" checksum = "7a7a89cef23483fc9d4caf2df41e6d3928e18aada84c56abd237439d929622c6"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"async-tungstenite", "async-tungstenite",
"base64 0.13.1", "base64 0.21.4",
"bitflags 1.3.2", "bitflags 1.3.2",
"bytes", "bytes",
"cfg-if", "cfg-if",

View file

@ -29,10 +29,8 @@ pub struct Announcer {
} }
impl Announcer { impl Announcer {
pub fn new(client: Osu) -> Self { pub fn new(client: Arc<Osu>) -> Self {
Self { Self { client }
client: Arc::new(client),
}
} }
} }
@ -45,10 +43,12 @@ impl youmubot_prelude::Announcer for Announcer {
channels: MemberToChannels, channels: MemberToChannels,
) -> Result<()> { ) -> Result<()> {
// For each user... // For each user...
let data = d.read().await; let users = {
let data = data.get::<OsuSavedUsers>().unwrap(); let data = d.read().await;
let data = data.get::<OsuSavedUsers>().unwrap();
data.all().await?
};
let now = chrono::Utc::now(); let now = chrono::Utc::now();
let users = data.all().await?;
users users
.into_iter() .into_iter()
.map(|mut osu_user| { .map(|mut osu_user| {
@ -58,7 +58,7 @@ impl youmubot_prelude::Announcer for Announcer {
c: c.clone(), c: c.clone(),
data: d.clone(), data: d.clone(),
}; };
let s = &self; let s = &*self;
async move { async move {
let channels = channels.channels_of(ctx.c.clone(), user_id).await; let channels = channels.channels_of(ctx.c.clone(), user_id).await;
if channels.is_empty() { if channels.is_empty() {
@ -76,7 +76,16 @@ impl youmubot_prelude::Announcer for Announcer {
Ok(v) => { Ok(v) => {
osu_user.last_update = now; osu_user.last_update = now;
osu_user.pp = v.try_into().unwrap(); osu_user.pp = v.try_into().unwrap();
data.save(osu_user).await.pls_ok(); let id = osu_user.id;
ctx.data
.read()
.await
.get::<OsuSavedUsers>()
.unwrap()
.save(osu_user)
.await
.pls_ok();
println!("updating {} done", id);
} }
Err(e) => { Err(e) => {
eprintln!("osu: Cannot update {}: {}", osu_user.id, e); eprintln!("osu: Cannot update {}: {}", osu_user.id, e);

View file

@ -71,25 +71,24 @@ pub fn setup(
// API client // API client
let http_client = data.get::<HTTPClient>().unwrap().clone(); let http_client = data.get::<HTTPClient>().unwrap().clone();
let make_client = || { let osu_client = Arc::new(OsuHttpClient::new(
OsuHttpClient::new( std::env::var("OSU_API_KEY").expect("Please set OSU_API_KEY as osu! api key."),
std::env::var("OSU_API_KEY").expect("Please set OSU_API_KEY as osu! api key."), http_client.clone(),
) ));
};
let osu_client = Arc::new(make_client());
data.insert::<OsuClient>(osu_client.clone()); data.insert::<OsuClient>(osu_client.clone());
data.insert::<oppai_cache::BeatmapCache>(oppai_cache::BeatmapCache::new( data.insert::<oppai_cache::BeatmapCache>(oppai_cache::BeatmapCache::new(
http_client, http_client,
sql_client.clone(), sql_client.clone(),
)); ));
data.insert::<beatmap_cache::BeatmapMetaCache>(beatmap_cache::BeatmapMetaCache::new( data.insert::<beatmap_cache::BeatmapMetaCache>(beatmap_cache::BeatmapMetaCache::new(
osu_client, sql_client, osu_client.clone(),
sql_client,
)); ));
// Announcer // Announcer
announcers.add( announcers.add(
announcer::ANNOUNCER_KEY, announcer::ANNOUNCER_KEY,
announcer::Announcer::new(make_client()), announcer::Announcer::new(osu_client),
); );
Ok(()) Ok(())
} }

View file

@ -33,9 +33,9 @@ fn vec_try_into<U, T: std::convert::TryFrom<U>>(v: Vec<U>) -> Result<Vec<T>, T::
impl Client { impl Client {
/// Create a new client from the given API key. /// Create a new client from the given API key.
pub fn new(key: String) -> Client { pub fn new(key: String, client: HTTPClient) -> Client {
let client = Ratelimit::new( let client = Ratelimit::new(
HTTPClient::new(), client,
REQUESTS_PER_MINUTE, REQUESTS_PER_MINUTE,
std::time::Duration::from_secs(60), std::time::Duration::from_secs(60),
); );

View file

@ -19,6 +19,7 @@ use serenity::{
CacheAndHttp, CacheAndHttp,
}; };
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::time::{interval, MissedTickBehavior};
use youmubot_db::DB; use youmubot_db::DB;
/// A list of assigned channels for an announcer. /// A list of assigned channels for an announcer.
@ -149,28 +150,32 @@ impl AnnouncerHandler {
/// Start the AnnouncerHandler, looping forever. /// Start the AnnouncerHandler, looping forever.
/// ///
/// It will run all the announcers in sequence every *cooldown* seconds. /// It will run all the announcers every *cooldown* seconds.
pub async fn scan(self, cooldown: std::time::Duration) { pub async fn scan(self, cooldown: std::time::Duration) {
// First we store all the keys inside the database. // First we store all the keys inside the database.
let keys = self.announcers.keys().cloned().collect::<Vec<_>>(); let keys = self.announcers.keys().cloned().collect::<Vec<_>>();
self.data.write().await.insert::<Self>(keys.clone()); self.data.write().await.insert::<Self>(keys.clone());
loop { join_all(self.announcers.iter().map(|(key, announcer)| {
eprintln!("{}: announcer started scanning", chrono::Utc::now()); let data = self.data.clone();
let after = tokio::time::sleep_until(tokio::time::Instant::now() + cooldown); let cache = self.cache_http.clone();
join_all(self.announcers.iter().map(|(key, announcer)| { let mut looper = interval(cooldown);
eprintln!(" - scanning key `{}`", key); looper.set_missed_tick_behavior(MissedTickBehavior::Delay);
Self::announce(self.data.clone(), self.cache_http.clone(), key, announcer).map( async move {
move |v| { loop {
if let Err(e) = v { eprintln!(" - scanning key `{}`", key);
match Self::announce(data.clone(), cache.clone(), key, announcer).await {
Err(e) => {
eprintln!(" - key `{}`: {:?}", *key, e) eprintln!(" - key `{}`: {:?}", *key, e)
} }
}, Ok(()) => {
) eprintln!(" - key `{}`: complete", *key)
})) }
.await; };
eprintln!("{}: announcer finished scanning", chrono::Utc::now()); looper.tick().await;
after.await; }
} }
}))
.await;
} }
} }

View file

@ -1,5 +1,5 @@
use serenity::prelude::*; use serenity::prelude::*;
use std::path::Path; use std::{path::Path, time::Duration};
/// Set up the prelude libraries. /// Set up the prelude libraries.
/// ///
@ -22,7 +22,13 @@ pub async fn setup_prelude(
.expect("SQL database set up"); .expect("SQL database set up");
// Set up the HTTP client. // Set up the HTTP client.
data.insert::<crate::HTTPClient>(reqwest::Client::new()); data.insert::<crate::HTTPClient>(
reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(60))
.build()
.expect("Build be able to build HTTP client"),
);
// Set up the member cache. // Set up the member cache.
data.insert::<crate::MemberCache>(std::sync::Arc::new(crate::MemberCache::default())); data.insert::<crate::MemberCache>(std::sync::Arc::new(crate::MemberCache::default()));

View file

@ -148,7 +148,7 @@ async fn main() {
#[cfg(feature = "codeforces")] #[cfg(feature = "codeforces")]
println!("codeforces enabled."); println!("codeforces enabled.");
tokio::spawn(announcers.scan(std::time::Duration::from_secs(120))); tokio::spawn(announcers.scan(std::time::Duration::from_secs(300)));
println!("Starting..."); println!("Starting...");
if let Err(v) = client.start().await { if let Err(v) = client.start().await {