I wrote this code to make a non-blocking server without using select(). My main concern is to catch as many exceptions as possible for worst-case scenarios (so I would appreciate some feedback on cases I missed), and to let each client be handled separately by its own thread.
Any feedback on style/performance/good or bad coding practice is welcome.
import socket
import logging
import threading
import sys
class EchoServer:
BUFFER_SIZE = 1024
# size limit in bytes for the client message to be received
MAX_MSG_SIZE = 1024 * 5
# this maps connected clients socket objects returned from accept() to their address tuple
connected_clients = {}
def __init__(self, port):
self.hostname = 'localhost'
self.port = port
try:
self.sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sockfd.bind((self.hostname, self.port))
self.sockfd.listen(10)
self.sockfd.setblocking(False)
except socket.error as e:
logging.critical(e)
logging.critical('Could not start up server. Exiting.')
sys.exit(-1)
def look_for_connections(self):
while True:
try:
client_connfd, client_addr = self.sockfd.accept()
logging.info('Connection from client {!r}'.format(client_addr))
self.connected_clients[client_connfd] = client_addr
client_handling_thread = threading.Thread(target=self.handle_client, args=(client_connfd,))
client_handling_thread.start()
except BlockingIOError:
# no active clients trying to connect, nothing to do
continue
def handle_client(self, connfd: socket.socket):
msg = self.get_client_msg(connfd)
logging.info('Client: {} sent {} bytes.'.format(self.connected_clients[connfd], len(msg)))
sent_bytes_size = self.send_client_msg(connfd, msg)
logging.info('Server sent {} bytes to client: {!r}'.format(sent_bytes_size, self.connected_clients[connfd]))
del self.connected_clients[connfd]
connfd.close()
def startup_server_loop(self):
# this starts the main event loop (accepting connections from client)
# each client get handled by its own thread
server_thread = threading.Thread(target= self.look_for_connections)
server_thread.start()
def get_client_msg(self, connfd: socket.socket):
data = b''
while True:
try:
buffer = connfd.recv(self.BUFFER_SIZE)
if len(buffer) == 0 or len(data) >= self.MAX_MSG_SIZE:
break
else:
data += buffer
except BlockingIOError:
break
return data
def send_client_msg(self, connfd: socket.socket, msg: str):
sent_bytes = 0
total_bytes = len(msg)
total_sent = 0
if len(msg) == 0:
return total_sent
while True:
try:
sent_bytes = connfd.send(msg[sent_bytes: total_bytes])
total_sent += sent_bytes
if sent_bytes == 0:
return total_sent
except BlockingIOError:
return total_sent
# in case client disconnected before sending the echo
except ConnectionResetError:
logging.info('Client {!r} disconnected before sending echo.'.format(self.connected_clients[connfd]))
return total_sent
def __del__(self):
for client in self.connected_clients.keys():
del self.connected_clients[client]
self.sockfd.close()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
my_server = EchoServer(50000)
my_server.startup_server_loop()
1 Answer
There seem to be a few misconceptions about blocking calls and the purpose of non-blocking sockets. You wrote in a comment:
I thought this was a good idea for the server to be as responsive as possible for any number of clients connected (isn't that correct?).
Yes, this is a good idea, but no, your implementation does not make it faster. If anything, it will be slightly slower than using blocking sockets.
How is this slower?
Neither your server thread nor your handler threads do anything other than manage their socket, so there is absolutely nothing to be gained from having non-blocking sockets. Besides, by using an active loop (while True without sleeping), your threads will eat up 100% CPU and will never enter a state where it is favorable for the OS to deschedule them, which leaves other threads waiting longer before being scheduled.
This is normally not an issue when a program can make use of multiple cores to run several threads concurrently. But this is Python, and the GIL prevents more than one thread from executing Python code at a time. This means that as long as a thread is handling messages from/to clients, the server thread is waiting to be scheduled, and the active loops do nothing to help it get scheduled faster.
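If the goal is only to keep a dedicated accept loop responsive without burning CPU, a socket timeout already lets the OS suspend the thread between attempts. A minimal sketch of that idea (illustration only, not the design recommended below):

import socket

sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sockfd.bind(('localhost', 50000))
sockfd.listen(10)
sockfd.settimeout(0.5)  # block for at most half a second per accept() call

while True:
    try:
        client, addr = sockfd.accept()  # the thread sleeps here instead of spinning
    except socket.timeout:
        continue  # nobody tried to connect during the timeout window
    # ... hand client off to a handler thread as before ...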
But why non-blocking sockets in the first place?
Non-blocking sockets and/or sockets using timeouts are meant to be used in a context where there are other tasks to perform. Either because a single thread performs both the "server" and "handler" parts:
from contextlib import suppress

BUFFER_SIZE = 1024  # same chunk size as in the original class

def non_blocking_server(port, hostname='localhost'):
try:
sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as e:
logging.critical(e)
logging.critical('Could not open socket. Exiting.')
sys.exit(1)
try:
sockfd.bind((hostname, port))
sockfd.listen(10)
sockfd.setblocking(False)
except socket.error as e:
logging.critical(e)
logging.critical('Could not start up server. Exiting.')
sockfd.close()
sys.exit(2)
connected = {}
receiving = {}
sending = {}
while True:
with suppress(BlockingIOError):
client, client_addr = sockfd.accept()
logging.info('Connection from client {!r}'.format(client_addr))
connected[client] = client_addr
receiving[client] = b''
still_receiving = {}
for client, data in receiving.items():
try:
buf = client.recv(BUFFER_SIZE)
if buf:
still_receiving[client] = data + buf
else:
logging.info('Client: {} sent {} bytes.'.format(connected[client], len(data)))
sending[client] = (memoryview(data), 0)
except BlockingIOError:
still_receiving[client] = data
receiving = still_receiving
still_sending = {}
for client, (message, sent_bytes) in sending.items():
try:
sent = client.send(message[sent_bytes:])
sent_bytes += sent
if sent:
still_sending[client] = (message, sent_bytes)
else:
logging.info('Server sent {} bytes to client: {!r}'.format(sent_bytes, connected[client]))
client.close()
del connected[client]
except BlockingIOError:
still_sending[client] = (message, sent_bytes)
except ConnectionResetError:
# in case client disconnected before we send the echo
logging.info('Client {!r} disconnected before sending echo.'.format(connected[client]))
logging.info('Server sent {} bytes to client: {!r}'.format(sent_bytes, connected[client]))
client.close()
del connected[client]
sending = still_sending
It would be a shame if a message from the first client was only partially received and we had to wait for a second client to connect before being able to receive and handle the rest. Here, non-blocking sockets allow us to perform all three operations (accepting connections, receiving messages, sending messages) sequentially in a single thread, without depending on external resources (such as a client connecting or the network being available).
Another use case would be to check for incoming connections and/or handle them alongside more heavyweight computation in a loop, as in the sketch below. It lets you fine-tune when things are done instead of relying on the OS to schedule threads for us.
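A rough sketch of that second use case, where do_heavy_work is a hypothetical placeholder for the computation and sockfd is assumed to be an already bound, listening, non-blocking socket:

def accept_between_work(sockfd, do_heavy_work):
    # sockfd: a bound, listening socket already set to non-blocking mode
    pending_clients = []
    while True:
        try:
            client, addr = sockfd.accept()  # returns immediately if nobody is waiting
            pending_clients.append((client, addr))
        except BlockingIOError:
            pass  # no connection attempt right now
        do_heavy_work()  # the loop, not the OS scheduler, decides when sockets get serviced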
Using threads as a non-blocking facility
So, having said that non-blocking sockets in threads are a bad idea, let's analyze the alternative of having blocking sockets in threads. In this scenario, each thread is suspended while it waits for its blocking system call to finish. This has two advantages:
- A thread no longer uses 100% CPU while waiting for an event;
- The OS can more easily give priority to threads that are actually active and resume a thread promptly once its blocking call finishes.
And since you’re using threads, the OS is free to give resources where they are needed, which usually manifests itself as "schedule the thread whose blocking call just finished".
Using threads also calls for more readable code, as it usually makes you write separate entities for separate kinds of actions (a server and a handler, for instance).
Misconceptions about the "blocking" part of sending/receiving data
From the code you wrote, it seems that you think you will get a BlockingIOError if no more data can be read/sent. This is false; it only means that the network is not (yet) available.
In the case of a read, a BlockingIOError can mean that:
- the client didn't start sending data;
- the client stopped sending data (but may send more in the future);
- the data has been sent by the client but has not yet arrived at the server;
- who knows.
When reading data, you have a few possibilities for knowing when to stop reading, but it is a protocol that must be established between the client and the server:
- the client can shutdown its write end of the socket, after which the server is notified by a recv that doesn't block but returns 0 bytes;
- the client can send a fixed-size header before its message indicating the length of the incoming message (a sketch of this option follows the list);
- the client can send a fixed-length message, filling missing data with whatever makes sense;
- the client can end its message with a specific delimiter;
- ...
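As a concrete illustration of the length-prefix option, here is a minimal sketch assuming blocking sockets (send_framed, recv_framed and recv_exactly are hypothetical helpers, not part of the code under review):

import struct

def send_framed(sock, payload: bytes):
    # prefix the payload with its length packed into 4 bytes (network byte order)
    sock.sendall(struct.pack('!I', len(payload)) + payload)

def recv_framed(sock) -> bytes:
    (length,) = struct.unpack('!I', recv_exactly(sock, 4))
    return recv_exactly(sock, length)

def recv_exactly(sock, count: int) -> bytes:
    # keep calling recv() until exactly count bytes have been read
    data = b''
    while len(data) < count:
        chunk = sock.recv(count - len(data))
        if not chunk:
            raise ConnectionError('peer closed the connection mid-message')
        data += chunk
    return data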
In the case of a send, a BlockingIOError is usually a sign that the socket's send buffer is full (the network cannot keep up) and that you should wait before sending the rest of your data.
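If you do keep a non-blocking socket, the usual way to "wait" is to ask the OS to wake you up when the socket becomes writable again, for instance with select (shown only for completeness, since the original code deliberately avoids select(); send_all_nonblocking is a hypothetical helper):

import select

def send_all_nonblocking(sock, msg: bytes) -> int:
    # sock is assumed to be a connected, non-blocking socket
    view = memoryview(msg)
    total_sent = 0
    while total_sent < len(msg):
        try:
            total_sent += sock.send(view[total_sent:])
        except BlockingIOError:
            # send buffer is full: sleep until the OS reports the socket writable
            select.select([], [sock], [])
    return total_sent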
Code review
With that in mind, you might have understood that your get_client_msg and send_client_msg are not quite doing the proper thing. But that's not an issue, since we’re removing the non-blocking part of the sockets anyway.
You also have a few functions pertaining to handling the client socket where you pass said socket as a parameter. The only reason you left them in the EchoServer class is to be able to use self.connected_clients[connfd]. Instead, create a new class dedicated to handling a client, and pass the address to that class as a parameter alongside the socket.
Also, to simplify thread management, both classes can inherit from threading.Thread. A first rewrite would look like:
import socket
import logging
import threading
import sys
class EchoHandler(threading.Thread):
BUFFER_SIZE = 1024
# size limit in bytes for the client message to be received
MAX_MSG_SIZE = 1024 * 5
def __init__(self, sockfd, address):
super().__init__()
self.sockfd = sockfd
self.address = address
def run(self):
msg = self.get_client_msg()
logging.info('Client: %r sent %d bytes.', self.address, len(msg))
sent_bytes_size = self.send_client_msg(msg)
logging.info('Server sent %d bytes to client: %r', sent_bytes_size, self.address)
self.sockfd.close()
def get_client_msg(self):
data = b''
while True:
buffer = self.sockfd.recv(self.BUFFER_SIZE)
data += buffer
if not(buffer) or len(data) >= self.MAX_MSG_SIZE:
return data
def send_client_msg(self, msg):
total_sent = 0
message = memoryview(msg)
while True:
try:
sent_bytes = self.sockfd.send(message[total_sent:])
total_sent += sent_bytes
if sent_bytes == 0:
return total_sent
except ConnectionResetError:
# in case client disconnected before we send the echo
logging.info('Client %r disconnected before sending echo.', self.address)
return total_sent
class EchoServer(threading.Thread):
def __init__(self, port, hostname='localhost'):
super().__init__()
try:
self.sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as e:
logging.critical(e)
logging.critical('Could not open socket. Exiting.')
sys.exit(1)
try:
self.sockfd.bind((hostname, port))
self.sockfd.listen(10)
except socket.error as e:
logging.critical(e)
logging.critical('Could not start up server. Exiting.')
self.sockfd.close()
sys.exit(2)
def run(self):
while True:
client_connfd, client_addr = self.sockfd.accept()
logging.info('Connection from client %r', client_addr)
EchoHandler(client_connfd, client_addr).start()
def __del__(self):
self.sockfd.close()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
EchoServer(50000).start()
But this is starting to look a lot like socketserver:
import logging
import socketserver
class EchoHandler(socketserver.BaseRequestHandler):
BUFFER_SIZE = 1024
# size limit in bytes for the client message to be received
MAX_MSG_SIZE = 1024 * 5
def finish(self):
self.request.close()
def handle(self):
msg = self.get_client_msg()
logging.info('Client: %s sent %d bytes.', self.client_address, len(msg))
sent_bytes = self.send_client_msg(msg)
logging.info('Server sent %d bytes to client: %r', sent_bytes, self.client_address)
def get_client_msg(self):
data = b''
while True:
buf = self.request.recv(self.BUFFER_SIZE)
data += buf
if not buf or len(data) >= self.MAX_MSG_SIZE:
return data
def send_client_msg(self, msg):
total_sent = 0
message = memoryview(msg)
while True:
try:
sent_bytes = self.request.send(message[total_sent:])
total_sent += sent_bytes
if sent_bytes == 0:
return total_sent
except ConnectionResetError:
# in case client disconnected before we send the echo
logging.info('Client %r disconnected before sending echo.', self.client_address)
return total_sent
class EchoServer(socketserver.ThreadingTCPServer):
allow_reuse_address = True
def process_request(self, request, client_address):
logging.info('Connection from client %r', client_address)
super().process_request(request, client_address)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
with EchoServer(('localhost', 50000), EchoHandler) as server:
server.serve_forever()
This is more to the point because all the boilerplate is already handled by Python.
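To try either version out, a throwaway client could look like the sketch below (it assumes the server is listening on localhost:50000 as in the examples, and shuts down its write end so the server's recv sees 0 bytes and echoes the message back):

import socket

with socket.create_connection(('localhost', 50000)) as client:
    client.sendall(b'hello echo server')
    client.shutdown(socket.SHUT_WR)  # signal "no more data" to the server
    echoed = b''
    while True:
        chunk = client.recv(1024)
        if not chunk:  # the server closed the connection after echoing
            break
        echoed += chunk
    print(echoed)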