2
\$\begingroup\$

This is a follow-up to Extracting lines from a bytearray ; I got a pretty good answer to the narrow question I asked there, but was invited to show more of the code in case there was a simpler way to solve the larger problem. So here's my entire module. Note that the use of PollSelector instead of DefaultSelector is intentional, as the expected use involves a bunch of threads each of which uses this to multiplex input from order of 3 file descriptors, so the overhead of kqueue/epoll/devpoll is undesirable and the linear cost of classic poll is not a problem. Yes, it would arguably be better to merge all of those threads into one, but that would be a much larger overhaul than I can afford to make right now.

As before, I am primarily interested in improvements to efficiency, improvements to readability, and places where I'm not taking full advantage of the Python (3.4) standard library. I am not interested in third-party modules even if they do substantially the same thing as this.

import fcntl
import heapq
import locale
import os
import selectors
import time
# This is a module global because locale.getpreferredencoding(True) is
# not safe to call off-main-thread.
DEFAULT_ENCODING = locale.getpreferredencoding(True)
class LineMultiplexor:
 """Priority queue which produces lines from one or more file objects,
 which are used only for their fileno() and as identifying
 labels, as data becomes available on each. If data is
 available on more than one file at the same time, the user can
 specify the order in which to return them. The user can also
 indicate that a particular file's lines should be processed in
 queue-sorted order, rather than as they come in.
 Files open in text mode are decoded according to their own
 stated encoding; files open in binary mode are decoded
 according to locale.getpreferredencoding(). Newline handling
 is universal.
 Files may be added or removed from the pollset with the
 add_file and drop_file methods (in the latter case, the file
 will not be closed). You can pass in bare fds as well as file
 objects. Files are automatically removed from the pollset and
 closed when they reach EOF.
 Each item produced by .get() or .peek() is (fileobj, string);
 trailing whitespace is stripped from the string. EOF on a
 particular file is indicated as (fileobj, None), which occurs
 only once; when it occurs, fileobj has already been closed.
 If no data is available (either from .peek(), or after .get
 times out) the result is (None, None).
 If used as an iterator, iteration terminates when all files have
 reached EOF. Adding more files will reactivate iteration.
 """
 def __init__(self, default_timeout=None):
 self.poller = selectors.PollSelector()
 self.output_q = []
 self.default_timeout = default_timeout
 self.seq = 0
 def add_file(self, fp, priority=0, sort_lines=False):
 """Add FP to the poll set with priority PRIORITY (default 0).
 Larger priority numbers are _lower_ priorities.
 If SORT_LINES is true, the lines of this file will be
 produced in alphabetical order (within each chunk) rather
 than file order.
 """
 buf = NonblockingLineBuffer(
 fp,
 lineno = -1 if sort_lines else 0,
 priority = priority
 )
 self.poller.register(fp, selectors.EVENT_READ, buf)
 def drop_file(self, fp):
 """Remove FP from the poll set. Does not close the file."""
 self.poller.unregister(fd)
 def peek(self):
 """Returns the first item in the output queue, if any, without
 blocking and without removing it from the queue.
 """
 if not self.output_q:
 self._poll(0)
 return self._extract(False)
 def get(self, timeout=None):
 """Retrieve the first item from the output queue. If there
 are none, blocks until data arrives or TIMEOUT expires."""
 self._poll(timeout)
 return self._extract(True)
 def __iter__(self):
 return self
 def __next__(self):
 """Iteration calls .get until all files are exhausted."""
 if not self.output_q and not self.poller.get_map():
 raise StopIteration
 return self.get()
 # Internal: queue management.
 def _insert(self, priority, lineno, line, fp):
 # self.seq ensures that everything in the queue is strictly
 # ordered before we get to 'fp', which prevents heapq from
 # trying to sort file objects.
 heapq.heappush(self.output_q, (priority, lineno, line, self.seq, fp))
 self.seq += 1
 def _extract(self, pop):
 if not self.output_q:
 return (None, None)
 if pop:
 qitem = heapq.heappop(self.output_q)
 else:
 qitem = self.output_q[0]
 return (qitem[4], qitem[2])
 # Internal: the core read loop.
 def _poll(self, timeout=None):
 if timeout is None:
 timeout = self.default_timeout
 while True:
 if timeout is not None and timeout > 0:
 entry = time.monotonic()
 events = self.poller.select(timeout)
 if events:
 may_emit = []
 for k, _ in events:
 buf = k.data
 if buf.absorb():
 may_emit.append(buf)
 for buf in may_emit:
 lineno = buf.lineno
 prio = buf.priority
 for line in buf.emit():
 self._insert(prio, lineno, line, buf.fp)
 if lineno != -1:
 lineno += 1
 buf.lineno = lineno
 if buf.at_eof:
 self.drop_file(buf.fp)
 buf.close()
 if self.output_q or timeout == 0:
 break
 # If some of the file descriptors are slowly producing very
 # long lines, we might not actually emit any data for longer
 # than the timeout, even though the system call never blocks
 # for too long. Therefore, we must manually check whether
 # the timeout has expired, and adjust it downward if it hasn't.
 if timeout is not None and timeout > 0:
 now = time.monotonic()
 timeout -= now - entry
 if timeout <= 0:
 break
