roux is a Rust library that wraps the Reddit API. Neither Reddit nor roux provides a stream-like interface for obtaining the latest submissions automatically, so that's what I've implemented here.
I'm learning Rust as a hobby (I'm a Python developer by trade) and this is my first "serious" Rust project. Please feel free to point out anything that sticks out to you. The following points are particularly interesting to me:
Am I using references and owned values appropriately (e.g. no unnecessary cloning)?
Is the interface provided by my lib.rs fitting for this use case (async streaming of items) and idiomatic?
As a Rust newbie, I find the types in the where clauses hard to understand. Would it be useful to introduce some aliases?
Is my approach to error handling OK?
lib.rs:
#![warn(missing_docs)]
/*!
Streaming API for `roux`
Reddit's API does not provide "firehose"-style streaming of new posts and
comments. Instead, the endpoint for retrieving the latest posts has to be
polled regularly. This crate automates that task and provides a stream
for a subreddit's posts (submissions).
See [`stream_subreddit_submissions`] for details.
# Logging
This module uses the logging infrastructure provided by the [`log`] crate.
*/
use futures::{Sink, SinkExt};
use log::{debug, warn};
use roux::subreddit::responses::SubmissionsData;
use roux::{util::RouxError, Subreddit};
use std::collections::HashSet;
use std::marker::Unpin;
use tokio::time::{sleep, Duration};
use tokio_retry::Retry;
/// Error that may happen when streaming submissions
#[derive(Debug)]
pub enum SubmissionStreamError<S>
where
S: Sink<SubmissionsData> + Unpin,
{
/// An issue with getting the data from Reddit
Roux(RouxError),
/// An issue with sending the data through the sink
Sink(S::Error),
}
/**
Stream new submissions in a subreddit
The subreddit is polled regularly for new submissions, and each previously
unseen submission is sent into the sink.
`sleep_time` controls the interval between calls to the Reddit API, and depends
on how much traffic the subreddit has. Each call fetches the 100 latest items
(the maximum number allowed by Reddit). A warning is logged if none of those
items has been seen in the previous call: this indicates a potential miss of new
content and suggests that a smaller `sleep_time` should be chosen.
`retry_strategy` controls how to deal with errors that occur while fetching
content from Reddit. See [`tokio_retry::strategy`].
*/
pub async fn stream_subreddit_submissions<S, R, I>(
subreddit: &Subreddit,
mut sink: S,
sleep_time: Duration,
retry_strategy: &R,
) -> Result<(), SubmissionStreamError<S>>
where
S: Sink<SubmissionsData> + Unpin,
R: IntoIterator<IntoIter = I, Item = Duration> + Clone,
I: Iterator<Item = Duration>,
{
// How many submissions to fetch per request
const LIMIT: u32 = 100;
let mut seen_ids: HashSet<String> = HashSet::new();
loop {
let latest_submissions =
Retry::spawn(retry_strategy.clone(), || subreddit.latest(LIMIT, None))
.await
.map_err(SubmissionStreamError::Roux)?
.data
.children
.into_iter()
.map(|thing| thing.data);
let mut latest_ids: HashSet<String> = HashSet::new();
let mut num_new = 0;
for submission in latest_submissions {
latest_ids.insert(submission.id.clone());
if !seen_ids.contains(&submission.id) {
num_new += 1;
sink.send(submission)
.await
.map_err(SubmissionStreamError::Sink)?
}
}
debug!(
"Got {} new submissions for r/{} (out of {})",
num_new, subreddit.name, LIMIT
);
if num_new == LIMIT && !seen_ids.is_empty() {
warn!(
"All received submissions for r/{} were new, try a shorter sleep_time",
subreddit.name
);
}
seen_ids = latest_ids;
sleep(sleep_time).await;
}
}
main.rs:
use futures::{channel::mpsc, Stream, StreamExt};
use roux::{subreddit::responses::SubmissionsData, Subreddit};
use tokio;
use tokio::time::Duration;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use subreddit_dumper;
async fn submission_reader<S>(stream: &mut S)
where
S: Stream<Item = SubmissionsData> + Unpin,
{
while let Some(submission) = stream.next().await {
println!(
"New submission in r/{} by {}",
submission.subreddit, submission.author
);
}
}
#[tokio::main]
async fn main() {
// Initialize logging
stderrlog::new()
.module(module_path!())
.verbosity(3)
.init()
.unwrap();
let subreddit = Subreddit::new("AskReddit");
let (mut submission_sender, mut submission_receiver) = mpsc::unbounded();
let retry_strategy = ExponentialBackoff::from_millis(100)
.map(jitter) // add jitter to delays
.take(3); // limit to 3 retries
let (submission_res, _) = tokio::join!(
subreddit_dumper::stream_subreddit_submissions(
&subreddit,
&mut submission_sender,
Duration::from_secs(60),
&retry_strategy,
),
submission_reader(&mut submission_receiver),
);
submission_res.unwrap();
}
Your code is good for the most part; however, there are some improvements that can be made:
The final line in your main function, submission_res.unwrap(), should be changed to handle the error and perhaps log it instead of panicking.
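A minimal sketch of that change (StreamError here is a stand-in type, not the crate's actual SubmissionStreamError, and the error output would go through the log crate in the real program):

```rust
// Stand-in for the crate's SubmissionStreamError, just for illustration.
#[derive(Debug)]
struct StreamError(String);

// Returns true on success, logs and returns false on failure,
// instead of panicking via unwrap().
fn report(result: Result<(), StreamError>) -> bool {
    match result {
        Ok(()) => true,
        Err(err) => {
            // In the real program this would be log::error! rather than eprintln!.
            eprintln!("submission stream stopped: {:?}", err);
            false
        }
    }
}

fn main() {
    assert!(report(Ok(())));
    assert!(!report(Err(StreamError("network".into()))));
}
```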
Personally, I would change the structure of the library. Instead of calling the async function stream_subreddit_submissions with all the different configuration parameters, I would create a struct called SubmissionsStream that has two methods:
pub fn new(
subreddit: Subreddit,
sink: S,
sleep_time: Duration,
retry_strategy: R,
) -> SubmissionsStream
For creating the SubmissionsStream with all the configuration.
pub async fn run(self) -> Result<(), SubmissionStreamError<S::Error>>
For running the stream.
I would also suggest pulling as much of the configuration creation as possible into the new() method. For example, you could change new() to create the mpsc channel internally and return the receiver, rather than passing the sender in. However, this would prevent the caller from choosing a specific mpsc channel, so I'll leave it as a suggestion rather than something you necessarily have to do.
SubmissionStreamError should implement the standard traits Display and Error.
The logic for the seen IDs can be optimised. When checking in the for loop whether an ID has been seen, you can break out of the loop as soon as you hit a seen ID, because all IDs after it will have been seen too (provided roux returns the latest posts ordered by time, which seems to be the case from testing it). This means you don't need two HashSets, and this:
let mut latest_ids: HashSet<String> = HashSet::new();
let mut num_new = 0;
for submission in latest_submissions {
latest_ids.insert(submission.id.clone());
if !seen_ids.contains(&submission.id) {
num_new += 1;
sink.send(submission)
.await
.map_err(SubmissionStreamError::Sink)?
}
}
can be simplified to:
let mut num_new = 0;
for submission in latest_submissions {
if !seen_ids.contains(&submission.id) {
seen_ids.insert(submission.id.clone());
num_new += 1;
sink.send(submission)
.await
.map_err(SubmissionStreamError::Sink)?
} else {
            // If you reach a seen submission, you
            // will have seen all other submissions after it.
break;
}
}
Final Code:
lib.rs
use futures::{Sink, SinkExt};
use log::{debug, warn};
use roux::subreddit::responses::SubmissionsData;
use roux::{util::RouxError, Subreddit};
use std::collections::HashSet;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use std::marker::Unpin;
use tokio::time::{sleep, Duration};
use tokio_retry::Retry;
/// Error that may happen when streaming submissions
#[derive(Debug)]
pub enum SubmissionStreamError<SinkErr> {
/// An issue with getting the data from Reddit
Roux(RouxError),
/// An issue with sending the data through the sink
Sink(SinkErr),
}
impl<S: Debug + Display> Display for SubmissionStreamError<S> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
SubmissionStreamError::Roux(roux_err) => {
write!(f, "{}", roux_err)
}
SubmissionStreamError::Sink(sink_err) => {
write!(f, "{}", sink_err)
}
}
}
}
impl<S: Debug + Display> Error for SubmissionStreamError<S> {}
pub struct SubmissionsStream<S, R> {
subreddit: Subreddit,
sink: S,
sleep_time: Duration,
retry_strategy: R,
}
impl<S, R, I> SubmissionsStream<S, R>
where
S: Sink<SubmissionsData> + Unpin,
R: IntoIterator<IntoIter = I, Item = Duration> + Clone,
I: Iterator<Item = Duration>,
{
pub fn new(
subreddit: Subreddit,
sink: S,
sleep_time: Duration,
retry_strategy: R,
) -> SubmissionsStream<S, R> {
SubmissionsStream {
subreddit,
sink,
sleep_time,
retry_strategy,
}
}
pub async fn run(self) -> Result<(), SubmissionStreamError<S::Error>> {
let SubmissionsStream {
subreddit,
mut sink,
sleep_time,
retry_strategy,
} = self;
const LIMIT: u32 = 100;
let mut seen_ids: HashSet<String> = HashSet::new();
loop {
let latest_submissions =
Retry::spawn(retry_strategy.clone(), || subreddit.latest(LIMIT, None))
.await
.map_err(SubmissionStreamError::Roux)?
.data
.children
.into_iter()
.map(|thing| thing.data);
let mut num_new = 0;
for submission in latest_submissions {
if !seen_ids.contains(&submission.id) {
seen_ids.insert(submission.id.clone());
num_new += 1;
sink.send(submission)
.await
.map_err(SubmissionStreamError::Sink)?
} else {
                    // If you reach a seen submission, you
                    // will have seen all other submissions after it.
break;
}
}
debug!(
"Got {} new submissions for r/{} (out of {})",
num_new, subreddit.name, LIMIT
);
if num_new == LIMIT && !seen_ids.is_empty() {
warn!(
"All received submissions for r/{} were new, try a shorter sleep_time",
subreddit.name
);
}
sleep(sleep_time).await;
}
}
}
main.rs
use futures::{channel::mpsc, Stream, StreamExt};
use log::debug;
use roux::{subreddit::responses::SubmissionsData, Subreddit};
use tokio::time::Duration;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
async fn submission_reader<S>(stream: &mut S)
where
S: Stream<Item = SubmissionsData> + Unpin,
{
while let Some(submission) = stream.next().await {
println!(
"New submission in r/{} by {}",
submission.subreddit, submission.author
);
}
}
#[tokio::main]
async fn main() {
// Initialize logging
stderrlog::new()
.module(module_path!())
.verbosity(3)
.init()
.unwrap();
let subreddit = Subreddit::new("AskReddit");
let (submission_sender, mut submission_receiver) = mpsc::unbounded();
let retry_strategy = ExponentialBackoff::from_millis(100)
.map(jitter) // add jitter to delays
.take(3); // limit to 3 retries
let submission_stream = subreddit_dumper::SubmissionsStream::new(
subreddit,
submission_sender,
Duration::from_secs(60),
retry_strategy,
);
let (submission_res, _) = tokio::join!(
submission_stream.run(),
submission_reader(&mut submission_receiver),
);
if let Err(stream_err) = submission_res {
debug!("Error: {}", stream_err);
}
}
Thanks! One thing regarding your suggested improvement of checking for seen IDs: in your version, seen_ids seems to grow indefinitely. That's something I'd like to avoid, since one of the intended use cases is long-running Reddit bots. I do like the idea of wrapping the functionality in a struct! – Florian Brucker, Jun 17, 2021 at 17:06