This is a multi-threaded socket client written to talk to the Scrolls socket server. The idea is to send commands to the socket server and respond to messages received via callbacks. I've never done multi-threading before and would like the code reviewed for code correctness, best practices, and potential issues regarding how I'm handling threading.
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
from base64 import b64encode
from threading import Thread
from Queue import Queue
import socket
import json
import time
class PingThread(Thread):
def __init__(self, scrolls_client):
self.scrolls_client = scrolls_client
self.stopped = False
Thread.__init__(self)
def run(self):
while not self.stopped:
self.scrolls_client.send({'msg': 'Ping'})
time.sleep(10)
class MessageThread(Thread):
def __init__(self, scrolls_client):
self.scrolls_client = scrolls_client
self.stopped = False
Thread.__init__(self)
def run(self):
while not self.stopped:
# grab a message from queue
message = self.scrolls_client.queue.get()
# make a copy of the current subscribers to keep this thread-safe
current_subscribers = dict(self.scrolls_client.subscribers)
# send message to subscribers
for subscriber_key, subscriber_callback in current_subscribers.iteritems():
# msg or op should match what we asked for
if 'msg' in message and message['msg'] == subscriber_key:
subscriber_callback(message)
elif 'op' in message and message['op'] == subscriber_key:
subscriber_callback(message)
# signals to queue job is done
self.scrolls_client.queue.task_done()
class ReceiveThread(Thread):
def __init__(self, scrolls_client):
self.scrolls_client = scrolls_client
self.stopped = False
Thread.__init__(self)
def run(self):
while not self.stopped:
self.scrolls_client.receive()
class ScrollsSocketClient(object):
"""
A Python client for the Scrolls socket server.
Usage:
YOUR_SCROLLS_EMAIL = '[email protected]'
YOUR_SCROLLS_PASSWORD = 'password'
scrolls = ScrollsApi(YOUR_SCROLLS_EMAIL, YOUR_SCROLLS_PASSWORD)
"""
queue = Queue()
subscribers = {}
_socket_recv = 8192
_scrolls_host = '54.208.22.193'
_scrolls_port = 8081
_scrolls_publickey = """-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCYUK5tWE8Yb564e5VBs05uqh38
mLSRF76iHY4IVHtpXT3FiI6SWoVDyOAiAAe/IJwzUmjCp8V4nmNX26nQuHR4iK/c
U9G7XhpBLfmQx0Esx5tJbYM0GR9Ww4XeXj3xZZBL39MciohrFurBENTFtrlu0EtM
3T8DbLpZaJeXTle7VwIDAQAB
-----END PUBLIC KEY-----"""
def __init__(self, email, password):
self.email = email
self.password = password
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self._scrolls_host, self._scrolls_port))
self.ping_thread = PingThread(self)
self.message_thread = MessageThread(self)
self.receive_thread = ReceiveThread(self)
# self.ping_thread.start()
self.receive_thread.start()
self.message_thread.start()
def login(self):
login_params = {
'msg': 'SignIn',
'email': self._encrypt(self.email),
'password': self._encrypt(self.password)
}
self.send(login_params)
self.ping_thread.start()
def subscribe(self, event, callback):
# add subscribers
self.subscribers[event] = callback
def unsubscribe(self, event):
# rm subscribers
self.subscribers.pop(event)
def send(self, params):
# send message
self.socket.send(json.dumps(params))
def receive(self):
stream_data = ''
data_json = None
while (1):
# read data from the buffer
data = self.socket.recv(self._socket_recv)
if not data:
# no more data being transmitted
break
else:
# append data to the response
stream_data += data
try:
# line breaks means we are handling multiple responses
if stream_data.find("\n\n"):
# split and parse each response
for stream_data_line in stream_data.split("\n\n"):
# try to load as JSON
data_json = json.loads(stream_data_line)
# we have a response, add it to the queue
self.queue.put(data_json)
except:
# invalid json, incomplete data
pass
def quit(self):
# stop all threads and close the socket
self.receive_thread.stopped = True
self.receive_thread._Thread__stop()
self.message_thread.stopped = True
self.message_thread._Thread__stop()
self.ping_thread.stopped = True
self.ping_thread._Thread__stop()
self.socket.close()
def _encrypt(self, data):
key = RSA.importKey(self._scrolls_publickey)
cipher = PKCS1_v1_5.new(key)
encrypted_data = cipher.encrypt(data)
return b64encode(encrypted_data)
2 Answers 2
Instead of:
Thread.__init__(self)
Do (for example):
super(MessageThread, self).__init__()
Also it's more Pythonic to do:
while True:
Instead of:
while (1):
Instead of using the stopped variables, it may be more efficient to use a variable such as:
self.active = True
That way you can do this which saves you from having to do extra processing on the "not" every iteration:
while self.active:
Even better though may be to just do while True and instead of setting stopped to True and calling Thread_stop(), just call exit() on the threads, then you shouldn't need the stopped variables at all.
Also in receive() you could simplify it slightly
def receive(self):
stream_data = ''
while True:
# read data from the buffer
data = self.socket.recv(self._socket_recv)
if not data:
# no more data being transmitted
return # now you don't need an else
# append data to the response
stream_data += data
try:
# line breaks means we are handling multiple responses
if stream_data.find("\n\n"):
# split and parse each response
for stream_data_line in stream_data.split("\n\n"):
# we have a response, add it to the queue
# no need to store json.loads result that's only used once
self.queue.put(json.loads(stream_data_line))
except:
# invalid json, incomplete data
pass
You should read PEP0008, it's the invaluable Python style guide. One such suggestion is to structure your modules so that all the plain import
statements are first, separated from the from _ import _
lines. It's neater and easier to read.
import socket
import json
import time
from base64 import b64encode
from threading import Thread
from Queue import Queue
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
Why are you using two different if
statements to run the same line? If you need to run it in either case, use or
, not an elif
.
if ('msg' in message and message['msg'] == subscriber_key or
'op' in message and message['op'] == subscriber_key):
subscriber_callback(message)
If you're going to have a comment under a function declaration, you should make it a docstring so that it can be useful to others reading your code (as docstrings are accessible programmatically through the interpreter).
def subscribe(self, event, callback):
"""Add subscribers"""
Though in that particular case it seems pretty clear what subscribe
does to me.
Your receive
function has a lot of unnecessary comments, I'd strip them out especially in places that are pretty clear from the actual code. In particular:
if not data:
# no more data being transmitted
break
else:
# append data to the response
stream_data += data
Instead of using str.find(s)
use if s in str
. It's faster and more Pythonic:
if "\n\n" in stream_data:
Never use except
without giving it a specific exception to look for. If you made a syntax error like a typo then you'd actually not notice because of that except
and you'd probably think there was a problem with the JSON. If you're looking for errors from invalid JSON, that raises a ValueError
, so raise that instead.
except ValueError:
# invalid json, incomplete data
pass