I recently needed to run some command line applications from Python. Whilst this is fairly simple with `subprocess.Popen`, I wanted to be able to properly pipe the output.
When running the programs from Python I want three things:

- The option to have a low memory footprint. This is possible with `Popen` by passing some, but not all, file objects to `stdout` or `stderr`.
- To be able to pass data into multiple outputs. If I want to pass data from `Popen.stdout` to both `sys.stdout` and `my_file`, I should be able to.
- To output live. When using `Popen.communicate` or `stdout=None` it is not possible to both log the output and display it as it happens.
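For instance, here is a minimal sketch of the limitation in the last point (the inline `-c` script is purely illustrative): `communicate` collects everything in memory and returns only once the process has exited, so nothing can be logged or displayed while it runs.

```python
import subprocess
import sys

# communicate() blocks until the process exits and buffers all output
# in memory -- the output cannot be streamed live while it runs.
process = subprocess.Popen(
    [sys.executable, '-c', 'print("hello")'],
    stdout=subprocess.PIPE,
)
out, _ = process.communicate()

# Only after the process has finished can we both log and display it.
sys.stdout.buffer.write(out)
```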
All of these are fairly easy to implement by using `Popen.stdout` with threads. Even though the core of the code is fairly simple, having to manually create and manage threads each time I use `Popen` is not desirable. Furthermore, given that the code is a convenience wrapper, I decided to try and make the usage as simple as possible.
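For reference, the manual pattern being wrapped looks roughly like this (a sketch only; the `tee` helper and the inline `-c` script are illustrative, not the library's internals):

```python
import subprocess
import sys
import threading

def tee(source, sinks):
    """Copy each line from a pipe to every sink."""
    for line in iter(source.readline, b''):
        for sink in sinks:
            sink.write(line)

process = subprocess.Popen(
    [sys.executable, '-c',
     'import sys; print("out"); print("err", file=sys.stderr)'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
# One thread per pipe, each fanning out to the console and a log file.
with open('log.txt', 'wb') as f:
    threads = [
        threading.Thread(target=tee, args=(process.stdout, [sys.stdout.buffer, f])),
        threading.Thread(target=tee, args=(process.stderr, [sys.stderr.buffer, f])),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
process.wait()
```

This wiring has to be repeated for every call, which is exactly the boilerplate the wrapper below removes.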
"""
Teetime - adding tee like functionality to Popen.
Conveniently allow Popen to pass data to any number of sinks.
Sinks can be shared between both stdout and std err and be handled correctly.
.. code-block::
import sys
import teetime
with open('log.txt', 'wb') as f:
teetime.popen_call(
['python', 'test.py'],
stdout=(sys.stdout.buffered, f),
stderr=(sys.stderr.buffered, f),
)
The :code:`popen_call` function is a convenience over :code:`Sinks`.
If you need to interact with the process when it's live then you can
modify the following to suite your needs.
.. code-block::
import sys
import teetime
with open('log.txt', 'wb') as f:
sinks = Sinks(
(sys.stdout.buffered, f),
(sys.stderr.buffered, f),
)
process = sinks.popen(['python', 'test.py'])
with self.make_threads(process) as threads:
threads.join()
self.reset_head()
"""
from __future__ import annotations
from typing import (
Any, Callable, Iterator, List, Optional, Sequence, Tuple, Type
)
import collections
import io
import queue
import subprocess
import threading
QUEUE_SENTINEL = object()
FILE_SENTINEL = b''
TSink = Callable[[bytes], None]
_Threads = collections.namedtuple('Threads', 'out, err, both, queue')
_Sinks = collections.namedtuple('Sinks', 'out, err, both')
class Threads(_Threads):
    """
    Thread manager and interface.

    Because we need to listen to stdout and stderr at the same time we
    need to put the listeners in threads. This is so we can handle
    changes when they happen. This comes with two benefits:

    1. We don't have to store all the print information in memory.
    2. We can display changes as soon as they happen.

    To allow stdout and stderr to merge correctly we have to use an
    atomic queue. :code:`queue.Queue` is one such example. If we have
    no need of combining output, as there are no both sinks, then the
    queue and associated thread won't be created.

    This object holds the three threads and the message queue.
    """

    out: threading.Thread
    err: threading.Thread
    both: threading.Thread
    queue: queue.Queue

    def __new__(
        cls,
        process: subprocess.Popen,
        sinks: Sinks,
        flush: bool = True,
    ) -> Threads:
        """
        Create Threads from an active process and sinks.

        This creates the required threads to handle the output and sinks.
        These threads are created as daemons and started on creation.
        This also creates the message queue that merges both stdout and
        stderr when needed.

        :param process: Process to tee output from.
        :param sinks: Places to tee output to.
        :param flush: Whether to force flush flushable sinks.
        :return: A thread manager for the process and sinks.
        """
        o_thread = None
        e_thread = None
        b_thread = None
        _queue: Optional[queue.Queue] = None
        out, err, both = sinks.as_callbacks(flush=flush)
        if both:
            _queue = queue.Queue()
            b_thread = cls._make_thread(
                target=cls._handle_sinks,
                args=(iter(_queue.get, QUEUE_SENTINEL), both),
            )
            if out is None:
                raise ValueError('both is defined, but out is None')
            if err is None:
                raise ValueError('both is defined, but err is None')
            out += (_queue.put,)
            err += (_queue.put,)
        if out:
            o_thread = cls._make_thread(
                target=cls._handle_sinks,
                args=(iter(process.stdout.readline, FILE_SENTINEL), out),
            )
        if err:
            e_thread = cls._make_thread(
                target=cls._handle_sinks,
                args=(iter(process.stderr.readline, FILE_SENTINEL), err),
            )
        return _Threads.__new__(
            cls,
            o_thread,
            e_thread,
            b_thread,
            _queue,
        )

    def __enter__(self) -> Threads:
        """Context manager pattern."""
        return self

    def __exit__(self, _1: Any, _2: Any, _3: Any) -> None:
        """Context manager pattern."""
        self.end_queue()

    @staticmethod
    def _make_thread(*args: Any, **kwargs: Any) -> threading.Thread:
        """Make and start a daemon thread."""
        thread = threading.Thread(*args, **kwargs)
        thread.daemon = True
        thread.start()
        return thread

    @staticmethod
    def _handle_sinks(_input: Iterator[bytes], sinks: List[TSink]) -> None:
        """Threaded function to pass data between source and sinks."""
        for segment in _input:
            for sink in sinks:
                sink(segment)

    def join(
        self,
        out: bool = True,
        err: bool = True,
        both: bool = False,
    ) -> None:
        """Conveniently join threads."""
        if out and self.out is not None:
            self.out.join()
        if err and self.err is not None:
            self.err.join()
        if both and self.both is not None:
            self.both.join()

    def end_queue(self) -> None:
        """Send the exit signal to the both thread."""
        if self.queue is not None:
            self.queue.put(QUEUE_SENTINEL)
def _flushed_write(sink: Any) -> TSink:
    """Flush on write."""
    def write(value: bytes) -> None:
        sink.write(value)
        sink.flush()
    return write
class Sinks(_Sinks):
    """
    Ease creation and usage of sinks.

    This handles where the output should be copied to.
    It also contains a few convenience functions to ease usage.
    These convenience functions are purely optional and the raw form
    is still available to some extent.
    """

    out: Optional[Tuple[Any, ...]]
    err: Optional[Tuple[Any, ...]]
    both: Tuple[Any, ...]

    def __new__(cls, out: Sequence[Any], err: Sequence[Any]):
        """Create new sinks."""
        both: Tuple[Any, ...] = ()
        if out and err:
            _out = set(out)
            _err = set(err)
            _both = _out & _err
            _out -= _both
            _err -= _both
            out = tuple(_out)
            err = tuple(_err)
            both = tuple(_both)
        return _Sinks.__new__(cls, out, err, both)

    def run(
        self,
        *args,
        flush: bool = True,
        threads: Type[Threads] = Threads,
        **kwargs,
    ) -> subprocess.Popen:
        """Conveniently create and execute a process and threads."""
        process = self.popen(*args, **kwargs)
        self.run_threads(process, flush=flush, threads=threads)
        return process

    def popen(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> subprocess.Popen:
        """Conveniently create a process with out and err defined."""
        return subprocess.Popen(  # type: ignore
            *args,
            stdout=self.popen_out,
            stderr=self.popen_err,
            **kwargs,
        )

    @property
    def popen_out(self) -> Optional[int]:
        """Conveniently get the Popen stdout argument."""
        return subprocess.PIPE if self.out is not None else None

    @property
    def popen_err(self) -> Optional[int]:
        """Conveniently get the Popen stderr argument."""
        return subprocess.PIPE if self.err is not None else None

    def run_threads(
        self,
        process: subprocess.Popen,
        flush: bool = True,
        threads: Type[Threads] = Threads,
    ) -> None:
        """Conveniently create the threads and execute the process."""
        with self.make_threads(
            process,
            flush=flush,
            threads=threads,
        ) as _threads:
            _threads.join()
        self.flush()
        self.reset_head()

    def make_threads(
        self,
        process: subprocess.Popen,
        flush: bool = True,
        threads: Type[Threads] = Threads,
    ) -> Threads:
        """Conveniently create threads."""
        return threads(process, self, flush=flush)

    def flush(self) -> None:
        """Flush all sinks."""
        for sinks in self:
            if sinks is None:
                continue
            for sink in sinks:
                if hasattr(sink, 'flush'):
                    sink.flush()

    def reset_head(self) -> None:
        """Reset the head of all sinks."""
        for sinks in self:
            for sink in sinks or []:
                if hasattr(sink, 'seek'):
                    try:
                        sink.seek(0)
                    except io.UnsupportedOperation:
                        pass

    @staticmethod
    def _to_callback(
        sinks: Optional[List[Any]],
        flush: bool = True,
    ) -> Optional[Tuple[TSink, ...]]:
        """Convert sinks to callbacks."""
        if sinks is None:
            return None
        callbacks: List[TSink] = []
        for sink in sinks:
            if isinstance(sink, queue.Queue):
                callbacks.append(sink.put)
            elif hasattr(sink, 'write'):
                if flush and hasattr(sink, 'flush'):
                    callbacks.append(_flushed_write(sink))
                else:
                    callbacks.append(sink.write)
            else:
                raise ValueError(f'Unknown sink type {type(sink)} for {sink}')
        return tuple(callbacks)

    def as_callbacks(
        self,
        flush: bool = True,
    ) -> Tuple[Optional[Tuple[TSink, ...]], ...]:
        """Convert all sinks to their callbacks."""
        return tuple(self._to_callback(sinks, flush=flush) for sinks in self)


def popen_call(*args, stdout=None, stderr=None, **kwargs) -> subprocess.Popen:
    """Initialize the process and wait for IO to complete."""
    return Sinks(stdout, stderr).run(*args, **kwargs)
Given that big chunk of code, the usage is fairly simple. Take the following, which logs stdout, stderr and both of the outputs into three different files, whilst outputting to stdout and stderr live.
if __name__ == '__main__':
    import sys
    import tempfile

    with tempfile.TemporaryFile() as fout,\
            tempfile.TemporaryFile() as ferr,\
            tempfile.TemporaryFile() as fboth:
        process = popen_call(
            ['python', 'test.py'],
            stdout=(sys.stdout.buffer, fout, fboth),
            stderr=(sys.stderr.buffer, ferr, fboth),
        )
        process.wait()
        print('\n\noriginal')
        print(fboth.read().decode('utf-8'))
        print('\nstd.out')
        print(fout.read().decode('utf-8'))
        print('\nstd.err')
        print(ferr.read().decode('utf-8'), end='')
`test.py`:
import sys
import random
import time

random.seed(42401)
for _ in range(10):
    f = random.choice([sys.stdout, sys.stderr])
    word = random.sample('Hello World!', random.randrange(1, 12))
    f.write(''.join(word) + '\n')
    f.flush()
    time.sleep(random.randrange(3))
Code tested on Python 3.7.2 only.
1 Answer
Reconsidering valid/feasible "sinks"

Reasoning about the crucial idea and recalling the original Unix `tee` command, the obvious conclusion is that a sink should actually be an I/O stream (whether a binary, text or buffered stream, or an OS-level file object) or, optionally, a `queue.Queue` instance. All other types are invalid in such a context.

Thus, it's reasonable to validate the input sequences (`out`, `err`), checking that their items implement one of the main subclasses of the basic `io.IOBase` interface or the `queue.Queue` class, at the very start, when constructing a `Sinks` instance.

That will eliminate noisy repetitive checks like `if hasattr(sink, 'flush')`, `hasattr(sink, 'seek')` and `hasattr(sink, 'write')`, since sinks that are instances of any of `(io.RawIOBase, io.BufferedIOBase, io.TextIOBase)` already implement the `flush`/`seek`/`write` behaviour.

With that in mind, I'd add the respective static methods to the `Sinks` class:
@staticmethod
def _validate_sinks(sinks: Sequence[Any]):
    for sink in sinks:
        if not isinstance(sink, (io.RawIOBase, io.BufferedIOBase, io.TextIOBase, queue.Queue)):
            raise TypeError(f'Type `{type(sink)}` is not a valid sink type')

@staticmethod
def is_iostream(sink):
    return isinstance(sink, (io.RawIOBase, io.BufferedIOBase, io.TextIOBase))
Now, the `Sinks.__new__` method would look as follows (also note how the redundant `set` reassignment is optimized away):
def __new__(cls, out: Sequence[Any], err: Sequence[Any]):
    """Create new sinks."""
    # Validate I/O streams.
    if out:
        Sinks._validate_sinks(out)
    if err:
        Sinks._validate_sinks(err)
    both: Tuple[Any, ...] = ()
    if out and err:
        _out = set(out)
        _err = set(err)
        out = tuple(_out - _err)
        err = tuple(_err - _out)
        both = tuple(_out & _err)
    return _Sinks.__new__(cls, out, err, both)
Before posting the optimized `flush`, `reset_head` and `_to_callback` methods, here are some subtle issues:

- The `reset_head` method. When running your approach "as is" I got `OSError: [Errno 29] Illegal seek` at the end. Some raw binary streams may not be seekable ("If False, seek(), tell() and truncate() will raise OSError"). Therefore, let's capture two exceptions there: `except (io.UnsupportedOperation, OSError):` (see the restructured method version below).
- The `_to_callback` method. It is simplified thanks to the initial validation.

Considering the above issues, plus some other minor but redundant conditions like `if sinks is None: continue` and `for sink in sinks or []:`, the mentioned three methods would look as below:
def flush(self) -> None:
    """Flush all sinks."""
    for sinks in self:
        for sink in filter(Sinks.is_iostream, sinks or ()):
            sink.flush()

def reset_head(self) -> None:
    """Reset the head of all sinks."""
    for sinks in self:
        for sink in filter(Sinks.is_iostream, sinks or ()):
            try:
                sink.seek(0)
            except (io.UnsupportedOperation, OSError):
                pass

@staticmethod
def _to_callback(
    sinks: Optional[List[Any]],
    flush: bool = True,
) -> Optional[Tuple[TSink, ...]]:
    """Convert sinks to callbacks."""
    if sinks is None:
        return None
    callbacks: List[TSink] = []
    for sink in sinks:
        if isinstance(sink, queue.Queue):
            callbacks.append(sink.put)
        elif Sinks.is_iostream(sink):
            callbacks.append(_flushed_write(sink) if flush else sink.write)
    return tuple(callbacks)
Sample run (output):
doWlloHer
HlWd
lHl o!lreo
!
oWd H
lHlWd
delWlHlor
olrdl!He
HolWlloe
!l
original
doWlloHer
HlWd
lHl o!lreo
!
oWd H
lHlWd
delWlHlor
olrdl!He
HolWlloe
!l
std.out
lHl o!lreo
!
oWd H
lHlWd
!l
std.err
doWlloHer
HlWd
delWlHlor
olrdl!He
HolWlloe
- \$\begingroup\$ I'm a little confused by the sentence "Sinks item should actually be I/O stream". Do you mean a sink or the `Sinks` class? I'm assuming the former. Why do you think an atomic message queue shouldn't be valid input? Whilst this is tee inspired, I fail to see why removing the option to pass a message queue is a good design choice. Is it possible to interact with stdin from stdout or stderr without the use of an atomic queue? An example use case is passing `y` to `pip uninstall ...`. \$\endgroup\$ Commented Dec 16, 2019 at 12:10
- \$\begingroup\$ Even though I'm not convinced by your stance of only file objects, I do appreciate the review. It's highlighted some changes that I should make. Thank you. \$\endgroup\$ Commented Dec 16, 2019 at 12:13
- \$\begingroup\$ @Peilonrayz, 1) "Do you mean a sink or the Sinks class?" - I meant any item within either of the `out` or `err` sequences. 2) "I fail to see why removing the option to pass a message queue is a good design choice" - why then is `if isinstance(sink, queue.Queue):` never reached in your approach? \$\endgroup\$ RomanPerekhrest Commented Dec 16, 2019 at 12:22
- \$\begingroup\$ 1) Good, I assumed correctly. 2) Why do you say it's never reached? That code is there so the code works correctly if you pass in a `queue.Queue`, e.g. `stdout=(queue.Queue(),)`. Yes, my examples don't show this, but it is definitely reached. \$\endgroup\$ Commented Dec 16, 2019 at 12:35
- \$\begingroup\$ I agree with you there, it would be better to validate to a common object. Again thank you for the review. And thank you for talking through your thought process. \$\endgroup\$ Commented Dec 16, 2019 at 13:59
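For completeness, here is a small standalone sketch of the queue-as-sink case debated above (it mimics `_to_callback`'s dispatch rather than importing the module):

```python
import queue

# A sink can be a queue.Queue: _to_callback-style dispatch maps it to
# its put method instead of a write method.
sink: "queue.Queue[bytes]" = queue.Queue()
callback = sink.put if isinstance(sink, queue.Queue) else sink.write

# The reader threads would call the callback once per line of output.
callback(b'first line\n')
callback(b'second line\n')

# Another thread can then consume the lines, e.g. to feed stdin.
lines = [sink.get(), sink.get()]
```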
- \$\begingroup\$ `sys.stdout.buffered` gives `AttributeError: '_io.TextIOWrapper' object has no attribute 'buffered'`. Unintentional typo in the docstring? \$\endgroup\$
- \$\begingroup\$ Yes, it should be `sys.stdout.buffer`. All the code works except those, as I haven't added tests for my docstrings yet. \$\endgroup\$