class NonblockingLineBuffer:
 """Helper class used by LineMultiplexor; responsible for buffering
 input from individual file descriptors until full lines are
 available."""
 def __init__(self, fp, lineno, priority):
 global DEFAULT_ENCODING
 self.fp = fp
 self.lineno = lineno
 self.priority = priority
 if hasattr(fp, 'fileno'):
 self.fd = fp.fileno()
 if hasattr(fp, 'encoding'):
 self.enc = fp.encoding
 else:
 self.enc = DEFAULT_ENCODING
 else:
 assert isinstance(fp, int)
 self.fd = fp
 self.enc = DEFAULT_ENCODING
 self.buf = bytearray()
 self.at_eof = False
 self.carry_cr = False
 fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
 if not (fl & os.O_NONBLOCK):
 fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
 def close(self):
 if hasattr(self.fp, 'close'):
 self.fp.close()
 else:
 os.close(self.fd)
 def absorb(self):
 while True:
 try:
 block = os.read(self.fd, 8192)
 except BlockingIOError:
 break
 if not block:
 self.at_eof = True
 break
 self.buf.extend(block)
 return bool(buf) or self.at_eof
 def emit(self):
 buf = self.buf
 if buf:
 # Deal with '\r\n' having been split between absorb() events.
 if self.carry_cr and buf[0] == b'\n':
 del buf[0]
 self.carry_cr = False
 if buf:
 lines = buf.splitlines()
 if self.is_open and buf[-1] not in (b'\r', b'\n'):
 buf = lines.pop()
 else:
 if buf[-1] == b'\r':
 self.carry_cr = True
 del buf[:]
 for line in lines:
 yield (self.fp, line.decode(self.enc).rstrip())
 if self.at_eof:
 yield (self.fp, None)
 self.buf = buf
asked Mar 17, 2015 at 18:29
\$\endgroup\$
2
  • 1
    \$\begingroup\$ I see references to poller which probably should be self.poller. Have you tested this version of the code at all? \$\endgroup\$ Commented Mar 18, 2015 at 9:35
  • \$\begingroup\$ @JanneKarila Unfortunately not; the larger application still has parts all over the floor. I'll correct that here, though. \$\endgroup\$ Commented Mar 18, 2015 at 13:07

1 Answer 1

1
\$\begingroup\$

The thing that would worry me if I had to use or maintain this code is that the interface is very complex. That big docstring for LineMultiplexor is a hint that this class is trying to do too many things, and I would want to try to refactor it into a set of simpler classes.

Normally one tries to follow the single responsibility principle, but the LineMultiplexor class has five main areas of responsibility:

  1. Maintaining a set of file descriptors.
  2. Multiplexing lines of input from these file descriptors.
  3. Decoding the lines.
  4. Sorting the lines according to the priority of the file descriptors and (optionally) the lines themselves.
  5. Stripping trailing whitespace.

I can understand how a big ball of mud like this develops: you start out with a class handling one area of responsibility, and then as you discover new requirements it is always easier to add functionality to an existing class than to redesign and refactor.

My initial refactoring plan would look like this:

  1. Delegate the decoding of lines to NonblockingLineBuffer.

  2. Leave out the stripping of trailing whitespace. (The caller can do it if they need.)

  3. Split what's left into two classes, one to handle the multiplexing and one to handle the sorting.

Other points that I noticed:

  1. "Multiplexer" is spelled thus.

  2. There seems to be a typo in drop_file (fd should be fp). This suggests that you haven't actually run or tested this code, and so it's massively premature to ask people to review it.

  3. The multiplexer turns on O_NONBLOCK for each file descriptor added to the set. This behaviour ought to be documented.

  4. Returning exceptional values like (None, None) leads to error-prone code (the caller has to remember to check for the exceptional value and it's easy to forget). Better to raise an exception.

  5. Similarly, returning (fd, None) to indicate end of file seems like it's going to be awkward to handle. Are you sure you need this? What would the caller be able to do about it anyway, since you've already closed the file. It might be better to have some other mechanism for signalling end of file.

  6. The NonblockingLineBuffer class has lineno and priority attributes that it does not actually use. (They are only used by LineMultiplexor.) It's worth avoiding this kind of tight coupling between classes if possible.

  7. Consider using seq = itertools.count() and then next(seq).

  8. Similarly, considering using lineno = itertools.repeat(-1) if sort_lines else itertools.count() and then next(lineno).

answered Mar 22, 2015 at 16:05
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.