Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

teal-fm/cadet

Repository files navigation

Rocketman

A modular(ish) jetstream consumer. Backed by Tungstenite.

Installation

[dependencies]
rocketman = "0.2.3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Usage

use rocketman::{
 connection::JetstreamConnection,
 handler,
 ingestion::LexiconIngestor,
 options::JetstreamOptions,
 types::event::{Event},
};
use serde_json::Value;
use std::{collections::HashMap, sync::{Arc, Mutex}};
use anyhow::Result;
use async_trait::async_trait;
#[tokio::main]
async fn main() {
 // init the builder
 let opts = JetstreamOptions::builder()
 // your EXACT nsids
 .wanted_collections(vec!["com.example.cool.nsid".to_string()])
 .build();
 // create the jetstream connector
 let jetstream = JetstreamConnection::new(opts);
 // create your ingestors
 let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
 ingestors.insert(
 // your EXACT nsid
 "com.example.cool.nsid".to_string(),
 Box::new(MyCoolIngestor),
 );
 // tracks the last message we've processed
 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
 // get channels
 let msg_rx = jetstream.get_msg_rx();
 let reconnect_tx = jetstream.get_reconnect_tx();
 // spawn a task to process messages from the queue.
 // this is a simple implementation, you can use a more complex one based on needs.
 let c_cursor = cursor.clone();
 tokio::spawn(async move {
 while let Ok(message) = msg_rx.recv_async().await {
 if let Err(e) =
 handler::handle_message(message, &ingestors, reconnect_tx.clone(), c_cursor.clone())
 .await
 {
 eprintln!("Error processing message: {}", e);
 };
 }
 });
 // connect to jetstream
 // retries internally, but may fail if there is an extreme error.
 if let Err(e) = jetstream.connect(cursor.clone()).await {
 eprintln!("Failed to connect to Jetstream: {}", e);
 std::process::exit(1);
 }
}
pub struct MyCoolIngestor;
/// A cool ingestor implementation. Will just print the message. Does not do verification.
#[async_trait]
impl LexiconIngestor for MyCoolIngestor {
 async fn ingest(&self, message: Event<Value>) -> Result<()> {
 println!("{:?}", message);
 // Process message for default lexicon.
 Ok(())
 }
}

About

jetstream listener and ingester for teal dot fm

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

Contributors 5

Languages

AltStyle によって変換されたページ (->オリジナル) /