I'm new to Rust and trying to build an async server. The server should receive UDP packets, process them (which will involve more network communication), and send the result back to the client (a tiny test client is sketched at the end of this post).
I had some trouble finding a solution for the clean shutdown, and I'm not convinced about the select: the whole main-loop logic is nested two extra levels just to make it cancellable. I wonder if there are cleaner solutions.
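One alternative I came across but have not actually tried is `CancellationToken::run_until_cancelled()` from tokio-util, which might flatten the receive loop to something like this (untested sketch of the `serve()` loop from the code below):

```rust
// Untested sketch: run_until_cancelled() resolves to None once the token
// fires, so the loop can break without wrapping its body in select!.
loop {
    let mut buf = [0u8; 512];
    let Some(resp) = self
        .cancel_token
        .run_until_cancelled(sock.recv_from(&mut buf))
        .await
    else {
        break; // token was cancelled
    };
    let Ok((len, src)) = resp else {
        log::error!("Failed to recv");
        continue;
    };
    // ...spawn the handler exactly as in serve() below...
}
```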
I also had to add a sleep at the end to make the last log statement appear; I could not find any other way to flush the pending log messages.
When spawning a task I copy the receive buffer so the async task gets an owned copy. It would probably be better if recv_from() stored the content directly into a buffer whose ownership is then transferred to the task, with the next loop iteration using a fresh buffer. But I could not figure out how that would look in Rust.
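Something like the following is what I was aiming for, though I am not sure it is the idiomatic way (untested sketch; cancellation handling omitted for brevity):

```rust
// Untested sketch: allocate an owned Vec each iteration, receive into it,
// shrink it to the received length, and move it into the spawned task; the
// next iteration starts with a fresh allocation, so no copy is needed.
loop {
    let mut buf = vec![0u8; 512];
    let Ok((len, src)) = sock.recv_from(&mut buf).await else {
        log::error!("Failed to recv");
        continue;
    };
    buf.truncate(len); // keep only the bytes actually received
    let resp_sock = sock.clone();
    self.tracker.spawn(async move {
        // `buf` is moved into the task, not copied
        Self::handle_request(src, &buf, resp_sock).await;
    });
}
```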
So I would be happy to receive some feedback on the code below. Thanks.
# Cargo.toml
[package]
name = "echo-server"
version = "0.1.0"
edition = "2024"

[dependencies]
log = { version = "0.4.27", features = ["kv_serde", "serde"] }
serde = "1.0.219"
structured-logger = "1.0.4"
tokio = { version = "1.47.1", features = ["io-std", "io-util", "macros", "net", "rt-multi-thread", "signal", "time"] }
tokio-util = { version = "0.7.16", features = ["rt"] }
// src/main.rs
use core::net::SocketAddr;
use serde::Serialize;
use std::env;
use std::error::Error;
use std::sync::Arc;
use structured_logger::{Builder, async_json::new_writer};
use tokio::net::UdpSocket;
use tokio::time::{Duration, sleep};
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    Builder::with_level("trace")
        .with_default_writer(new_writer(tokio::io::stdout()))
        .init();
    log::info!(
        app:serde = AppInfo { name: env!("CARGO_PKG_NAME"), version: env!("CARGO_PKG_VERSION") };
        "Starting"
    );

    let cancel_token = CancellationToken::new();
    let tracker = TaskTracker::new();
    let server = EchoServer::new(cancel_token.clone(), tracker.clone());

    tokio::join!(
        async {
            _ = tokio::signal::ctrl_c().await;
            cancel_token.cancel();
        },
        async {
            cancel_token.cancelled().await;
            tracker.close();
        },
        async {
            server.serve().await;
            log::info!("Server shut down!");
        },
        tracker.wait()
    );

    // Wait for async log messages to be flushed
    sleep(Duration::from_millis(10)).await;
    // Not working: log::logger().flush();
    Ok(())
}
#[derive(Serialize)]
struct AppInfo<'a> {
    name: &'a str,
    version: &'a str,
}

struct EchoServer {
    cancel_token: CancellationToken,
    tracker: TaskTracker,
}

impl EchoServer {
    pub fn new(cancel_token: CancellationToken, tracker: TaskTracker) -> Self {
        Self {
            cancel_token,
            tracker,
        }
    }
    async fn serve(&self) {
        log::info!("Listening on port 8888");
        let Ok(sock) = UdpSocket::bind("[::]:8888").await else {
            log::error!("Failed to bind [::]:8888");
            self.cancel_token.cancel();
            return;
        };
        let sock = Arc::new(sock);
        loop {
            let mut buf = [0u8; 512];
            tokio::select! {
                _ = self.cancel_token.cancelled() => break,
                resp = sock.recv_from(&mut buf) => {
                    // Main-loop logic nested two levels due to select
                    let Ok((len, src)) = resp else {
                        log::error!("Failed to recv");
                        continue;
                    };
                    let packet = buf[..len].to_vec();
                    let resp_sock = sock.clone();
                    self.tracker.spawn(async move {
                        Self::handle_request(src, &packet, resp_sock).await;
                    });
                }
            };
        }
    }

    async fn handle_request(src: SocketAddr, packet: &[u8], sock: Arc<UdpSocket>) {
        log::trace!("Handle Request {}", src);
        // Do some heavy work here.
        if let Err(e) = sock.send_to(packet, src).await {
            log::error!("Failed to send: {}", e);
        }
    }
}
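For completeness, a quick way to test it could be a tiny client like this (sketch, assuming the server runs locally on port 8888):

```rust
// Minimal UDP test client sketch: sends one datagram to the echo server on
// [::1]:8888 and prints whatever comes back.
use tokio::net::UdpSocket;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let sock = UdpSocket::bind("[::]:0").await?; // ephemeral local port
    sock.send_to(b"hello", "[::1]:8888").await?;
    let mut buf = [0u8; 512];
    let (len, _) = sock.recv_from(&mut buf).await?;
    println!("echoed: {}", String::from_utf8_lossy(&buf[..len]));
    Ok(())
}
```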