Prelude: Announcer: full async rewrite

This commit is contained in:
Natsu Kagami 2020-09-02 19:32:46 -04:00
parent fe5cd8a58a
commit a578ce5924
No known key found for this signature in database
GPG key ID: F17543D4B9424B94
3 changed files with 96 additions and 89 deletions

2
Cargo.lock generated
View file

@ -1968,8 +1968,10 @@ name = "youmubot-prelude"
version = "0.1.0"
dependencies = [
"anyhow 1.0.32 (registry+https://github.com/rust-lang/crates.io-index)",
"async-trait 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.10.4 (registry+https://github.com/rust-lang/crates.io-index)",
"serenity 0.9.0-rc.0 (registry+https://github.com/rust-lang/crates.io-index)",

View file

@ -9,6 +9,8 @@ edition = "2018"
[dependencies]
serenity = "0.9.0-rc.0"
anyhow = "1.0"
async-trait = "0.1"
futures-util = "0.3"
youmubot-db = { path = "../youmubot-db" }
crossbeam-channel = "0.4"
reqwest = "0.10"

View file

@ -1,10 +1,14 @@
use crate::{AppData, GetCloned};
use crate::{AppData, Result};
use async_trait::async_trait;
use crossbeam_channel::after;
use rayon::prelude::*;
use futures_util::{
future::{join_all, ready, FutureExt},
stream::{FuturesUnordered, StreamExt},
};
use serenity::{
framework::standard::{
macros::{command, group},
Args, CommandError as Error, CommandResult,
Args, CommandResult,
},
http::CacheHttp,
model::{
@ -15,11 +19,7 @@ use serenity::{
utils::MessageBuilder,
CacheAndHttp,
};
use std::{
collections::HashMap,
sync::Arc,
thread::{spawn, JoinHandle},
};
use std::{collections::HashMap, sync::Arc};
use youmubot_db::DB;
/// A list of assigned channels for an announcer.
@ -33,30 +33,17 @@ pub(crate) type AnnouncerChannels = DB<HashMap<String, HashMap<GuildId, ChannelI
/// - An AppData, which can be used for interacting with internal databases.
/// - A function "channels", which takes an UserId and returns the list of ChannelIds, which any update related to that user should be
/// sent to.
#[async_trait]
pub trait Announcer: Send {
/// Look for updates and send them to respective channels.
///
/// Errors returned from this function gets ignored and logged down.
fn updates(
async fn updates(
&mut self,
c: Arc<CacheAndHttp>,
d: AppData,
channels: MemberToChannels,
) -> CommandResult;
}
impl<T> Announcer for T
where
T: FnMut(Arc<CacheAndHttp>, AppData, MemberToChannels) -> CommandResult + Send,
{
fn updates(
&mut self,
c: Arc<CacheAndHttp>,
d: AppData,
channels: MemberToChannels,
) -> CommandResult {
self(c, d, channels)
}
) -> Result<()>;
}
/// A simple struct that allows looking up the relevant channels to an user.
@ -64,18 +51,24 @@ pub struct MemberToChannels(Vec<(GuildId, ChannelId)>);
impl MemberToChannels {
/// Gets the channel list of an user related to that channel.
pub fn channels_of(
pub async fn channels_of(
&self,
http: impl CacheHttp + Clone + Sync,
u: impl Into<UserId>,
) -> Vec<ChannelId> {
let u = u.into();
let u: UserId = u.into();
self.0
.par_iter()
.filter_map(|(guild, channel)| {
guild.member(http.clone(), u).ok().map(|_| channel.clone())
.clone()
.into_iter()
.map(|(guild, channel): (GuildId, ChannelId)| {
guild
.member(http.clone(), u)
.map(move |v| v.ok().map(|_| channel.clone()))
})
.collect::<Vec<_>>()
.collect::<FuturesUnordered<_>>()
.filter_map(|v| ready(v))
.collect()
.await
}
}
@ -85,7 +78,7 @@ impl MemberToChannels {
pub struct AnnouncerHandler {
cache_http: Arc<CacheAndHttp>,
data: AppData,
announcers: HashMap<&'static str, Box<dyn Announcer>>,
announcers: HashMap<&'static str, RwLock<Box<dyn Announcer>>>,
}
// Querying for the AnnouncerHandler in the internal data returns a vec of keys.
@ -108,7 +101,10 @@ impl AnnouncerHandler {
///
/// The handler must take an unique key. If a duplicate is found, this method panics.
pub fn add(&mut self, key: &'static str, announcer: impl Announcer + 'static) -> &mut Self {
if let Some(_) = self.announcers.insert(key, Box::new(announcer)) {
if let Some(_) = self
.announcers
.insert(key, RwLock::new(Box::new(announcer)))
{
panic!(
"Announcer keys must be unique: another announcer with key `{}` was found",
key
@ -122,9 +118,8 @@ impl AnnouncerHandler {
/// Execution-related.
impl AnnouncerHandler {
/// Collect the list of guilds and their respective channels, by the key of the announcer.
fn get_guilds(&self, key: &'static str) -> Result<Vec<(GuildId, ChannelId)>, Error> {
let d = &self.data;
let data = AnnouncerChannels::open(&*d.read())
async fn get_guilds(data: &AppData, key: &'static str) -> Result<Vec<(GuildId, ChannelId)>> {
let data = AnnouncerChannels::open(&*data.read().await)
.borrow()?
.get(key)
.map(|m| m.iter().map(|(a, b)| (*a, *b)).collect())
@ -133,48 +128,54 @@ impl AnnouncerHandler {
}
/// Run the announcing sequence on a certain announcer.
fn announce(&mut self, key: &'static str) -> CommandResult {
let guilds: Vec<_> = self.get_guilds(key)?;
let channels = MemberToChannels(guilds);
let cache_http = self.cache_http.clone();
let data = self.data.clone();
let announcer = self
.announcers
.get_mut(&key)
.expect("Key is from announcers");
announcer.updates(cache_http, data, channels)?;
Ok(())
async fn announce(
data: AppData,
cache_http: Arc<CacheAndHttp>,
key: &'static str,
announcer: &'_ RwLock<Box<dyn Announcer>>,
) -> Result<()> {
let channels = MemberToChannels(Self::get_guilds(&data, key).await?);
announcer
.write()
.await
.updates(cache_http, data, channels)
.await
}
/// Start the AnnouncerHandler, moving it into another thread.
/// Start the AnnouncerHandler, looping forever.
///
/// It will run all the announcers in sequence every *cooldown* seconds.
pub fn scan(mut self, cooldown: std::time::Duration) -> JoinHandle<()> {
pub async fn scan(self, cooldown: std::time::Duration) -> () {
// First we store all the keys inside the database.
let keys = self.announcers.keys().cloned().collect::<Vec<_>>();
self.data.write().insert::<Self>(keys.clone());
spawn(move || loop {
self.data.write().await.insert::<Self>(keys.clone());
loop {
eprintln!("{}: announcer started scanning", chrono::Utc::now());
let after_timer = after(cooldown);
for key in &keys {
join_all(self.announcers.iter().map(|(key, announcer)| {
eprintln!(" - scanning key `{}`", key);
if let Err(e) = self.announce(key) {
dbg!(e);
}
}
Self::announce(self.data.clone(), self.cache_http.clone(), *key, announcer).map(
move |v| {
if let Err(e) = v {
eprintln!(" - key `{}`: {:?}", *key, e)
}
},
)
}))
.await;
eprintln!("{}: announcer finished scanning", chrono::Utc::now());
after_timer.recv().ok();
})
}
}
}
/// Gets the announcer of the given guild.
pub fn announcer_of(
pub async fn announcer_of(
ctx: &Context,
key: &'static str,
guild: GuildId,
) -> Result<Option<ChannelId>, Error> {
Ok(AnnouncerChannels::open(&*ctx.data.read())
) -> Result<Option<ChannelId>> {
Ok(AnnouncerChannels::open(&*ctx.data.read().await)
.borrow()?
.get(key)
.and_then(|channels| channels.get(&guild).cloned()))
@ -184,20 +185,19 @@ pub fn announcer_of(
#[description = "List the registered announcers of this server"]
#[num_args(0)]
#[only_in(guilds)]
pub fn list_announcers(ctx: &mut Context, m: &Message, _: Args) -> CommandResult {
pub async fn list_announcers(ctx: &Context, m: &Message, _: Args) -> CommandResult {
let guild_id = m.guild_id.unwrap();
let announcers = AnnouncerChannels::open(&*ctx.data.read());
let announcers = announcers.borrow()?;
let channels = ctx
.data
.get_cloned::<AnnouncerHandler>()
.into_iter()
.filter_map(|key| {
announcers
.get(key)
.and_then(|channels| channels.get(&guild_id))
.map(|&ch| (key, ch))
let announcers = AnnouncerChannels::open(&*ctx.data.read().await);
let channels = ctx.data.read().await.get::<AnnouncerHandler>().unwrap();
let channels = channels
.iter()
.filter_map(|&key| {
announcers.borrow().ok().and_then(|announcers| {
announcers
.get(key)
.and_then(|channels| channels.get(&guild_id))
.map(|&ch| (key, ch))
})
})
.map(|(key, ch)| format!(" - `{}`: activated on channel {}", key, ch.mention()))
.collect::<Vec<_>>();
@ -208,7 +208,8 @@ pub fn list_announcers(ctx: &mut Context, m: &Message, _: Args) -> CommandResult
"Activated announcers on this server:\n{}",
channels.join("\n")
),
)?;
)
.await?;
Ok(())
}
@ -219,23 +220,23 @@ pub fn list_announcers(ctx: &mut Context, m: &Message, _: Args) -> CommandResult
#[required_permissions(MANAGE_CHANNELS)]
#[only_in(guilds)]
#[num_args(1)]
pub fn register_announcer(ctx: &mut Context, m: &Message, mut args: Args) -> CommandResult {
pub async fn register_announcer(ctx: &Context, m: &Message, mut args: Args) -> CommandResult {
let key = args.single::<String>()?;
let keys = ctx.data.get_cloned::<AnnouncerHandler>();
if !keys.contains(&key.as_str()) {
let keys = ctx.data.read().await.get::<AnnouncerHandler>().unwrap();
if !keys.contains(&&key[..]) {
m.reply(
&ctx,
format!(
"Key not found. Available announcer keys are: `{}`",
keys.join(", ")
),
)?;
)
.await?;
return Ok(());
}
let guild = m.guild(&ctx).expect("Guild-only command");
let guild = guild.read();
let channel = m.channel_id.to_channel(&ctx)?;
AnnouncerChannels::open(&*ctx.data.read())
let guild = m.guild(&ctx).await.expect("Guild-only command");
let channel = m.channel_id.to_channel(&ctx).await?;
AnnouncerChannels::open(&*ctx.data.read().await)
.borrow_mut()?
.entry(key.clone())
.or_default()
@ -250,7 +251,8 @@ pub fn register_announcer(ctx: &mut Context, m: &Message, mut args: Args) -> Com
.push(" on channel ")
.push_bold_safe(channel)
.build(),
)?;
)
.await?;
Ok(())
}
@ -260,9 +262,9 @@ pub fn register_announcer(ctx: &mut Context, m: &Message, mut args: Args) -> Com
#[required_permissions(MANAGE_CHANNELS)]
#[only_in(guilds)]
#[num_args(1)]
pub fn remove_announcer(ctx: &mut Context, m: &Message, mut args: Args) -> CommandResult {
pub async fn remove_announcer(ctx: &Context, m: &Message, mut args: Args) -> CommandResult {
let key = args.single::<String>()?;
let keys = ctx.data.get_cloned::<AnnouncerHandler>();
let keys = ctx.data.read().await.get::<AnnouncerHandler>().unwrap();
if !keys.contains(&key.as_str()) {
m.reply(
&ctx,
@ -270,12 +272,12 @@ pub fn remove_announcer(ctx: &mut Context, m: &Message, mut args: Args) -> Comma
"Key not found. Available announcer keys are: `{}`",
keys.join(", ")
),
)?;
)
.await?;
return Ok(());
}
let guild = m.guild(&ctx).expect("Guild-only command");
let guild = guild.read();
AnnouncerChannels::open(&*ctx.data.read())
let guild = m.guild(&ctx).await.expect("Guild-only command");
AnnouncerChannels::open(&*ctx.data.read().await)
.borrow_mut()?
.entry(key.clone())
.and_modify(|m| {
@ -289,7 +291,8 @@ pub fn remove_announcer(ctx: &mut Context, m: &Message, mut args: Args) -> Comma
.push(" has been de-activated for server ")
.push_bold_safe(&guild.name)
.build(),
)?;
)
.await?;
Ok(())
}