Use streams and such to have 200 results

This commit is contained in:
Natsu Kagami 2025-05-08 12:29:40 -04:00
parent 8fdd576eb9
commit 0661199420
Signed by: nki
GPG key ID: 55A032EB38B49ADB
8 changed files with 150 additions and 86 deletions

1
.envrc
View file

@ -1 +1,2 @@
dotenv
use flake use flake

2
Cargo.lock generated
View file

@ -746,6 +746,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
"futures-executor",
"futures-io", "futures-io",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
@ -3905,6 +3906,7 @@ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
"chrono", "chrono",
"dashmap 5.5.3", "dashmap 5.5.3",
"futures",
"futures-util", "futures-util",
"lazy_static", "lazy_static",
"poise", "poise",

View file

@ -26,7 +26,8 @@ serenity = "0.12"
poise = { git = "https://github.com/serenity-rs/poise", branch = "current" } poise = { git = "https://github.com/serenity-rs/poise", branch = "current" }
zip = "0.6.2" zip = "0.6.2"
rand = "0.8" rand = "0.8"
futures-util = "0.3.30" futures = "0.3"
futures-util = "0.3"
thiserror = "2" thiserror = "2"
youmubot-db = { path = "../youmubot-db" } youmubot-db = { path = "../youmubot-db" }

View file

@ -212,7 +212,8 @@ impl Announcer {
}; };
let top_scores = env let top_scores = env
.client .client
.user_best(user_id.clone(), |f| f.mode(mode).limit(100)); .user_best(user_id.clone(), |f| f.mode(mode))
.try_collect::<Vec<_>>();
let (user, top_scores) = try_join!(user, top_scores)?; let (user, top_scores) = try_join!(user, top_scores)?;
let mut user = user.unwrap(); let mut user = user.unwrap();
// if top scores exist, user would too // if top scores exist, user would too

View file

