Summary: Is my server handling every error and corner case possible? If not, what did I miss?
Details
I'm trying to write the most basic socket server I can, so I can learn TCP and the pitfalls of parsing network data. I'm also worried about security (DDOS mitigation) and making sure the server never panics (unless it is an exceptional error, of course). I'm using mio to manage my event loop and sockets.
This is what I have so far:
My imports:
extern crate mio;
extern crate byteorder;
use mio::*;
use mio::tcp::*;
use mio::util::Slab;
use std::net::SocketAddr;
use std::collections::VecDeque;
An enum that represents the status of a client connection. Idle means the connection is not sending anything. WaitingHeader means the server received something but doesn't have enough data to parse the header (the header is 2 u8s, which represent a single u16). The header is the size of the message. Finally, ReceivingData has the now-parsed header and a Vec that is filled as the network sends bytes; once this vector is full, the contents are parsed and the client goes back to Idle.
#[derive(Debug, PartialEq, Eq)]
enum ClientStatus {
Idle,
WaitingHeader([(bool, u8); 2]),
ReceivingData(u16, Vec<u8>),
}
Structs to hold the state of each connection and the server itself:
#[derive(Debug)]
struct Client {
socket: TcpStream,
status: ClientStatus,
buffer: VecDeque<u8>,
}
impl Client {
fn new(socket: TcpStream) -> Client {
Client {
socket: socket,
status: ClientStatus::Idle,
buffer: VecDeque::new(),
}
}
}
#[derive(Debug)]
struct Server {
socket: TcpListener,
clients: Slab<Client>,
}
The server token and a function to convert [u8; 2]s into u16s:
const SERVER_TOKEN: Token = Token(0);
fn convert_header(header: [u8; 2]) -> u16 {
use std::io::Cursor;
use byteorder::{LittleEndian, ReadBytesExt};
let mut rdr = Cursor::new([header[0], header[1]]);
//The following call will only panic if the cursor doesn't have enough data. That should
// never happen
rdr.read_u16::<LittleEndian>().expect("Header parsing error.")
}
Then, a function that receives the data from the socket until it is empty (or something goes wrong) and manages the state of the client. It works like this:
- Read the socket until it is empty and push all the data to the client buffer
- Once that is done, based on the client status, it tries to gather the whole packet so it can be parsed.
- If the client is idle, wait for two bytes that represent the header and then wait for the rest of the packet.
- Once the packet is done and there is nothing else to consume in the buffer, the client goes back to Idle.
fn read(client: &mut Client) {
loop {
let mut buf = [0; 2048];
match client.socket.try_read(&mut buf) {
Err(e) => {
println!("Error while reading socket: {:?}", e);
//Should "client" be reset here, or are socket errors unrecoverable and the
// server will just close the connection?
return;
}
Ok(None) => break,
Ok(Some(len)) => {
println!("Received raw: {:?}", &buf[..len]);
client.buffer.reserve(len as usize);
for i in 0..len {
client.buffer.push_back(buf[i]);
}
}
}
}
while client.buffer.len() > 0 {
let buffer_len = client.buffer.len();
if client.status == ClientStatus::Idle {
client.status = ClientStatus::WaitingHeader([(false, 0), (false, 0)]);
}
if let ClientStatus::WaitingHeader(mut h) = client.status {
assert!(h[1].0 == false);
if buffer_len == 1 {
if h[0].0 == false {
h[0].0 = true;
//Length is known, this won't panic:
h[0].1 = client.buffer.pop_front().unwrap();
} else {
h[1].0 = true;
h[1].1 = client.buffer.pop_front().unwrap();
}
} else {
if h[0].0 == false {
h[0].0 = true;
h[0].1 = client.buffer.pop_front().unwrap();
}
h[1].0 = true;
h[1].1 = client.buffer.pop_front().unwrap();
}
if h[0].0 && h[1].0 {
let data_len = convert_header([h[0].1, h[1].1]);
client.status = ClientStatus::ReceivingData(data_len,
Vec::with_capacity(data_len as usize));
} else {
client.status = ClientStatus::WaitingHeader(h);
}
}
let mut parsing_done = false;
if let ClientStatus::ReceivingData(data_len, ref mut data) = client.status {
while let Some(byte) = client.buffer.pop_front() {
data.push(byte);
if data.len() >= (data_len as usize) {
// We have everything, parse it!
println!("Received some data! Size: {}\n\tValue: {}",
data_len,
//Data could be invalid utf8 and it would panic, but for testing
// purposes, unwrapping is enough, though from_utf8_lossy is an
// option.
String::from_utf8(data.clone()).unwrap());
parsing_done = true;
break;
}
}
}
if parsing_done {
client.status = ClientStatus::Idle;
}
}
}
And finally the Handler trait implementation of the server and the main function:
impl Handler for Server {
type Timeout = usize;
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<Server>, token: Token, events: EventSet) {
match token {
SERVER_TOKEN => {
let client_socket = match self.socket.accept() {
Err(e) => {
println!("Accept error: {}", e);
return;
}
Ok(None) => {
println!("Socket was not ready.");
return;
}
Ok(Some((sock, _addr))) => sock,
};
let new_token = match self.clients.insert(Client::new(client_socket)) {
Ok(t) => t,
Err(client) => {
println!("Connection could not be accepted, max number reached.");
let _ = client.socket.shutdown(Shutdown::Both);
return;
}
};
println!("New connection! Total connections: {}",
self.clients.count());
match event_loop.register(&self.clients[new_token].socket,
new_token,
EventSet::readable(),
PollOpt::edge() | PollOpt::oneshot()) {
Err(e) => {
println!("IO error when registering a client: {}", e);
self.clients.remove(token);
}
_ => (),
}
}
_ => {
let remove = if !events.is_hup() {
let client: &mut Client = match self.clients.get_mut(token) {
Some(c) => c,
None => {
println!("Received token that was not registered ({}). Ignoring.",
token.0);
return;
}
};
read(client);
match event_loop.reregister(&client.socket,
token,
EventSet::readable() | EventSet::hup(),
PollOpt::edge() | PollOpt::oneshot()) {
Err(e) => {
println!("IO error when reregistering a client: {}", e);
true
}
_ => false,
}
} else {
println!("Removing connection {}", token.0);
true
};
if remove {
self.clients.remove(token);
}
}
}
}
}
fn main() {
let address = "127.0.0.1:10000".parse::<SocketAddr>().unwrap();
let server_socket = TcpListener::bind(&address).unwrap();
let mut event_loop = EventLoop::new().unwrap();
let mut handler = Server {
clients: Slab::new_starting_at(Token(1), 10),
socket: server_socket,
};
event_loop.register(&handler.socket,
SERVER_TOKEN,
EventSet::readable(),
PollOpt::edge())
.unwrap();
event_loop.run(&mut handler).unwrap();
}
Some questions
- Should I not call unwrap when I know something can't fail? Is this a sign that I should have structured things differently? For instance, calling pop_front on the client.buffer once I know the length of the buffer is larger than 0.
- Is my error handling all right? I feel like I have a lot of matches trying to catch everything. Would it be better to put my code in another function that returns a Result and try! the errors?
- Can I protect the server from errors in the client? If someone starts sending a bunch of random bytes, the server won't parse them into something meaningful; is that OK, or can I somehow detect and prevent that?
1 Answer
Disclaimer: you didn't provide any instructions on how to run or test your code. I was able to compile and run the server, but wasn't sure how to properly send a message; my netcat-fu wasn't strong enough.
I also don't have extensive experience with MIO and haven't ever had to defend against a DoS, much less a DDoS. My best advice (which isn't saying much) is to always have limits. Don't allow buffers to grow forever, and don't allow an unlimited number of clients.
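To make the "always have limits" advice concrete, here is a minimal sketch of a per-client buffer cap. It is not part of the reviewed code: the names MAX_BUFFERED_BYTES, BufferFull and buffer_with_limit are hypothetical, and the 64 KiB figure is an arbitrary assumption. The number of clients is already bounded in the original code by the Slab capacity of 10.

```rust
/// Hypothetical cap on how many unparsed bytes one client may have buffered.
/// The value is an arbitrary assumption for illustration.
const MAX_BUFFERED_BYTES: usize = 64 * 1024;

/// Hypothetical error type: the client tried to exceed its buffer limit.
#[derive(Debug, PartialEq)]
struct BufferFull;

/// Appends `incoming` to `buffer` only if the result stays within the cap.
/// On failure the buffer is left unchanged so the caller can drop the client.
fn buffer_with_limit(buffer: &mut Vec<u8>, incoming: &[u8]) -> Result<(), BufferFull> {
    // saturating_add avoids overflow on absurdly large lengths.
    if buffer.len().saturating_add(incoming.len()) > MAX_BUFFERED_BYTES {
        return Err(BufferFull);
    }
    buffer.extend_from_slice(incoming);
    Ok(())
}
```

A caller would typically treat Err(BufferFull) the same way as a socket error and close the connection.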
The code was mostly understandable. My eyes glazed over a bit at the page full of h[1].0 and I was surprised to see VecDeque used.
- Having a (bool, u8) is quite confusing. Giving names to the members would have been better. This would have shown me that the code is implementing an array that can be partially full. That functionality exists in a crate called arrayvec.
- A Cursor isn't needed when parsing out the size field, as the code will always read from the beginning of the slice.
- I inlined the header parsing, as it was basically only one line after the other changes.
- There are a few spots that are less than efficient. Any time you process data byte-by-byte is likely to be inefficient, as are any extra copies of data.
  - Use extend instead of pushing bytes one-by-one. This can be combined with drain. Beware that this is still copying data.
  - Move buf up one level. I think if it is inside the loop it will be reinitialized each time.
  - Instead of reading from the socket into a temporary buffer at all, read directly into the final buffer.
- When printing the response, convert the byte array to a &str, avoiding the need to clone and make a String.
- Instead of unwrapping the response, print the Debug form of the result. This prevents a panic case.
- Try to place comments into expect calls, making the code more self-documenting.
- Don't include trailing punctuation in expect. A colon will be added automatically.
- I'd prefer to see more methods instead of free functions.
- Splitting up read would help with readability.
- Definitely use Result. It even helps when refactoring. For example, when I extracted drain_socket, I didn't originally handle the failure case correctly. I just returned from the drain_socket function.
- A match with one useful arm should be written as if let.
- Looping while the message state machine can continue allows processing multiple exchanges that have all been buffered up. This should help avoid a case where many small messages fill up the buffer.
- After moving the state machine to ClientStatus, I generalized the concept of deferring the change of state (the old parsing_done flag is an example). To do this, I used mem::replace to switch out the old state while processing it.
- Prefer expect over unwrap. This helps you and users understand what failed when it does.
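The mem::replace point is probably the least obvious one in the list, so here it is in isolation as a stripped-down sketch. The Status and Connection types and the fixed MESSAGE_LEN are hypothetical stand-ins for this illustration, not the types from the reviewed server.

```rust
use std::mem;

/// Hypothetical fixed message length, only for this illustration.
const MESSAGE_LEN: usize = 4;

#[derive(Debug, PartialEq)]
enum Status {
    Idle,
    Collecting(Vec<u8>),
}

impl Status {
    /// Takes the state by value, so the Vec inside is moved rather than cloned.
    fn next(self, chunk: &[u8]) -> Status {
        let mut bytes = match self {
            Status::Idle => Vec::new(),
            Status::Collecting(bytes) => bytes,
        };
        bytes.extend_from_slice(chunk);
        if bytes.len() >= MESSAGE_LEN {
            // A full message has arrived; a real server would parse it here.
            Status::Idle
        } else {
            Status::Collecting(bytes)
        }
    }
}

struct Connection {
    status: Status,
}

impl Connection {
    fn feed(&mut self, chunk: &[u8]) {
        // `self.status` cannot be moved out from behind `&mut self`, so swap in
        // a cheap placeholder, transform the old value, then store the result.
        let old = mem::replace(&mut self.status, Status::Idle);
        self.status = old.next(chunk);
    }
}
```

The placeholder only exists for the duration of the call, so it never becomes visible as a real state unless next panics.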
#[macro_use]
extern crate quick_error;
extern crate mio;
extern crate byteorder;
extern crate arrayvec;
use arrayvec::ArrayVec;
use byteorder::{LittleEndian, ReadBytesExt};
use mio::*;
use mio::tcp::*;
use mio::util::Slab;
use std::net::SocketAddr;
use std::{cmp, mem, io};
quick_error! {
#[derive(Debug)]
pub enum Error {
ReadFromSocket(err: io::Error) {
description("could not read from socket")
display("could not read from socket: {}", err)
cause(err)
}
}
}
#[derive(Debug, PartialEq, Eq)]
enum ClientStatus {
Idle,
WaitingHeader(ArrayVec<[u8; 2]>),
ReceivingData(u16, Vec<u8>),
}
#[derive(Debug)]
struct Client {
socket: TcpStream,
status: ClientStatus,
buffer: Vec<u8>,
}
impl Client {
fn new(socket: TcpStream) -> Client {
Client {
socket: socket,
status: ClientStatus::Idle,
buffer: Vec::new(),
}
}
}
#[derive(Debug)]
struct Server {
socket: TcpListener,
clients: Slab<Client>,
}
const SERVER_TOKEN: Token = Token(0);
impl ClientStatus {
fn next(self, buffer: &mut Vec<u8>) -> (Self, bool) {
use ClientStatus::*;
match self {
Idle => {
(WaitingHeader(Default::default()), true)
},
WaitingHeader(mut h) => {
let remaining = h.capacity() - h.len();
assert!(remaining > 0);
let available = cmp::min(remaining, buffer.len());
h.extend(buffer.drain(..available));
if h.len() == h.capacity() {
let data_len = h.as_ref().read_u16::<LittleEndian>().expect("Not enough data to parse header");
(ReceivingData(data_len, Vec::with_capacity(data_len as usize)), true)
} else {
(WaitingHeader(h), false)
}
},
ReceivingData(data_len, mut data) => {
let remaining = (data_len as usize) - data.len();
if buffer.len() >= remaining {
data.extend(buffer.drain(0..remaining));
// We have everything, parse it!
println!("Received some data! Size: {}\n\tValue: {:?}",
data.len(),
std::str::from_utf8(&data));
(Idle, true)
} else {
data.extend(buffer.drain(..));
(ReceivingData(data_len, data), false)
}
},
}
}
}
impl Client {
fn drain_socket(&mut self) -> Result<(), Error> {
loop {
let bytes = try!(self.socket.try_read_buf(&mut self.buffer).map_err(Error::ReadFromSocket));
// Should "self" be reset on failure, or are socket errors
// unrecoverable and the server will just close the
// connection?
match bytes {
None => return Ok(()),
Some(len) => {
let end = self.buffer.len() - len;
println!("Received raw: {:?}", &self.buffer[end..]);
}
}
}
}
fn read(&mut self) {
if let Err(_) = self.drain_socket() { return }
let mut changed = true;
while self.buffer.len() > 0 && changed {
let old_status = mem::replace(&mut self.status, ClientStatus::Idle);
let (next_status, did_change) = old_status.next(&mut self.buffer);
self.status = next_status;
changed = did_change;
}
}
}
impl Handler for Server {
type Timeout = usize;
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<Server>, token: Token, events: EventSet) {
match token {
SERVER_TOKEN => {
let client_socket = match self.socket.accept() {
Err(e) => {
println!("Accept error: {}", e);
return;
}
Ok(None) => {
println!("Socket was not ready.");
return;
}
Ok(Some((sock, _addr))) => sock,
};
let new_token = match self.clients.insert(Client::new(client_socket)) {
Ok(t) => t,
Err(client) => {
println!("Connection could not be accepted, max number reached.");
let _ = client.socket.shutdown(Shutdown::Both);
return;
}
};
println!("New connection! Total connections: {}",
self.clients.count());
if let Err(e) = event_loop.register(&self.clients[new_token].socket,
new_token,
EventSet::readable(),
PollOpt::edge() | PollOpt::oneshot()) {
println!("IO error when registering a client: {}", e);
// Remove the client that was just inserted; `token` is SERVER_TOKEN here.
self.clients.remove(new_token);
}
}
_ => {
let remove = if !events.is_hup() {
let client: &mut Client = match self.clients.get_mut(token) {
Some(c) => c,
None => {
println!("Received token that was not registered ({}). Ignoring.",
token.0);
return;
}
};
client.read();
match event_loop.reregister(&client.socket,
token,
EventSet::readable() | EventSet::hup(),
PollOpt::edge() | PollOpt::oneshot()) {
Err(e) => {
println!("IO error when reregistering a client: {}", e);
true
}
_ => false,
}
} else {
println!("Removing connection {}", token.0);
true
};
if remove {
self.clients.remove(token);
}
}
}
}
}
fn main() {
let address = "127.0.0.1:10000".parse::<SocketAddr>().expect("Could not parse address");
let server_socket = TcpListener::bind(&address).expect("Could not bind to port");
let mut event_loop = EventLoop::new().expect("Could not start event loop");
let mut handler = Server {
clients: Slab::new_starting_at(Token(1), 10),
socket: server_socket,
};
event_loop.register(&handler.socket,
SERVER_TOKEN,
EventSet::readable(),
PollOpt::edge())
.expect("Could not register handler");
event_loop.run(&mut handler).expect("Could not run event loop");
}
Should I not call unwrap when I know something can't fail? Is this a sign that I should have structured things differently? For instance, calling pop_front in the client.buffer once I know the length of the buffer is larger than 0.
This is good intuition. In many cases, checking for a state and then calling unwrap is a sign that you could do it differently. In this case, you could have matched on the result of pop_front, and if it was Some, you know it was present:
if let Some(v) = client.buffer.pop_front()
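Expanding that one-liner a little, the following sketch reads a two-byte header from a VecDeque without any unwrap. The function name take_header is hypothetical, and it uses u16::from_le_bytes from the standard library in place of byteorder, which is an assumption about what toolchain is available.

```rust
use std::collections::VecDeque;

/// Tries to take a two-byte little-endian header from the front of `buffer`.
/// Returns `None` and leaves the buffer untouched if fewer than two bytes
/// are available.
fn take_header(buffer: &mut VecDeque<u8>) -> Option<u16> {
    // Check first so a lone byte is not consumed and lost.
    if buffer.len() < 2 {
        return None;
    }
    // Matching on the results replaces the unwrap calls; tuple elements are
    // evaluated left to right, so `lo` is the first byte.
    match (buffer.pop_front(), buffer.pop_front()) {
        (Some(lo), Some(hi)) => Some(u16::from_le_bytes([lo, hi])),
        _ => None,
    }
}
```

The fallback arm is unreachable after the length check, but it costs nothing and means a future refactoring cannot turn it into a panic.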
Is my error handling all-right? I feel like I have a lot of matches trying to catch everything. Would it be better to put my code in another function that returns a Result and try! the errors?
Yes, I would generally advocate trying to pervasively use Result. It's a little tricky when you have an interface you need to adhere to. main is the primary example: you can't return a Result from main! Often I suggest pulling out an inner_main that returns the Result, and main formats the error and ends the program.
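A rough sketch of that split, written against a current toolchain (it uses the ? operator rather than try!). AppError and report are hypothetical names, and the body of inner_main only parses an address to keep the example free of mio. Note that since Rust 1.26 main may itself return a Result, but the split is still handy when you want control over how the error is formatted.

```rust
use std::fmt;
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical application error; a real server would add more variants.
#[derive(Debug)]
enum AppError {
    BadAddress(AddrParseError),
}

impl fmt::Display for AppError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AppError::BadAddress(e) => write!(f, "could not parse address: {}", e),
        }
    }
}

/// Everything fallible lives here and is propagated with `?`.
fn inner_main(address: &str) -> Result<SocketAddr, AppError> {
    let parsed = address.parse::<SocketAddr>().map_err(AppError::BadAddress)?;
    Ok(parsed)
}

/// What `main` would do with the result: format the error for the user and
/// choose a process exit code.
fn report(result: Result<SocketAddr, AppError>) -> i32 {
    match result {
        Ok(addr) => {
            println!("Would listen on {}", addr);
            0
        }
        Err(e) => {
            eprintln!("error: {}", e);
            1
        }
    }
}
```

With these in place, main shrinks to a single call such as std::process::exit(report(inner_main("127.0.0.1:10000"))).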
Can I protect the server from errors in the client? If someone starts sending a bunch of random bytes, the server won't parse them into something meaningful, is that ok or can I somehow detect and prevent that?
You can't prevent a malicious client. You could detect nonsensical data and terminate the connection, though. Perhaps you will only accept messages that are at most 1000 bytes long. If the header says bigger than that, return an error and close the connection.
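As a minimal sketch of that check, assuming the same two-byte little-endian header as in the question: MAX_MESSAGE_LEN, ProtocolError and check_header are hypothetical names, and 1000 is just the example figure from the paragraph above.

```rust
/// Hypothetical upper bound on the declared message size, in bytes.
const MAX_MESSAGE_LEN: u16 = 1000;

/// Hypothetical protocol-level error; the caller would close the connection.
#[derive(Debug, PartialEq)]
enum ProtocolError {
    MessageTooLarge { declared: u16 },
}

/// Decodes the little-endian length header and rejects oversized messages
/// before any buffer is allocated for the body.
fn check_header(header: [u8; 2]) -> Result<u16, ProtocolError> {
    let declared = u16::from_le_bytes(header);
    if declared > MAX_MESSAGE_LEN {
        Err(ProtocolError::MessageTooLarge { declared })
    } else {
        Ok(declared)
    }
}
```

Doing the check before Vec::with_capacity also means a hostile header cannot make the server reserve memory it will never fill.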
- Thank you very much for your time! I followed your advice and this is the result. I also included a "client.rs" file which is another rust program I'm using to test the server. – Luke B., May 28, 2016 at 19:14