Rework how fetching recent activity works

User requests will be faster since it no longer concerns recent activity.
Announcer will also be faster: we only fetch recent activity once.
This commit is contained in:
Natsu Kagami 2025-05-20 19:14:10 +02:00
parent 8bb8b4114a
commit 98278de2f3
Signed by: nki
GPG key ID: 55A032EB38B49ADB
10 changed files with 191 additions and 157 deletions

View file

@ -1,8 +1,7 @@
use chrono::{DateTime, Utc};
use future::Future;
use futures_util::try_join;
use serenity::all::Member;
use std::pin::Pin;
use std::collections::HashMap;
use std::sync::Arc;
use stream::FuturesUnordered;
@ -17,12 +16,12 @@ use serenity::{
use announcer::MemberToChannels;
use youmubot_prelude::announcer::CacheAndHttp;
use youmubot_prelude::stream::TryStreamExt;
use youmubot_prelude::*;
use crate::discord::calculate_weighted_map_age;
use crate::discord::db::OsuUserMode;
use crate::scores::Scores;
use crate::models::UserHeader;
use crate::scores::LazyBuffer;
use crate::{
discord::cache::save_beatmap,
discord::oppai_cache::BeatmapContent,
@ -89,11 +88,16 @@ impl Announcer {
return;
}
const MODES: [Mode; 4] = [Mode::Std, Mode::Taiko, Mode::Catch, Mode::Mania];
let last_update = user
.modes
.iter()
.map(|(k, v)| (*k, v.last_update))
.collect::<HashMap<_, _>>();
let now = chrono::Utc::now();
let broadcast_to = Arc::new(broadcast_to);
let mut to_announce = Vec::<Pin<Box<dyn Future<Output = ()> + Send>>>::new();
let mut to_announce = Vec::<CollectedScore>::new();
for mode in MODES {
let (u, top, events) = match self.fetch_user_data(env, now, &user, mode).await {
let (u, top) = match self.fetch_user_data(env, &user, mode).await {
Ok(v) => v,
Err(err) => {
eprintln!(
@ -130,41 +134,56 @@ impl Announcer {
let last = user.modes.insert(mode, stats);
// broadcast
let mention = user.user_id;
let broadcast_to = broadcast_to.clone();
let ctx = ctx.clone();
let env = env.clone();
if let Some(last) = last {
to_announce.push(Box::pin(async move {
let top = top
.into_iter()
to_announce.extend(
top.into_iter()
.enumerate()
.filter(|(_, s)| Self::is_announceable_date(s.date, last.last_update, now))
.map(|(rank, score)| {
CollectedScore::from_top_score(&u, score, mode, rank as u8 + 1)
});
let recents = events
.into_iter()
.map(|e| CollectedScore::from_event(&env.client, &u, e))
.collect::<FuturesUnordered<_>>()
.filter_map(|v| future::ready(v.pls_ok()))
.collect::<Vec<_>>()
.await
.into_iter();
CollectedScore::merge(top.chain(recents))
.map(|v| v.send_message(&ctx, &env, mention, &broadcast_to))
.collect::<FuturesUnordered<_>>()
.filter_map(|v| future::ready(v.pls_ok().map(|_| ())))
.collect::<()>()
.await
}));
}),
);
}
}
if let Some(recents) = env
.client
.user_events(UserID::ID(user.id))
.and_then(|v| v.get_all())
.await
.pls_ok()
{
if let Some(header) = env.client.user_header(user.id).await.pls_ok().flatten() {
let recents = recents
.into_iter()
.filter_map(|v| v.to_event_rank())
.filter(|s| Self::is_worth_announcing(s))
.filter(|s| {
let lu = last_update.get(&s.mode).cloned();
let f = Self::is_announceable_date(s.date, lu, now);
f
})
.map(|e| CollectedScore::from_event(&env.client, header.clone(), e))
.collect::<FuturesUnordered<_>>()
.filter_map(|v| future::ready(v.pls_ok()))
.collect::<Vec<_>>()
.await;
to_announce =
CollectedScore::merge(to_announce.into_iter().chain(recents)).collect();
}
}
user.failures = 0;
let user_id = user.user_id;
if let Some(true) = env.saved_users.save(user).await.pls_ok() {
to_announce.into_iter().for_each(|v| {
spawn_future(v);
let env = env.clone();
let ctx = ctx.clone();
let broadcast_to = broadcast_to.clone();
spawn_future(async move {
for v in to_announce.into_iter() {
v.send_message(&ctx, &env, user_id, &broadcast_to)
.await
.pls_ok();
}
});
} else {
eprintln!("[osu] Skipping user {} due to raced update", user_id)
@ -194,48 +213,31 @@ impl Announcer {
async fn fetch_user_data(
&self,
env: &OsuEnv,
now: chrono::DateTime<chrono::Utc>,
osu_user: &OsuUser,
mode: Mode,
) -> Result<(User, Vec<Score>, Vec<UserEventRank>), Error> {
let stats = osu_user.modes.get(&mode).cloned();
let last_update = stats.as_ref().map(|v| v.last_update);
) -> Result<(User, Vec<Score>), Error> {
let user_id = UserID::ID(osu_user.id);
let user = {
let days_since_last_update = stats
.as_ref()
.map(|v| (now - v.last_update).num_days() + 1)
.unwrap_or(30);
env.client.user(&user_id, move |f| {
f.mode(mode)
.event_days(days_since_last_update.min(31) as u8)
})
};
let user = env.client.user(&user_id, move |f| f.mode(mode));
let top_scores = env
.client
.user_best(user_id.clone(), move |f| f.mode(mode))
.and_then(|v| v.get_all());
let (user, top_scores) = try_join!(user, top_scores)?;
let mut user = user.unwrap();
// if top scores exist, user would too
let events = std::mem::take(&mut user.events)
.into_iter()
.filter_map(|v| v.to_event_rank())
.filter(|s| s.mode == mode && Self::is_worth_announcing(s))
.filter(|s| Self::is_announceable_date(s.date, last_update, now))
.collect::<Vec<_>>();
Ok((user, top_scores, events))
let Some(user) = user else {
return Err(error!("user not found"));
};
Ok((user, top_scores))
}
}
struct CollectedScore<'a> {
pub user: &'a User,
struct CollectedScore {
pub user: UserHeader,
pub score: Score,
pub mode: Mode,
pub kind: ScoreType,
}
impl<'a> CollectedScore<'a> {
impl CollectedScore {
fn merge(scores: impl IntoIterator<Item = Self>) -> impl Iterator<Item = Self> {
let mut mp = std::collections::HashMap::<u64, Self>::new();
scores
@ -251,9 +253,9 @@ impl<'a> CollectedScore<'a> {
mp.into_values()
}
fn from_top_score(user: &'a User, score: Score, mode: Mode, rank: u8) -> Self {
fn from_top_score(user: impl Into<UserHeader>, score: Score, mode: Mode, rank: u8) -> Self {
Self {
user,
user: user.into(),
score,
mode,
kind: ScoreType::top(rank),
@ -262,12 +264,14 @@ impl<'a> CollectedScore<'a> {
async fn from_event(
osu: &Osu,
user: &'a User,
user: impl Into<UserHeader>,
event: UserEventRank,
) -> Result<CollectedScore<'a>> {
) -> Result<CollectedScore> {
let user = user.into();
let user_id = user.id;
let mut scores = osu
.scores(event.beatmap_id, |f| {
f.user(UserID::ID(user.id)).mode(event.mode)
f.user(UserID::ID(user_id)).mode(event.mode)
})
.await?;
let score = match scores
@ -291,7 +295,7 @@ impl<'a> CollectedScore<'a> {
}
}
impl CollectedScore<'_> {
impl CollectedScore {
async fn send_message(
self,
ctx: impl CacheHttp,
@ -300,12 +304,13 @@ impl CollectedScore<'_> {
channels: &[ChannelId],
) -> Result<Vec<Message>> {
let (bm, content) = self.get_beatmap(env).await?;
channels
Ok(channels
.iter()
.map(|c| self.send_message_to(mention, *c, &ctx, env, &bm, &content))
.collect::<stream::FuturesUnordered<_>>()
.try_collect()
.await
.filter_map(|v| future::ready(v.pls_ok()))
.collect::<Vec<_>>()
.await)
}
async fn get_beatmap(&self, env: &OsuEnv) -> Result<(BeatmapWithMode, BeatmapContent)> {
@ -347,7 +352,7 @@ impl CollectedScore<'_> {
CreateMessage::new()
.content(self.kind.announcement_msg(self.mode, &member))
.embed({
let b = score_embed(&self.score, bm, content, self.user);
let b = score_embed(&self.score, bm, content, self.user.clone());
let b = if let Some(rank) = self.kind.top_record {
b.top_record(rank)
} else {

View file

@ -254,7 +254,7 @@ pub async fn forcesave<U: HasOsuEnv>(
async fn handle_listing<U: HasOsuEnv>(
ctx: CmdContext<'_, U>,
mut plays: impl Scores,
mut plays: impl LazyBuffer<Score>,
listing_args: ListingArgs,
transform: impl for<'a> Fn(u8, ScoreEmbedBuilder<'a>) -> ScoreEmbedBuilder<'a>,
listing_kind: &'static str,

View file

@ -7,7 +7,7 @@ mod scores {
use youmubot_prelude::*;
use crate::scores::Scores;
use crate::{models::Score, scores::LazyBuffer};
#[derive(Debug, Clone, Copy, PartialEq, Eq, ChoiceParameter)]
/// The style for the scores list to be displayed.
@ -41,7 +41,7 @@ mod scores {
impl ScoreListStyle {
pub async fn display_scores(
self,
scores: impl Scores,
scores: impl LazyBuffer<Score>,
ctx: &Context,
guild_id: Option<GuildId>,
m: impl CanEdit,
@ -62,10 +62,11 @@ mod scores {
use crate::discord::interaction::score_components;
use crate::discord::{cache::save_beatmap, BeatmapWithMode, OsuEnv};
use crate::scores::Scores;
use crate::models::Score;
use crate::scores::LazyBuffer;
pub async fn display_scores_grid(
scores: impl Scores,
scores: impl LazyBuffer<Score>,
ctx: &Context,
guild_id: Option<GuildId>,
mut on: impl CanEdit,
@ -93,14 +94,14 @@ mod scores {
Ok(())
}
pub struct Paginate<T: Scores> {
pub struct Paginate<T: LazyBuffer<Score>> {
env: OsuEnv,
scores: T,
guild_id: Option<GuildId>,
channel_id: serenity::all::ChannelId,
}
impl<T: Scores> Paginate<T> {
impl<T: LazyBuffer<Score>> Paginate<T> {
fn pages_fake(&self) -> usize {
let size = self.scores.length_fetched();
size.count() + if size.is_total() { 0 } else { 1 }
@ -108,7 +109,7 @@ mod scores {
}
#[async_trait]
impl<T: Scores> pagination::Paginate for Paginate<T> {
impl<T: LazyBuffer<Score>> pagination::Paginate for Paginate<T> {
async fn render(
&mut self,
page: u8,
@ -177,10 +178,11 @@ mod scores {
use crate::discord::oppai_cache::Stats;
use crate::discord::{time_before_now, Beatmap, BeatmapInfo, OsuEnv};
use crate::scores::Scores;
use crate::models::Score;
use crate::scores::LazyBuffer;
pub async fn display_scores_as_file(
scores: impl Scores,
scores: impl LazyBuffer<Score>,
ctx: &Context,
mut on: impl CanEdit,
) -> Result<()> {
@ -209,7 +211,7 @@ mod scores {
}
pub async fn display_scores_table(
scores: impl Scores,
scores: impl LazyBuffer<Score>,
ctx: &Context,
mut on: impl CanEdit,
) -> Result<()> {
@ -233,13 +235,13 @@ mod scores {
Ok(())
}
pub struct Paginate<T: Scores> {
pub struct Paginate<T: LazyBuffer<Score>> {
env: OsuEnv,
header: String,
scores: T,
}
impl<T: Scores> Paginate<T> {
impl<T: LazyBuffer<Score>> Paginate<T> {
async fn format_table(&mut self, start: usize, end: usize) -> Result<Option<String>> {
let scores = self.scores.get_range(start..end).await?;
if scores.is_empty() {
@ -364,7 +366,7 @@ mod scores {
const ITEMS_PER_PAGE: usize = 5;
#[async_trait]
impl<T: Scores> pagination::Paginate for Paginate<T> {
impl<T: LazyBuffer<Score>> pagination::Paginate for Paginate<T> {
async fn render(
&mut self,
page: u8,

View file

@ -36,7 +36,7 @@ use crate::{
models::{Beatmap, Mode, Mods, Score, User},
mods::UnparsedMods,
request::{BeatmapRequestKind, UserID},
scores::Scores,
scores::LazyBuffer,
OsuClient as OsuHttpClient, UserHeader, MAX_TOP_SCORES_INDEX,
};

View file

@ -30,7 +30,7 @@ use crate::{
},
models::Mode,
request::UserID,
scores::Scores,
scores::LazyBuffer,
Beatmap, Score,
};

View file

@ -5,6 +5,7 @@ use std::sync::Arc;
use futures_util::lock::Mutex;
use models::*;
use request::builders::*;
use request::scores::Fetch;
use request::*;
use youmubot_prelude::*;
@ -73,6 +74,13 @@ impl OsuClient {
Ok(u)
}
/// Fetch user events for an user.
pub async fn user_events(&self, user: UserID) -> Result<impl LazyBuffer<UserEvent>> {
request::UserEventRequest { user }
.make_buffer(self.clone())
.await
}
/// Fetch the user header.
pub async fn user_header(&self, id: u64) -> Result<Option<UserHeader>, Error> {
Ok({
@ -88,7 +96,7 @@ impl OsuClient {
&self,
beatmap_id: u64,
f: impl FnOnce(&mut ScoreRequestBuilder) -> &mut ScoreRequestBuilder,
) -> Result<impl Scores> {
) -> Result<impl LazyBuffer<Score>> {
let mut r = ScoreRequestBuilder::new(beatmap_id);
f(&mut r);
r.build(self).await
@ -98,7 +106,7 @@ impl OsuClient {
&self,
user: UserID,
f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder,
) -> Result<impl Scores> {
) -> Result<impl LazyBuffer<Score>> {
self.user_scores(UserScoreType::Best, user, f).await
}
@ -106,7 +114,7 @@ impl OsuClient {
&self,
user: UserID,
f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder,
) -> Result<impl Scores> {
) -> Result<impl LazyBuffer<Score>> {
self.user_scores(UserScoreType::Recent, user, f).await
}
@ -114,7 +122,7 @@ impl OsuClient {
&self,
user: UserID,
f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder,
) -> Result<impl Scores> {
) -> Result<impl LazyBuffer<Score>> {
self.user_scores(UserScoreType::Pin, user, f).await
}
@ -123,7 +131,7 @@ impl OsuClient {
u: UserScoreType,
user: UserID,
f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder,
) -> Result<impl Scores> {
) -> Result<impl LazyBuffer<Score>> {
let mut r = UserScoreRequestBuilder::new(u, user);
f(&mut r);
r.build(self.clone()).await

View file

@ -577,7 +577,6 @@ pub struct User {
pub count_s: u64,
pub count_sh: u64,
pub count_a: u64,
pub events: Vec<UserEvent>,
// Rankings
pub rank: u64,
pub country_rank: u64,

View file

@ -73,7 +73,6 @@ impl User {
pub(crate) fn from_rosu(
user: rosu::user::UserExtended,
stats: rosu::user::UserStatistics,
events: Vec<rosu::event::Event>,
) -> Self {
Self {
id: user.user_id as u64,
@ -93,7 +92,6 @@ impl User {
count_s: stats.grade_counts.s as u64,
count_sh: stats.grade_counts.sh as u64,
count_a: stats.grade_counts.a as u64,
events: events.into_iter().map(UserEvent::from).collect(),
rank: stats.global_rank.unwrap_or(0) as u64,
country_rank: stats.country_rank.unwrap_or(0) as u64,
level: stats.level.current as f64 + stats.level.progress as f64 / 100.0,

View file

@ -1,14 +1,15 @@
use core::fmt;
use std::sync::Arc;
use crate::models::{Mode, Mods};
use crate::models::{Mode, Mods, UserEvent};
use crate::OsuClient;
use rosu_v2::error::OsuError;
use scores::Fetch;
use youmubot_prelude::*;
pub(crate) mod scores;
pub use scores::Scores;
pub use scores::LazyBuffer;
#[derive(Clone, Debug)]
pub enum UserID {
@ -63,7 +64,7 @@ pub mod builders {
use crate::models::{self, Score};
use super::scores::{FetchScores, ScoresFetcher};
use super::scores::Fetch;
use super::OsuClient;
use super::*;
/// A builder for a Beatmap request.
@ -126,16 +127,11 @@ pub mod builders {
pub struct UserRequestBuilder {
user: UserID,
mode: Option<Mode>,
event_days: Option<u8>,
}
impl UserRequestBuilder {
pub(crate) fn new(user: UserID) -> Self {
UserRequestBuilder {
user,
mode: None,
event_days: None,
}
UserRequestBuilder { user, mode: None }
}
pub fn mode(&mut self, mode: impl Into<Option<Mode>>) -> &mut Self {
@ -143,11 +139,6 @@ pub mod builders {
self
}
pub fn event_days(&mut self, event_days: u8) -> &mut Self {
self.event_days = Some(event_days).filter(|&v| v <= 31).or(self.event_days);
self
}
pub(crate) async fn build(self, client: &OsuClient) -> Result<Option<models::User>> {
let mut r = client.rosu.user(self.user);
if let Some(mode) = self.mode {
@ -157,13 +148,8 @@ pub mod builders {
Some(v) => v,
None => return Ok(None),
};
let now = time::OffsetDateTime::now_utc()
- time::Duration::DAY * self.event_days.unwrap_or(31);
let mut events = handle_not_found(client.rosu.recent_activity(user.user_id).await)?
.unwrap_or(vec![]);
events.retain(|e: &rosu_v2::model::event::Event| (now <= e.created_at));
let stats = user.statistics.take().unwrap();
Ok(Some(models::User::from_rosu(user, stats, events)))
Ok(Some(models::User::from_rosu(user, stats)))
}
}
@ -238,7 +224,7 @@ pub mod builders {
Ok(scores.into_iter().map(|v| v.into()).collect())
}
pub(crate) async fn build(self, osu: &OsuClient) -> Result<impl Scores> {
pub(crate) async fn build(self, osu: &OsuClient) -> Result<impl LazyBuffer<Score>> {
// user queries always return all scores, so no need to consider offset.
// otherwise, it's not working anyway...
self.fetch_scores(osu, 0).await
@ -303,21 +289,18 @@ pub mod builders {
Ok(scores.into_iter().map(|v| v.into()).collect())
}
pub(crate) async fn build(self, client: OsuClient) -> Result<impl Scores> {
ScoresFetcher::new(client, self).await
pub(crate) async fn build(self, client: OsuClient) -> Result<impl LazyBuffer<Score>> {
self.make_buffer(client).await
}
}
impl FetchScores for UserScoreRequestBuilder {
async fn fetch_scores(
&self,
client: &crate::OsuClient,
offset: usize,
) -> Result<Vec<Score>> {
impl Fetch for UserScoreRequestBuilder {
type Item = Score;
async fn fetch(&self, client: &crate::OsuClient, offset: usize) -> Result<Vec<Score>> {
self.with_offset(client, offset).await
}
const SCORES_PER_PAGE: usize = Self::SCORES_PER_PAGE;
const ITEMS_PER_PAGE: usize = Self::SCORES_PER_PAGE;
}
}
@ -329,3 +312,27 @@ pub struct UserRecentRequest {
pub user: UserID,
pub mode: Option<Mode>,
}
pub struct UserEventRequest {
pub user: UserID,
}
impl Fetch for UserEventRequest {
type Item = UserEvent;
const ITEMS_PER_PAGE: usize = 50;
async fn fetch(&self, client: &crate::OsuClient, offset: usize) -> Result<Vec<Self::Item>> {
Ok(handle_not_found(
client
.rosu
.recent_activity(self.user.clone())
.limit(Self::ITEMS_PER_PAGE)
.offset(offset)
.await,
)?
.ok_or_else(|| error!("user not found"))?
.into_iter()
.map(Into::into)
.collect())
}
}

View file

@ -2,21 +2,33 @@ use std::{fmt::Display, future::Future, ops::Range};
use youmubot_prelude::*;
use crate::{models::Score, OsuClient};
use crate::OsuClient;
pub const MAX_SCORE_PER_PAGE: usize = 1000;
/// Fetch scores given an offset.
/// Implemented for score requests.
pub trait FetchScores: Send {
pub trait Fetch: Send {
type Item: Send + Sync + 'static;
/// Scores per page.
const SCORES_PER_PAGE: usize = MAX_SCORE_PER_PAGE;
/// Fetch scores given an offset.
fn fetch_scores(
const ITEMS_PER_PAGE: usize = MAX_SCORE_PER_PAGE;
/// Fetch items given an offset.
fn fetch(
&self,
client: &crate::OsuClient,
offset: usize,
) -> impl Future<Output = Result<Vec<Score>>> + Send;
) -> impl Future<Output = Result<Vec<Self::Item>>> + Send;
/// Create a buffer from the given Fetch implementation.
fn make_buffer(
self,
client: crate::OsuClient,
) -> impl Future<Output = Result<impl LazyBuffer<Self::Item>>> + Send
where
Self: Sized,
{
Fetcher::new(client, self)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -61,7 +73,7 @@ impl Size {
}
/// A scores stream.
pub trait Scores: Send {
pub trait LazyBuffer<T: Send + Sync + 'static>: Send {
/// Total length of the pages.
fn length_fetched(&self) -> Size;
@ -69,22 +81,22 @@ pub trait Scores: Send {
fn is_empty(&self) -> bool;
/// Get the index-th score.
fn get(&mut self, index: usize) -> impl Future<Output = Result<Option<&Score>>> + Send;
fn get(&mut self, index: usize) -> impl Future<Output = Result<Option<&T>>> + Send;
/// Get all scores.
fn get_all(self) -> impl Future<Output = Result<Vec<Score>>> + Send;
fn get_all(self) -> impl Future<Output = Result<Vec<T>>> + Send;
/// Get the scores between the given range.
fn get_range(&mut self, range: Range<usize>) -> impl Future<Output = Result<&[Score]>> + Send;
fn get_range(&mut self, range: Range<usize>) -> impl Future<Output = Result<&[T]>> + Send;
/// Find a score that matches the predicate `f`.
fn find<F: FnMut(&Score) -> bool + Send>(
fn find<F: FnMut(&T) -> bool + Send>(
&mut self,
f: F,
) -> impl Future<Output = Result<Option<&Score>>> + Send;
) -> impl Future<Output = Result<Option<&T>>> + Send;
}
impl Scores for Vec<Score> {
impl<T: Send + Sync + 'static> LazyBuffer<T> for Vec<T> {
fn length_fetched(&self) -> Size {
Size::Total(self.len())
}
@ -93,19 +105,19 @@ impl Scores for Vec<Score> {
self.is_empty()
}
fn get(&mut self, index: usize) -> impl Future<Output = Result<Option<&Score>>> + Send {
fn get(&mut self, index: usize) -> impl Future<Output = Result<Option<&T>>> + Send {
future::ok(self[..].get(index))
}
fn get_all(self) -> impl Future<Output = Result<Vec<Score>>> + Send {
fn get_all(self) -> impl Future<Output = Result<Vec<T>>> + Send {
future::ok(self)
}
fn get_range(&mut self, range: Range<usize>) -> impl Future<Output = Result<&[Score]>> + Send {
fn get_range(&mut self, range: Range<usize>) -> impl Future<Output = Result<&[T]>> + Send {
future::ok(&self[fit_range_to_len(self.len(), range)])
}
async fn find<F: FnMut(&Score) -> bool + Send>(&mut self, mut f: F) -> Result<Option<&Score>> {
async fn find<F: FnMut(&T) -> bool + Send>(&mut self, mut f: F) -> Result<Option<&T>> {
Ok(self.iter().find(|v| f(v)))
}
}
@ -116,20 +128,20 @@ fn fit_range_to_len(len: usize, range: Range<usize>) -> Range<usize> {
}
/// A scores stream with a fetcher.
pub(super) struct ScoresFetcher<T> {
struct Fetcher<T: Fetch> {
fetcher: T,
client: OsuClient,
scores: Vec<Score>,
items: Vec<T::Item>,
more_exists: bool,
}
impl<T: FetchScores> ScoresFetcher<T> {
impl<T: Fetch> Fetcher<T> {
/// Create a new Scores stream.
pub async fn new(client: OsuClient, fetcher: T) -> Result<Self> {
let mut s = Self {
fetcher,
client,
scores: Vec::new(),
items: Vec::new(),
more_exists: true,
};
// fetch the first page immediately.
@ -138,7 +150,7 @@ impl<T: FetchScores> ScoresFetcher<T> {
}
}
impl<T: FetchScores> Scores for ScoresFetcher<T> {
impl<T: Fetch> LazyBuffer<T::Item> for Fetcher<T> {
/// Total length of the pages.
fn length_fetched(&self) -> Size {
let count = self.len();
@ -150,62 +162,65 @@ impl<T: FetchScores> Scores for ScoresFetcher<T> {
}
fn is_empty(&self) -> bool {
self.scores.is_empty()
self.items.is_empty()
}
/// Get the index-th score.
async fn get(&mut self, index: usize) -> Result<Option<&Score>> {
async fn get(&mut self, index: usize) -> Result<Option<&T::Item>> {
Ok(self.get_range(index..(index + 1)).await?.first())
}
/// Get all scores.
async fn get_all(mut self) -> Result<Vec<Score>> {
async fn get_all(mut self) -> Result<Vec<T::Item>> {
let _ = self.get_range(0..usize::MAX).await?;
Ok(self.scores)
Ok(self.items)
}
/// Get the scores between the given range.
async fn get_range(&mut self, range: Range<usize>) -> Result<&[Score]> {
async fn get_range(&mut self, range: Range<usize>) -> Result<&[T::Item]> {
while self.len() < range.end {
if !self.fetch_next_page().await? {
break;
}
}
Ok(&self.scores[fit_range_to_len(self.len(), range)])
Ok(&self.items[fit_range_to_len(self.len(), range)])
}
async fn find<F: FnMut(&Score) -> bool + Send>(&mut self, mut f: F) -> Result<Option<&Score>> {
async fn find<F: FnMut(&T::Item) -> bool + Send>(
&mut self,
mut f: F,
) -> Result<Option<&T::Item>> {
let mut from = 0usize;
let index = loop {
if from == self.len() && !self.fetch_next_page().await? {
break None;
}
if f(&self.scores[from]) {
if f(&self.items[from]) {
break Some(from);
}
from += 1;
};
Ok(index.map(|v| &self.scores[v]))
Ok(index.map(|v| &self.items[v]))
}
}
impl<T: FetchScores> ScoresFetcher<T> {
impl<T: Fetch> Fetcher<T> {
async fn fetch_next_page(&mut self) -> Result<bool> {
if !self.more_exists {
return Ok(false);
}
let offset = self.len();
let scores = self.fetcher.fetch_scores(&self.client, offset).await?;
if scores.len() < T::SCORES_PER_PAGE {
let scores = self.fetcher.fetch(&self.client, offset).await?;
if scores.len() < T::ITEMS_PER_PAGE {
self.more_exists = false;
}
if scores.is_empty() {
return Ok(false);
}
self.scores.extend(scores);
self.items.extend(scores);
Ok(true)
}
fn len(&self) -> usize {
self.scores.len()
self.items.len()
}
}