@ -4,6 +4,7 @@ use super::*;
use cache::save_beatmap; use cache::save_beatmap;
use display::display_beatmapset; use display::display_beatmapset;
use embeds::ScoreEmbedBuilder; use embeds::ScoreEmbedBuilder;
use futures::TryStream;
use link_parser::EmbedType; use link_parser::EmbedType;
use poise::{ChoiceParameter, CreateReply}; use poise::{ChoiceParameter, CreateReply};
use serenity::all::{CreateAttachment, User}; use serenity::all::{CreateAttachment, User};
@ -40,7 +41,7 @@ async fn top<U: HasOsuEnv>(
ctx: CmdContext<'_, U>, ctx: CmdContext<'_, U>,
#[description = "Index of the score"] #[description = "Index of the score"]
#[min = 1] #[min = 1]
#[max = 100] #[max = 200] // SCORE_COUNT_LIMIT
index: Option<u8>, index: Option<u8>,
#[description = "Score listing style"] style: Option<ScoreListStyle>, #[description = "Score listing style"] style: Option<ScoreListStyle>,
#[description = "Game mode"] mode: Option<Mode>, #[description = "Game mode"] mode: Option<Mode>,
@ -61,11 +62,8 @@ async fn top<U: HasOsuEnv>(
ctx.defer().await?; ctx.defer().await?;
let osu_client = &env.client; let osu_client = &env.client;
let mut plays = osu_client let mode = args.mode;
.user_best(UserID::ID(args.user.id), |f| f.mode(args.mode).limit(100)) let plays = osu_client.user_best(UserID::ID(args.user.id), |f| f.mode(mode));
.await?;
plays.sort_unstable_by(|a, b| b.pp.partial_cmp(&a.pp).unwrap());
handle_listing(ctx, plays, args, |nth, b| b.top_record(nth), "top").await handle_listing(ctx, plays, args, |nth, b| b.top_record(nth), "top").await
} }
@ -135,11 +133,10 @@ async fn recent<U: HasOsuEnv>(
ctx.defer().await?; ctx.defer().await?;
let osu_client = &env.client; let osu_client = &env.client;
let plays = osu_client let mode = args.mode;
.user_recent(UserID::ID(args.user.id), |f| { let plays = osu_client.user_recent(UserID::ID(args.user.id), |f| {
f.mode(args.mode).include_fails(include_fails).limit(50) f.mode(mode).include_fails(include_fails).limit(50)
}) });
.await?;
handle_listing(ctx, plays, args, |_, b| b, "recent").await handle_listing(ctx, plays, args, |_, b| b, "recent").await
} }
@ -168,9 +165,8 @@ async fn pinned<U: HasOsuEnv>(
ctx.defer().await?; ctx.defer().await?;
let osu_client = &env.client; let osu_client = &env.client;
let plays = osu_client let mode = args.mode;
.user_pins(UserID::ID(args.user.id), |f| f.mode(args.mode).limit(50)) let plays = osu_client.user_pins(UserID::ID(args.user.id), |f| f.mode(mode));
.await?;
handle_listing(ctx, plays, args, |_, b| b, "pinned").await handle_listing(ctx, plays, args, |_, b| b, "pinned").await
} }
@ -254,7 +250,7 @@ pub async fn forcesave<U: HasOsuEnv>(
async fn handle_listing<U: HasOsuEnv>( async fn handle_listing<U: HasOsuEnv>(
ctx: CmdContext<'_, U>, ctx: CmdContext<'_, U>,
plays: Vec<Score>, plays: impl TryStream<Ok = Score, Error = Error>,
listing_args: ListingArgs, listing_args: ListingArgs,
transform: impl for<'a> Fn(u8, ScoreEmbedBuilder<'a>) -> ScoreEmbedBuilder<'a>, transform: impl for<'a> Fn(u8, ScoreEmbedBuilder<'a>) -> ScoreEmbedBuilder<'a>,
listing_kind: &'static str, listing_kind: &'static str,
@ -269,8 +265,14 @@ async fn handle_listing<U: HasOsuEnv>(
match nth { match nth {
Nth::Nth(nth) => { Nth::Nth(nth) => {
let Some(play) = plays.get(nth as usize) else { let play = std::pin::pin!(plays.into_stream())
Err(Error::msg("no such play"))? .skip(nth as usize)
.next()
.await;
let play = if let Some(play) = play {
play?
} else {
return Err(Error::msg("no such play"))?;
}; };
let beatmap = env.beatmaps.get_beatmap(play.beatmap_id, mode).await?; let beatmap = env.beatmaps.get_beatmap(play.beatmap_id, mode).await?;
@ -311,7 +313,7 @@ async fn handle_listing<U: HasOsuEnv>(
.await?; .await?;
style style
.display_scores( .display_scores(
plays, plays.try_collect::<Vec<_>>().await?,
ctx.clone().serenity_context(), ctx.clone().serenity_context(),
ctx.guild_id(), ctx.guild_id(),
(reply, ctx), (reply, ctx),

View file

@ -35,7 +35,7 @@ use crate::{
}, },
models::{Beatmap, Mode, Mods, Score, User}, models::{Beatmap, Mode, Mods, Score, User},
mods::UnparsedMods, mods::UnparsedMods,
request::{BeatmapRequestKind, UserID}, request::{BeatmapRequestKind, UserID, SCORE_COUNT_LIMIT},
OsuClient as OsuHttpClient, UserHeader, OsuClient as OsuHttpClient, UserHeader,
}; };
@ -304,6 +304,7 @@ pub(crate) async fn find_save_requirements(
] { ] {
let scores = client let scores = client
.user_best(UserID::ID(u.id), |f| f.mode(*mode)) .user_best(UserID::ID(u.id), |f| f.mode(*mode))
.try_collect::<Vec<_>>()
.await?; .await?;
if let Some(v) = scores.into_iter().choose(&mut rand::thread_rng()) { if let Some(v) = scores.into_iter().choose(&mut rand::thread_rng()) {
return Ok(Some((v, *mode))); return Ok(Some((v, *mode)));
@ -350,12 +351,10 @@ pub(crate) async fn handle_save_respond(
) -> Result<()> { ) -> Result<()> {
let osu_client = &env.client; let osu_client = &env.client;
async fn check(client: &OsuHttpClient, u: &User, mode: Mode, map_id: u64) -> Result<bool> { async fn check(client: &OsuHttpClient, u: &User, mode: Mode, map_id: u64) -> Result<bool> {
Ok(client client
.user_recent(UserID::ID(u.id), |f| f.mode(mode).limit(1)) .user_recent(UserID::ID(u.id), |f| f.mode(mode).limit(1))
.await? .try_any(|s| future::ready(s.beatmap_id == map_id))
.into_iter() .await
.take(1)
.any(|s| s.beatmap_id == map_id))
} }
let msg_id = reply.get_message().await?.id; let msg_id = reply.get_message().await?.id;
let recv = InteractionCollector::create(&ctx, msg_id).await?; let recv = InteractionCollector::create(&ctx, msg_id).await?;
@ -501,7 +500,8 @@ impl UserExtras {
pub async fn from_user(env: &OsuEnv, user: &User, mode: Mode) -> Result<Self> { pub async fn from_user(env: &OsuEnv, user: &User, mode: Mode) -> Result<Self> {
let scores = env let scores = env
.client .client
.user_best(UserID::ID(user.id), |f| f.mode(mode).limit(100)) .user_best(UserID::ID(user.id), |f| f.mode(mode))
.try_collect::<Vec<_>>()
.await .await
.pls_ok() .pls_ok()
.unwrap_or_else(std::vec::Vec::new); .unwrap_or_else(std::vec::Vec::new);
@ -589,7 +589,7 @@ impl ListingArgs {
sender: serenity::all::UserId, sender: serenity::all::UserId,
) -> Result<Self> { ) -> Result<Self> {
let nth = index let nth = index
.filter(|&v| 1 <= v && v <= 100) .filter(|&v| 1 <= v && v <= SCORE_COUNT_LIMIT as u8)
.map(|v| v - 1) .map(|v| v - 1)
.map(Nth::Nth) .map(Nth::Nth)
.unwrap_or_default(); .unwrap_or_default();
@ -678,9 +678,7 @@ pub async fn recent(ctx: &Context, msg: &Message, mut args: Args) -> CommandResu
} = ListingArgs::parse(&env, msg, &mut args, ScoreListStyle::Table).await?; } = ListingArgs::parse(&env, msg, &mut args, ScoreListStyle::Table).await?;
let osu_client = &env.client; let osu_client = &env.client;
let plays = osu_client let plays = osu_client.user_recent(UserID::ID(user.id), |f| f.mode(mode));
.user_recent(UserID::ID(user.id), |f| f.mode(mode).limit(50))
.await?;
match nth { match nth {
Nth::All => { Nth::All => {
let reply = msg let reply = msg
@ -690,18 +688,24 @@ pub async fn recent(ctx: &Context, msg: &Message, mut args: Args) -> CommandResu
) )
.await?; .await?;
style style
.display_scores(plays, ctx, reply.guild_id, (reply, ctx)) .display_scores(
plays.try_collect::<Vec<_>>().await?,
ctx,
reply.guild_id,
(reply, ctx),
)
.await?; .await?;
} }
Nth::Nth(nth) => { Nth::Nth(nth) => {
let Some(play) = plays.get(nth as usize) else { let plays = std::pin::pin!(plays.into_stream());
Err(Error::msg("No such play"))? let (play, rest) = plays.skip(nth as usize).into_future().await;
}; let play = play.ok_or(Error::msg("No such play"))??;
let attempts = plays let attempts = rest
.iter() .try_take_while(|p| {
.skip(nth as usize) future::ok(p.beatmap_id == play.beatmap_id && p.mods == play.mods)
.take_while(|p| p.beatmap_id == play.beatmap_id && p.mods == play.mods) })
.count(); .count()
.await;
let beatmap = env.beatmaps.get_beatmap(play.beatmap_id, mode).await?; let beatmap = env.beatmaps.get_beatmap(play.beatmap_id, mode).await?;
let content = env.oppai.get_beatmap(beatmap.beatmap_id).await?; let content = env.oppai.get_beatmap(beatmap.beatmap_id).await?;
let beatmap_mode = BeatmapWithMode(beatmap, Some(mode)); let beatmap_mode = BeatmapWithMode(beatmap, Some(mode));
@ -716,7 +720,7 @@ pub async fn recent(ctx: &Context, msg: &Message, mut args: Args) -> CommandResu
user.mention() user.mention()
)) ))
.embed( .embed(
score_embed(play, &beatmap_mode, &content, user) score_embed(&play, &beatmap_mode, &content, user)
.footer(format!("Attempt #{}", attempts)) .footer(format!("Attempt #{}", attempts))
.build(), .build(),
) )
@ -751,9 +755,7 @@ pub async fn pins(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult
let osu_client = &env.client; let osu_client = &env.client;
let plays = osu_client let plays = osu_client.user_pins(UserID::ID(user.id), |f| f.mode(mode));
.user_pins(UserID::ID(user.id), |f| f.mode(mode).limit(50))
.await?;
match nth { match nth {
Nth::All => { Nth::All => {
let reply = msg let reply = msg
@ -763,13 +765,20 @@ pub async fn pins(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult
) )
.await?; .await?;
style style
.display_scores(plays, ctx, reply.guild_id, (reply, ctx)) .display_scores(
plays.try_collect::<Vec<_>>().await?,
ctx,
reply.guild_id,
(reply, ctx),
)
.await?; .await?;
} }
Nth::Nth(nth) => { Nth::Nth(nth) => {
let Some(play) = plays.get(nth as usize) else { let play = std::pin::pin!(plays.into_stream())
Err(Error::msg("No such play"))? .skip(nth as usize)
}; .next()
.await
.ok_or(Error::msg("No such play"))??;
let beatmap = env.beatmaps.get_beatmap(play.beatmap_id, mode).await?; let beatmap = env.beatmaps.get_beatmap(play.beatmap_id, mode).await?;
let content = env.oppai.get_beatmap(beatmap.beatmap_id).await?; let content = env.oppai.get_beatmap(beatmap.beatmap_id).await?;
let beatmap_mode = BeatmapWithMode(beatmap, Some(mode)); let beatmap_mode = BeatmapWithMode(beatmap, Some(mode));
@ -779,7 +788,7 @@ pub async fn pins(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult
&ctx, &ctx,
CreateMessage::new() CreateMessage::new()
.content("Here is the play that you requested".to_string()) .content("Here is the play that you requested".to_string())
.embed(score_embed(play, &beatmap_mode, &content, user).build()) .embed(score_embed(&play, &beatmap_mode, &content, user).build())
.components(vec![score_components(msg.guild_id)]) .components(vec![score_components(msg.guild_id)])
.reference_message(msg), .reference_message(msg),
) )
@ -1086,18 +1095,15 @@ pub async fn top(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult
} = ListingArgs::parse(&env, msg, &mut args, ScoreListStyle::default()).await?; } = ListingArgs::parse(&env, msg, &mut args, ScoreListStyle::default()).await?;
let osu_client = &env.client; let osu_client = &env.client;
let mut plays = osu_client let plays = osu_client.user_best(UserID::ID(user.id), |f| f.mode(mode));
.user_best(UserID::ID(user.id), |f| f.mode(mode).limit(100))
.await?;
plays.sort_unstable_by(|a, b| b.pp.partial_cmp(&a.pp).unwrap());
let plays = plays;
match nth { match nth {
Nth::Nth(nth) => { Nth::Nth(nth) => {
let Some(play) = plays.get(nth as usize) else { let play = std::pin::pin!(plays.into_stream())
Err(Error::msg("no such play"))? .skip(nth as usize)
}; .next()
.await
.ok_or(Error::msg("No such play"))??;
let beatmap = env.beatmaps.get_beatmap(play.beatmap_id, mode).await?; let beatmap = env.beatmaps.get_beatmap(play.beatmap_id, mode).await?;
let content = env.oppai.get_beatmap(beatmap.beatmap_id).await?; let content = env.oppai.get_beatmap(beatmap.beatmap_id).await?;
@ -1131,7 +1137,12 @@ pub async fn top(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult
) )
.await?; .await?;
style style
.display_scores(plays, ctx, msg.guild_id, (reply, ctx)) .display_scores(
plays.try_collect::<Vec<_>>().await?,
ctx,
msg.guild_id,
(reply, ctx),
)
.await?; .await?;
} }
} }
@ -1193,11 +1204,6 @@ fn scales() -> &'static [f64] {
SCALES.get_or_init(|| { SCALES.get_or_init(|| {
(0..256) (0..256)
.map(|r| SCALING_FACTOR.powi(r)) .map(|r| SCALING_FACTOR.powi(r))
// .scan(1.0, |a, _| {
// let old = *a;
// *a *= SCALING_FACTOR;
// Some(old)
// })
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_boxed_slice() .into_boxed_slice()
}) })
@ -1244,6 +1250,7 @@ pub(in crate::discord) async fn calculate_weighted_map_age(
.collect::<FuturesOrdered<_>>() .collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>() .try_collect::<Vec<_>>()
.await?; .await?;
println!("Calculating score from {} scores", scores.len());
Ok((scores Ok((scores
.iter() .iter()
.zip(scales().iter()) .zip(scales().iter())

View file

@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use futures::TryStream;
use futures_util::lock::Mutex; use futures_util::lock::Mutex;
use models::*; use models::*;
use request::builders::*; use request::builders::*;
@ -92,39 +93,39 @@ impl OsuClient {
r.build(self).await r.build(self).await
} }
pub async fn user_best( pub fn user_best(
&self, &self,
user: UserID, user: UserID,
f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder, f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder,
) -> Result<Vec<Score>, Error> { ) -> impl TryStream<Ok = Score, Error = Error> {
self.user_scores(UserScoreType::Best, user, f).await self.user_scores(UserScoreType::Best, user, f)
} }
pub async fn user_recent( pub fn user_recent(
&self, &self,
user: UserID, user: UserID,
f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder, f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder,
) -> Result<Vec<Score>, Error> { ) -> impl TryStream<Ok = Score, Error = Error> {
self.user_scores(UserScoreType::Recent, user, f).await self.user_scores(UserScoreType::Recent, user, f)
} }
pub async fn user_pins( pub fn user_pins(
&self, &self,
user: UserID, user: UserID,
f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder, f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder,
) -> Result<Vec<Score>, Error> { ) -> impl TryStream<Ok = Score, Error = Error> {
self.user_scores(UserScoreType::Pin, user, f).await self.user_scores(UserScoreType::Pin, user, f)
} }
async fn user_scores( fn user_scores(
&self, &self,
u: UserScoreType, u: UserScoreType,
user: UserID, user: UserID,
f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder, f: impl FnOnce(&mut UserScoreRequestBuilder) -> &mut UserScoreRequestBuilder,
) -> Result<Vec<Score>, Error> { ) -> impl TryStream<Ok = Score, Error = Error> {
let mut r = UserScoreRequestBuilder::new(u, user); let mut r = UserScoreRequestBuilder::new(u, user);
f(&mut r); f(&mut r);
r.build(self).await r.build(self.clone())
} }
pub async fn score(&self, score_id: u64) -> Result<Option<Score>, Error> { pub async fn score(&self, score_id: u64) -> Result<Option<Score>, Error> {

View file

@ -5,6 +5,9 @@ use crate::OsuClient;
use rosu_v2::error::OsuError; use rosu_v2::error::OsuError;
use youmubot_prelude::*; use youmubot_prelude::*;
/// Maximum number of scores returned by the osu! api.
pub const SCORE_COUNT_LIMIT: usize = 200;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum UserID { pub enum UserID {
Username(String), Username(String),
@ -54,6 +57,7 @@ fn handle_not_found<T>(v: Result<T, OsuError>) -> Result<Option<T>, OsuError> {
} }
pub mod builders { pub mod builders {
use futures_util::TryStream;
use rosu_v2::model::mods::GameModsIntermode; use rosu_v2::model::mods::GameModsIntermode;
use crate::models; use crate::models;
@ -196,7 +200,9 @@ pub mod builders {
} }
pub fn limit(&mut self, limit: u8) -> &mut Self { pub fn limit(&mut self, limit: u8) -> &mut Self {
self.limit = Some(limit).filter(|&v| v <= 100).or(self.limit); self.limit = Some(limit)
.filter(|&v| v <= SCORE_COUNT_LIMIT as u8)
.or(self.limit);
self self
} }
@ -237,17 +243,19 @@ pub mod builders {
} }
} }
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum UserScoreType { pub(crate) enum UserScoreType {
Recent, Recent,
Best, Best,
Pin, Pin,
} }
#[derive(Debug, Clone)]
pub struct UserScoreRequestBuilder { pub struct UserScoreRequestBuilder {
score_type: UserScoreType, score_type: UserScoreType,
user: UserID, user: UserID,
mode: Option<Mode>, mode: Option<Mode>,
limit: Option<u8>, limit: Option<usize>,
include_fails: bool, include_fails: bool,
} }
@ -267,8 +275,12 @@ pub mod builders {
self self
} }
pub fn limit(&mut self, limit: u8) -> &mut Self { pub fn limit(&mut self, limit: usize) -> &mut Self {
self.limit = Some(limit).filter(|&v| v <= 100).or(self.limit); self.limit = if limit > SCORE_COUNT_LIMIT {
self.limit
} else {
Some(limit)
};
self self
} }
@ -277,9 +289,30 @@ pub mod builders {
self self
} }
pub(crate) async fn build(self, client: &OsuClient) -> Result<Vec<models::Score>> { async fn with_offset(
self,
offset: Option<usize>,
client: OsuClient,
) -> Result<Option<(Vec<models::Score>, Option<usize>)>> {
const MAXIMUM_LIMIT: usize = 100;
let offset = if let Some(offset) = offset {
offset
} else {
return Ok(None);
};
let count = match self.limit {
Some(limit) => (limit - offset).min(MAXIMUM_LIMIT),
None => MAXIMUM_LIMIT,
};
if count == 0 {
return Ok(None);
}
let scores = handle_not_found({ let scores = handle_not_found({
let mut r = client.rosu.user_scores(self.user); let mut r = client
.rosu
.user_scores(self.user.clone())
.limit(count)
.offset(offset);
r = match self.score_type { r = match self.score_type {
UserScoreType::Recent => r.recent().include_fails(self.include_fails), UserScoreType::Recent => r.recent().include_fails(self.include_fails),
UserScoreType::Best => r.best(), UserScoreType::Best => r.best(),
@ -288,13 +321,29 @@ pub mod builders {
if let Some(mode) = self.mode { if let Some(mode) = self.mode {
r = r.mode(mode.into()); r = r.mode(mode.into());
} }
if let Some(limit) = self.limit {
r = r.limit(limit as usize);
}
r.await r.await
})? })?
.ok_or_else(|| error!("user not found"))?; .ok_or_else(|| error!("user not found"))?;
Ok(scores.into_iter().map(|v| v.into()).collect()) let count = scores.len();
Ok(Some((
scores.into_iter().map(|v| v.into()).collect(),
if count == MAXIMUM_LIMIT {
Some(offset + MAXIMUM_LIMIT)
} else {
None
},
)))
}
pub(crate) fn build(
self,
client: OsuClient,
) -> impl TryStream<Ok = models::Score, Error = Error> {
futures::stream::try_unfold(Some(0), move |off| {
self.clone().with_offset(off, client.clone())
})
.map_ok(|v| futures::stream::iter(v).map(|v| Ok(v) as Result<_>))
.try_flatten()
} }
} }
} }