I have been wanting to learn more about developing web applications using WebSockets so, with the help of various online tutorials, I built a little Python WebSocket server using Tornado and SQLite.
Basically, whatever string the client sends the server will be written to the database. The details of that operation (like the rowID) will be returned to the client. So far it seems to be working as expected.
Eventually I will need to add a function call after the database connection is established and before the WebSocket server is started. The function will receive a long list of strings, split the list into equal parts, and process each part on a separate CPU using Python's multiprocessing module. That function needs to run forever but it shouldn't block because the WebSocket server needs to be started immediately.
So my questions are:
Obviously, I'd like to know how the code can be improved.
Not so obvious to me: is Tornado making any of the code run asynchronously? For example, client A and client B send a message at almost the same time. Does the code block while the server handles whichever message it received first (i.e. wait while the message is written to the database, then handle the next message)?
Thoughts on how to implement the multiprocessing function such that it doesn't block the WebSocket server from starting immediately?
Thanks :)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#########################
# Dependencies
#########################
# karellen-sqlite
# tornado
#########################
# Modules
#########################
from multiprocessing import Pool, cpu_count
import ntpath
import os
import subprocess
# pysqlite = sqlite3, but with more patches.
from pysqlite2 import connect
import sys
import time
from tornado import ioloop, web, websocket
#########################
# Global Variables
#########################
db = {
    # Path to database file.
    'path': './database.db',
    # Database connection. Don't edit.
    'conn': None
}
server = {
    'ip': '127.0.0.1',
    'port': 8080,
    # The IOloop. Don't edit.
    'ioloop': None
}
# Clients connected via WebSocket.
clients = set()
# Hosts to ping.
hosts = {
    '192.168.1.1',
    'google.com'
}
#########################
# Functions
#########################
def print_message(message, exit=False):
    """Print a message to stdout. Optionally exit."""
    print('{0}'.format(message), file=sys.stdout)
    if exit:
        sys.exit(1)
def db_insert(value):
    """Insert value into database."""
    q = 'INSERT INTO test VALUES ({0});'.format(value)
    db['conn'].execute(q)
    db['conn'].commit()
def db_hook(conn, op, db_name, table_name, rowid):
    """Called after database is changed.
    Don't modify conn!
    Keyword arguments:
    conn -- database connection.
    op -- type of database operation executed.
    db_name -- name of database operated on.
    table_name -- name of table operated on.
    rowid -- id of affected row.
    """
    # Send keyword argument values to client.
    db_update = '''conn = {0} \n
    op = {1} \n
    db_name = {2} \n
    table_name = {3} \n
    rowid = {4}'''
    db_update = db_update.format(dir(conn), op, db_name, table_name, rowid)
    ws_send_message(db_update)
def db_connect(db_path):
    """Connect to database."""
    if not ntpath.isfile(db_path):
        # Create database.
        with connect(db_path) as conn:
            conn.execute('CREATE TABLE test (int id);')
            conn.execute('INSERT INTO test VALUES (666);')
            # Save (commit) the changes.
            conn.commit()
    with connect(db_path) as conn:
        # Database exists. Set hook.
        conn.set_update_hook(db_hook)
        return conn
def ws_send_message(message):
    """Send message to all clients. Remove dead clients."""
    removable = set()
    for c in clients:
        if not c.ws_connection or not c.ws_connection.stream.socket:
            removable.add(c)
        else:
            c.write_message(message)
    for c in removable:
        clients.remove(c)
        print_message('Removed dead client: {0}'.format(dir(c)), False)
def serve_forever():
    """Start WebSocket server."""
    global server
    app = web.Application(
        # Routes.
        [
            (r'/', IndexHandler),
            (r'/ws', DefaultWebSocket)
        ],
        # Directory from which static files will be served.
        static_path='.',
        # Enable debug mode settings.
        debug=True,
    )
    app.listen(server['port'])
    print_message('Server listening at {0}:{1}/'.format(server['ip'], server['port']), False)
    # Returns a global IOLoop instance.
    server['ioloop'] = ioloop.IOLoop.instance()
    # Starts the I/O loop.
    server['ioloop'].start()
def parse_ping(cmpl_proc=None):
    """A coroutine. Parse ping result, prep database query.
    Keyword arguments:
    cmpl_proc -- a CompletedProcess instance.
    """
    print_message('cmpl_proc = {0}'.format(cmpl_proc), False)
    while True:
        # (yield) turns this function into a coroutine.
        # The function argument value (cmpl_proc) is accessed by yield.
        result = (yield)
        # The pinged host.
        host = result.args[3]
        # 0 = host online. 1 = host offline.
        return_code = result.returncode
        # UTC: time standard commonly used across the world.
        # Returns the time, in seconds, since the epoch as a floating point number.
        utc_time = time.time()
        # Prepare query statement. Basically an UPSERT.
        # Try to update the row.
        update = 'UPDATE hosts SET status={1},time_pinged={2} WHERE host={0};'.format(host, return_code, utc_time)
        # If update unsuccessful (I.E. the row didn't exist) then insert row.
        insert = 'INSERT INTO hosts (host,status,time_pinged) SELECT {0},{1},{2} WHERE (Select Changes()=0);'.format(host, return_code, utc_time)
        # Final query.
        query = '{0}{1}'.format(update, insert)
        print_message('query = {0}'.format(query), False)
def ping(host):
    """Ping each host using a separate process/core."""
    # Stop after sending this many ECHO_REQUEST packets.
    tries = '3'
    # The final ping command.
    cmd = ['ping', '-c', tries, host]
    # Execute cmd, wait for it to complete, then return a CompletedProcess instance.
    # Capture stdout and stderr.
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return result
def mp(p):
    """Uses multiprocessing to ping hosts."""
    # A reference to the coroutine.
    ping_result = parse_ping()
    try:
        # next() starts the coroutine.
        next(ping_result)
        # imap_unordered: results are returned to the parent
        # as soon as the child sends them.
        for i in p.imap_unordered(ping, hosts):
            # send() supplies values to the coroutine.
            # Send result of ping to coroutine.
            ping_result.send(i)
    except Exception as error:
        print_message('Multiprocessing error: {0}'.format(error), False)
        p.close()
        p.terminate()
        # Close the coroutine.
        ping_result.close()
    else:
        p.close()
        p.join()
        ping_result.close()
#########################
# Classes
#########################
class IndexHandler(web.RequestHandler):
    """Handle non-WebSocket connections."""
    def get(self):
        """Renders the template with the given arguments as the response."""
        self.render('index.html')
class DefaultWebSocket(websocket.WebSocketHandler):
    """Handle initial WebSocket connection."""
    def open(self):
        """Invoked when a new WebSocket is opened."""
        print_message('WebSocket opened.', False)
        # Don't delay and/or combine small messages to minimize the number of packets sent.
        self.set_nodelay(True)
        # Add client to list of connected clients.
        clients.add(self)
        # Send greeting to client.
        self.write_message('Hello from server! WebSocket opened.')
    def on_message(self, message):
        """Handle incoming WebSocket messages."""
        print_message('Message incoming: {0}'.format(message), False)
        db_insert(message)
    def on_close(self):
        """Invoked when the WebSocket is closed."""
        print_message('WebSocket closed.', False)
#########################
# Start script
#########################
def main():
    # Connect to database.
    global db
    db['conn'] = db_connect(db['path'])
    #print_message('db.path = {0} db.conn = {1}'.format(db['path'], dir(db['conn'])), True)
    # Ping hosts.
    # How to make mp() run forever and not block
    # so that the WebSocket server can start immediately?
    print_message('CPUs found: {0}'.format(cpu_count()), False)
    # A process pool. Defaults to number of cores.
    p = Pool()
    # Start pinging.
    mp(p)
    # Start WebSocket server.
    # This needs to start while mp() continues running.
    serve_forever()
#########################
# Script entry point.
#########################
if __name__ == "__main__":
    main()
2 Answers
Imports
os is unused, you may want to remove it. ntpath is the Windows-specific implementation of os.path: there is absolutely no need to import it explicitly; you should use os.path instead, which will make the script cross-platform. Lastly, instead of a comment saying that you're using an extension to sqlite3, you can be more explicit, as per the docs.
I would thus use:
import sys
import os.path
import sqlite3
from karellen.sqlite3 import Connection
from tornado import ioloop, web, websocket
print_message
There is no need to specify sys.stdout as the output file since it is the default behaviour. You also don't need to use format to convert something to a string; just call the builtin str on it. Better yet, just pass it as a parameter to print, which will figure it out.
Since you are exiting with a status code of one after printing a message, you may as well pass your message to sys.exit directly: it will be printed to sys.stderr before exiting with a status code of 1.
So I'd write:
def print_message(message, exit=False):
    if exit:
        sys.exit(message)
    print(message)
Database
The with statement on an open connection manages transactions, so there is no need to also call commit explicitly.
You should not insert the values into your query string directly because it makes the script vulnerable to SQL injection; instead, use parametrized queries and pass the values as a tuple.
Lastly, you may want to close the connection at the end.
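A minimal sketch of what a parametrized insert could look like, assuming the connection is passed in as an argument and reusing the existing test table (the connection's context manager then takes care of the commit):

def db_insert(conn, value):
    """Insert value using a parametrized query instead of string formatting."""
    # The ? placeholder lets sqlite3 escape the value safely.
    with conn:
        conn.execute('INSERT INTO test VALUES (?);', (value,))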
Tornado
Managing the server and the database connection using dictionaries with values that are not meant to be changed feels ugly. Instead I would rely on:
- Tornado's ability to retrieve the created IOLoop using IOLoop.current()
- The ability to create a custom application object that is available to each handler, to store application-specific objects such as the database connection.
Using default parameters for some functions may also help simplify things.
Lastly, you should use absolute paths rather than paths relative to the current directory.
I would change some of the elements to something along the lines of:
STATIC_DIR = os.path.abspath(os.path.dirname(__file__))
class DatabaseApp(web.Application):
    def __init__(self, connection, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.db_connection = connection
class DefaultWebSocket(websocket.WebSocketHandler):
    def on_message(self, message):
        print_message('Message Incoming {0}'.format(message))
        db_insert(self.application.db_connection, message)
    ...
def db_connect(filename='database.db'):
    db_path = os.path.join(STATIC_DIR, filename)
    if not os.path.isfile(db_path):
        with sqlite3.connect(db_path, factory=Connection) as conn:
            conn.execute(...)
        conn.close()
    with sqlite3.connect(db_path, factory=Connection) as conn:
        conn.set_update_hook(db_hook)
        return conn
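As a rough sketch, serve_forever could then be reduced to something like this (assuming the DatabaseApp above; the port default is only an example):

def serve_forever(connection, port=8080):
    app = DatabaseApp(
        connection,
        [
            (r'/', IndexHandler),
            (r'/ws', DefaultWebSocket),
        ],
        static_path=STATIC_DIR,
        debug=True,
    )
    app.listen(port)
    # No global is needed: IOLoop.current() retrieves the loop for this thread.
    ioloop.IOLoop.current().start()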
I'll let you figure the rest out.
1. print_message
The function print_message is a clear example of a function which is incoherent.
def print_message(message, exit=False):
    """Print a message to stdout. Optionally exit."""
    print('{0}'.format(message), file=sys.stdout)
    if exit:
        sys.exit(1)
Although it's short, it does more than one thing. There is no connection between printing a message and exiting, and this is underlined by the fact that we only exit conditionally, depending on an argument.
Part of the purpose of functions/subroutines is to reduce code repetition. But this shouldn't be done blindly. If removing repetition would make the code harder, not easier to understand, it should not be done.
It's hard to understand why these two things belong together, and it's hard to imagine that they would need to be changed together in future. You should unwrap the printing and exiting back into the code which calls this function.
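For example, where the code currently calls print_message with exit=True, the two steps can simply be written out at the call site (the message below is just a placeholder):

# Instead of print_message('could not open database', exit=True):
print('could not open database')
sys.exit(1)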
2. global variables
You use a global variable for the database connection. This might make sense, but it makes it much harder to test and modularize your code. The function db_insert should have a db connection passed into it as an argument. If possible, this should happen for all functions from main down, and then you could make the connection into a local variable in the scope of main. This is particularly important with multithreaded or otherwise concurrent code. You might at some point want to have multiple connections, a connection pool, or make some other changes in how you manage connections. The goal is to be able to do that without having to change every bit of your database code.
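A very rough sketch of the direction, assuming db_insert and serve_forever are changed to take the connection as their first argument:

def main():
    # The connection is local to main and handed to whatever needs it,
    # instead of living in a module-level dictionary.
    conn = db_connect('./database.db')
    db_insert(conn, 'some value')  # placeholder value
    serve_forever(conn)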
3. db_connect
At the end of this function, you return a connection created using a context manager. This may work, but it certainly isn't how context managers are supposed to be used.
with connect(db_path) as conn:
    # Database exists. Set hook.
    conn.set_update_hook(db_hook)
    return conn
The idea of the connect(db_path) context manager is that it creates an object at the start of this block and assigns it to conn. Then, upon leaving the block, another piece of code is run to clean up the created object. So it would be completely acceptable for conn to be disconnected, otherwise disabled, or completely removed at the end of db_connect. If you tested your code and it worked, this obviously didn't happen, but it could in a different version of the library. Besides this, there is no point in using with if you don't want the cleanup code to run. You can just create the connection in a simple way:
conn = connect(db_path)
# Database exists. Set hook.
conn.set_update_hook(db_hook)
return conn
The multiprocessing part is not implemented yet; can you remove it from the question, as it is not ready for review?