3
\$\begingroup\$

This is a multi-threaded socket client written to talk to the Scrolls socket server. The idea is to send commands to the socket server and respond to messages received via callbacks. I've never done multi-threading before and would like the code reviewed for code correctness, best practices, and potential issues regarding how I'm handling threading.

GitHub

from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
from base64 import b64encode
from threading import Thread
from Queue import Queue
import socket
import json
import time
class PingThread(Thread):
 def __init__(self, scrolls_client):
 self.scrolls_client = scrolls_client
 self.stopped = False
 Thread.__init__(self)
 def run(self):
 while not self.stopped:
 self.scrolls_client.send({'msg': 'Ping'})
 time.sleep(10)
class MessageThread(Thread):
 def __init__(self, scrolls_client):
 self.scrolls_client = scrolls_client
 self.stopped = False
 Thread.__init__(self)
 def run(self):
 while not self.stopped:
 # grab a message from queue
 message = self.scrolls_client.queue.get()
 # make a copy of the current subscribers to keep this thread-safe
 current_subscribers = dict(self.scrolls_client.subscribers)
 # send message to subscribers
 for subscriber_key, subscriber_callback in current_subscribers.iteritems():
 # msg or op should match what we asked for
 if 'msg' in message and message['msg'] == subscriber_key:
 subscriber_callback(message)
 elif 'op' in message and message['op'] == subscriber_key:
 subscriber_callback(message)
 # signals to queue job is done
 self.scrolls_client.queue.task_done()
class ReceiveThread(Thread):
 def __init__(self, scrolls_client):
 self.scrolls_client = scrolls_client
 self.stopped = False
 Thread.__init__(self)
 def run(self):
 while not self.stopped:
 self.scrolls_client.receive()
class ScrollsSocketClient(object):
 """
 A Python client for the Scrolls socket server.
 Usage:
 YOUR_SCROLLS_EMAIL = '[email protected]'
 YOUR_SCROLLS_PASSWORD = 'password'
 scrolls = ScrollsApi(YOUR_SCROLLS_EMAIL, YOUR_SCROLLS_PASSWORD)
 """
 queue = Queue()
 subscribers = {}
 _socket_recv = 8192
 _scrolls_host = '54.208.22.193'
 _scrolls_port = 8081
 _scrolls_publickey = """-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCYUK5tWE8Yb564e5VBs05uqh38
mLSRF76iHY4IVHtpXT3FiI6SWoVDyOAiAAe/IJwzUmjCp8V4nmNX26nQuHR4iK/c
U9G7XhpBLfmQx0Esx5tJbYM0GR9Ww4XeXj3xZZBL39MciohrFurBENTFtrlu0EtM
3T8DbLpZaJeXTle7VwIDAQAB
-----END PUBLIC KEY-----"""
 def __init__(self, email, password):
 self.email = email
 self.password = password
 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 self.socket.connect((self._scrolls_host, self._scrolls_port))
 self.ping_thread = PingThread(self)
 self.message_thread = MessageThread(self)
 self.receive_thread = ReceiveThread(self)
 # self.ping_thread.start()
 self.receive_thread.start()
 self.message_thread.start()
 def login(self):
 login_params = {
 'msg': 'SignIn',
 'email': self._encrypt(self.email),
 'password': self._encrypt(self.password)
 }
 self.send(login_params)
 self.ping_thread.start()
 def subscribe(self, event, callback):
 # add subscribers
 self.subscribers[event] = callback
 def unsubscribe(self, event):
 # rm subscribers
 self.subscribers.pop(event)
 def send(self, params):
 # send message
 self.socket.send(json.dumps(params))
 def receive(self):
 stream_data = ''
 data_json = None
 while (1):
 # read data from the buffer
 data = self.socket.recv(self._socket_recv)
 if not data:
 # no more data being transmitted
 break
 else:
 # append data to the response
 stream_data += data
 try:
 # line breaks means we are handling multiple responses
 if stream_data.find("\n\n"):
 # split and parse each response
 for stream_data_line in stream_data.split("\n\n"):
 # try to load as JSON
 data_json = json.loads(stream_data_line)
 # we have a response, add it to the queue
 self.queue.put(data_json)
 except:
 # invalid json, incomplete data
 pass
 def quit(self):
 # stop all threads and close the socket
 self.receive_thread.stopped = True
 self.receive_thread._Thread__stop()
 self.message_thread.stopped = True
 self.message_thread._Thread__stop()
 self.ping_thread.stopped = True
 self.ping_thread._Thread__stop()
 self.socket.close()
 def _encrypt(self, data):
 key = RSA.importKey(self._scrolls_publickey)
 cipher = PKCS1_v1_5.new(key)
 encrypted_data = cipher.encrypt(data)
 return b64encode(encrypted_data)
Jamal
35.2k13 gold badges134 silver badges238 bronze badges
asked Jun 28, 2013 at 16:59
\$\endgroup\$
0

2 Answers 2

2
\$\begingroup\$

Instead of:

Thread.__init__(self)

Do (for example):

super(MessageThread, self).__init__()

Also it's more Pythonic to do:

while True:

Instead of:

while (1):

Instead of using the stopped variables, it may be more efficient to use a variable such as:

self.active = True

That way you can do this which saves you from having to do extra processing on the "not" every iteration:

while self.active:

Even better though may be to just do while True and instead of setting stopped to True and calling Thread_stop(), just call exit() on the threads, then you shouldn't need the stopped variables at all.

Also in receive() you could simplify it slightly

def receive(self):
 stream_data = ''
 while True:
 # read data from the buffer
 data = self.socket.recv(self._socket_recv)
 if not data:
 # no more data being transmitted
 return # now you don't need an else
 # append data to the response
 stream_data += data
 try:
 # line breaks means we are handling multiple responses
 if stream_data.find("\n\n"):
 # split and parse each response
 for stream_data_line in stream_data.split("\n\n"):
 # we have a response, add it to the queue
 # no need to store json.loads result that's only used once
 self.queue.put(json.loads(stream_data_line))
 except:
 # invalid json, incomplete data
 pass
answered Jul 4, 2013 at 0:45
\$\endgroup\$
0
1
\$\begingroup\$

You should read PEP0008, it's the invaluable Python style guide. One such suggestion is to structure your modules so that all the plain import statements are first, separated from the from _ import _ lines. It's neater and easier to read.

import socket
import json
import time
from base64 import b64encode
from threading import Thread
from Queue import Queue
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA

Why are you using two different if statements to run the same line? If you need to run it in either case, use or, not an elif.

 if ('msg' in message and message['msg'] == subscriber_key or
 'op' in message and message['op'] == subscriber_key):
 subscriber_callback(message)

If you're going to have a comment under a function declaration, you should make it a docstring so that it can be useful to others reading your code (as docstrings are accessible programmatically through the interpreter).

def subscribe(self, event, callback):
 """Add subscribers"""

Though in that particular case it seems pretty clear what subscribe does to me.

Your receive function has a lot of unnecessary comments, I'd strip them out especially in places that are pretty clear from the actual code. In particular:

 if not data:
 # no more data being transmitted
 break
 else:
 # append data to the response
 stream_data += data

Instead of using str.find(s) use if s in str. It's faster and more Pythonic:

 if "\n\n" in stream_data:

Never use except without giving it a specific exception to look for. If you made a syntax error like a typo then you'd actually not notice because of that except and you'd probably think there was a problem with the JSON. If you're looking for errors from invalid JSON, that raises a ValueError, so raise that instead.

 except ValueError:
 # invalid json, incomplete data
 pass
answered Aug 28, 2015 at 17:06
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.