\$\begingroup\$

Even though I am working with a single socket, I have wrapped it in an object whose .recv() method accepts a timeout, implemented with Python's select.poll().

The wrapper's initializer creates a stream socket in the AF_INET address family, connects it, flags it as non-blocking, and registers it with a poll object. The polling-related part of .__init__() looks like this (the connect and non-blocking steps appear in the complete code further down):

def __init__(self, host, port):
    self.host = host
    self.port = port
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.poll = select.poll()
    event_in_mask = select.POLLIN | select.POLLPRI
    event_err_mask = select.POLLERR
    event_closed_mask = select.POLLHUP | select.POLLNVAL
    event_mask = event_in_mask | event_err_mask | event_closed_mask
    self.poll.register(self.socket, event_mask)

I register the socket with an event mask because I am only interested in incoming data, errors, and hang-ups.
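As a minimal sketch of what that mask does, assuming Python 3 on a platform where select.poll() exists (it is not available on Windows), the following registers one end of a socket.socketpair() and polls it before and after data arrives. The helper name make_poller is hypothetical, and the socket pair only stands in for the real TCP connection.

```python
import select
import socket

# Same read/error/closed mask as in the __init__ shown in the question.
EVENT_MASK = (select.POLLIN | select.POLLPRI | select.POLLERR
              | select.POLLHUP | select.POLLNVAL)


def make_poller(sock):
    """Hypothetical helper: return a poll object watching *sock*."""
    poller = select.poll()
    poller.register(sock, EVENT_MASK)
    return poller


# A connected local socket pair stands in for the client/server connection.
left, right = socket.socketpair()
left_fd = left.fileno()
poller = make_poller(left)

# Nothing has been sent yet, so a zero-timeout poll reports no events.
idle_events = poller.poll(0)

# Once the peer writes, the registered socket is reported as readable.
right.sendall(b"ping")
ready_events = poller.poll(1000)  # poll() timeouts are in milliseconds
fd, event = ready_events[0]

poller.unregister(left)
left.close()
right.close()
```

Each entry returned by poll() is a (file descriptor, event bitmask) pair, which is why the recv() method tests individual bits with `&`.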

The receive method will look like this:

def recv(self, timeout):
    events = self.poll.poll(timeout)
    if not events:
        return ""
    event = events[0][1]
    ## Invalid request; descriptor not open
    if event & select.POLLNVAL:
        err = "Invalid request; descriptor not open"
        raise ConnectionClosedError(err, host=self.host, port=self.port)
    ## Hung up
    if event & select.POLLHUP:
        err = "Hung up"
        raise ConnectionClosedError(err, host=self.host, port=self.port)
    ## Error condition of some sort
    if event & select.POLLERR:
        err = "Error condition of some sort"
        raise ConnectionClosedError(err, host=self.host, port=self.port)
    ## There is data to read
    ready = event & select.POLLPRI or event & select.POLLIN
    ## Timeout expiry; nothing was received, we pass an empty string
    if not ready:
        return ""
    return self._recv()

The ._recv() method simply calls self.socket.recv(4096) with some error catching.
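To illustrate the kind of error catching meant here, a minimal sketch assuming Python 3 (where socket.error is an alias of OSError). ClosedError and guarded_recv are hypothetical names standing in for ConnectionClosedError and ._recv(); this is not the code under review, only the same idea in isolation.

```python
import errno
import socket


class ClosedError(OSError):
    """Hypothetical stand-in for the ConnectionClosedError described above."""


def guarded_recv(sock, bufsize=4096):
    """Call sock.recv(), translating EBADF into ClosedError."""
    try:
        return sock.recv(bufsize)
    except OSError as exc:
        # EBADF means the descriptor is no longer open on our side.
        if exc.errno == errno.EBADF:
            raise ClosedError(errno.EBADF, "socket already closed") from exc
        # Anything else is passed through unchanged.
        raise


left, right = socket.socketpair()
right.sendall(b"hello")
data = guarded_recv(left)  # data is already waiting, so this returns at once

left.close()
try:
    guarded_recv(left)
    closed_detected = False
except ClosedError:
    closed_detected = True
right.close()
```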

Finally, the .close() method looks like this:

def close(self):
    self.poll.unregister(self.socket)
    self.socket.shutdown(socket.SHUT_RDWR)
    self.socket.close()
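One detail that may be worth checking in a close() like this: socket.shutdown() can fail with ENOTCONN when the socket is not connected, for example if connect() never succeeded. A minimal sketch of a tolerant variant, assuming Python 3 and a platform with select.poll(); the helper name close_quietly is hypothetical.

```python
import errno
import select
import socket


def close_quietly(sock, poller):
    """Hypothetical helper: unregister, shut down and close *sock*."""
    poller.unregister(sock)
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except OSError as exc:
        # An unconnected socket cannot be shut down; other errors propagate.
        if exc.errno != errno.ENOTCONN:
            raise
    finally:
        # The descriptor is released whether or not shutdown succeeded.
        sock.close()


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # never connected
poller = select.poll()
poller.register(sock, select.POLLIN)
close_quietly(sock, poller)
closed_fd = sock.fileno()  # -1 once the socket object is closed
```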

My questions are:

  • Is there anything redundant here?
  • Is polling the right choice? (I've used select.select() before, but it has caused some issues.)
  • Do you think the general architecture makes sense?
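For comparison with the second question, a minimal select.select() sketch of the same wait-with-timeout idea, assuming Python 3. Note the unit difference: select.select() takes its timeout in seconds, while poll.poll() takes milliseconds. The helper name wait_readable is hypothetical.

```python
import select
import socket


def wait_readable(sock, timeout_s):
    """Hypothetical helper: True if *sock* is readable within *timeout_s* seconds."""
    readable, _, _ = select.select([sock], [], [], timeout_s)
    return bool(readable)


left, right = socket.socketpair()
before = wait_readable(left, 0)    # nothing has been sent yet
right.sendall(b"x")
after = wait_readable(left, 1.0)   # one byte is waiting to be read
left.close()
right.close()
```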

EDIT: ConnectionClosedError is a custom exception that wraps the socket exception, purely for convenience.

The Complete Code

## Framework
import errno
import select
import socket

###################################
## ----- Connection Errors ----- ##
###################################
class ConnectionError(IOError):
    err_msg = "{self.err}"

    def __init__(self, err, host=None, port=None):
        self.err = err
        self.host = host
        self.port = port

    def __str__(self):
        return self.err_msg.format(self=self)

class ConnectionClosedError(ConnectionError):
    err_msg = "Connection closed (host '{self.host}', port {self.port})"

class ConnectionRefusedError(ConnectionError):
    err_msg = "{self.err} (host '{self.host}', port {self.port})"
########################
## ----- Client ----- ##
########################
class SocketClient(object):
    """ Low level socket client. """

    def __init__(self, host, port, blocking=False):
        self.host = host
        self.port = port
        self.blocking = blocking
        ## Initialize socket
        try:
            self.init_socket()
        ## Couldn't
        except socket.error as socket_error:
            ## Connection was refused
            if socket_error.errno == errno.ECONNREFUSED:
                raise ConnectionRefusedError(socket_error, host=self.host,
                                             port=self.port)
            ## There was some other socket problem
            raise ConnectionError(socket_error)

    def init_socket(self):
        """ Instantiate a local socket and connect it with a remote
        socket. """
        ## AF_INET is a constant representing the address/protocol family
        ## SOCK_STREAM is a constant representing the socket type
        ## The function returns a Socket object
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ## Connect to server (a remote socket)
        ## The format of the address depends on the address family. In the case of
        ## AF_INET, it is a pair (host, port), where 'host' is a string
        ## representing either a hostname in internet domain notation like
        ## 'daring.cwi.nl' or an IPv4 address like '100.50.200.5', and port is an
        ## integer.
        try:
            self.socket.connect((self.host, self.port))
        except TypeError:
            if not isinstance(self.port, int):
                raise ConnectionError("port must be int (got {})".format(self.port))
            ## Any other TypeError is not ours to swallow
            raise
        ## We set the socket to non-blocking mode
        if not self.blocking:
            self.socket.setblocking(0)
        ## We construct a polling object and register the socket
        self.poll = select.poll()
        event_in_mask = select.POLLIN | select.POLLPRI
        event_err_mask = select.POLLERR
        event_closed_mask = select.POLLHUP | select.POLLNVAL
        event_mask = event_in_mask | event_err_mask | event_closed_mask
        self.poll.register(self.socket, event_mask)

    def close(self):
        """ Close the socket. """
        ## Unregister the socket
        self.poll.unregister(self.socket)
        self.socket.shutdown(socket.SHUT_RDWR)
        self.socket.close()

    def _recv(self):
        """ A thin wrapper around the socket's recv method. """
        ## Receive data from socket
        ## 4096 comes from a recommendation in the python module documentation
        ## for the socket module
        try:
            return self.socket.recv(4096)
        ## Couldn't receive
        except socket.error as socket_error:
            ## Connection was closed
            if socket_error.errno == errno.EBADF:
                raise ConnectionClosedError(socket_error, host=self.host,
                                            port=self.port)
            ## There was some other socket problem
            raise ConnectionError(socket_error)

    def recv(self, timeout=None):
        """ Receive data from socket. If *timeout* is given (in milliseconds,
        as poll() expects), return an empty string if no data arrives before
        it expires. """
        ## If no timeout was given, we think of the socket as if it is ready now
        if not timeout:
            return self._recv()
        ## Otherwise, we poll the socket, to check if there is data to read
        events = self.poll.poll(timeout)
        ## If no events were returned, socket is not ready
        if not events:
            return ""
        ## Otherwise, we're interested only in one event and we query it
        event = events[0][1]
        ## Invalid request; descriptor not open
        if event & select.POLLNVAL:
            err = "Invalid request; descriptor not open"
            raise ConnectionClosedError(err, host=self.host, port=self.port)
        ## Hung up
        if event & select.POLLHUP:
            err = "Hung up"
            raise ConnectionClosedError(err, host=self.host, port=self.port)
        ## Error condition of some sort
        if event & select.POLLERR:
            err = "Error condition of some sort"
            raise ConnectionClosedError(err, host=self.host, port=self.port)
        ## There is data to read
        ready = event & select.POLLPRI or event & select.POLLIN
        ## Timeout expiry; nothing was received, we pass an empty string
        if not ready:
            return ""
        ## Ready? Query socket
        return self._recv()

    def send(self, string):
        """ Send *string* to the socket. Continue to send data from *string* until
        either all data has been sent or an error occurs. Return ``None`` on
        success, raise on failure. """
        ## We use 'sendall' as we do not wish to allow partial messages
        self.socket.sendall(string)
TheCoffeeCup
asked Feb 10, 2014 at 10:10
\$\endgroup\$

1 Answer

\$\begingroup\$

Looks okay in general. One set of lines that gave me pause was:

## We set the socket to non-blocking mode
if not self.blocking:
    self.socket.setblocking(0)

It looked redundant until I realized this was the initial socket configuration. It might be a little clearer if you did something like:

self.socket.setblocking(1 if self.blocking else 0)

or even

self.socket.setblocking(int(self.blocking))

I think unconditionally setting it to something drives home the fact that it's a configuration step.
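A minimal sketch of that configuration step, assuming Python 3, where setblocking() also accepts a bool directly. gettimeout() is used here only to show the resulting mode: it returns 0.0 for a non-blocking socket and None for a blocking one. The configure helper is hypothetical.

```python
import socket


def configure(sock, blocking):
    """Hypothetical helper: always set the blocking mode explicitly."""
    sock.setblocking(bool(blocking))
    # Report the resulting mode via the socket's timeout setting.
    return sock.gettimeout()


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
nonblocking_timeout = configure(sock, False)  # non-blocking mode
blocking_timeout = configure(sock, True)      # blocking mode
sock.close()
```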

answered Dec 14, 2015 at 16:42
\$\endgroup\$
