I am trying to pipe multiple Linux commands and abort if there is an error. With Popen the communicate() method waits for all commands to finish. This is why I am trying asyncio now.
I have the following MWE working as expected if there is no error:
async def forward(src, dest, *, chunk_size=4096):
    """Copy everything from *src* to *dest*, then signal end-of-file.

    Args:
        src: Object with an awaitable ``read(n)`` that returns an empty
            value at end-of-stream (e.g. a subprocess ``stdout`` stream).
        dest: Object with ``write``, ``write_eof`` and an awaitable
            ``drain`` (e.g. a subprocess ``stdin`` stream).
        chunk_size: Maximum number of bytes requested per read. Defaults
            to 4096, the value that used to be hard-coded.

    Raises:
        ValueError: If *chunk_size* is not positive. A size of 0 would
            make the first read return an empty chunk, which is
            indistinguishable from end-of-stream.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")

    while chunk := await src.read(chunk_size):
        dest.write(chunk)
        # Wait for the write buffer to flush before reading more.
        await dest.drain()

    # An empty read means src is exhausted: pass the EOF on to dest so the
    # consumer knows no more input is coming.
    dest.write_eof()
    await dest.drain()
async def stderr_watch(stream):
    """Read *stream* to EOF and fail if it produced any non-blank output.

    Args:
        stream: Object with an awaitable ``read()`` returning ``bytes``
            (e.g. a subprocess ``stderr`` stream).

    Raises:
        RuntimeError: If the stream contained anything other than
            whitespace; the message carries the decoded text.
    """
    err = await stream.read()
    if err.strip():
        # Decode leniently: undecodable bytes must not replace the intended
        # RuntimeError with a UnicodeDecodeError.
        raise RuntimeError(f"stderr: {err.decode(errors='replace')}")
async def main():
p1 = await asyncio.create_subprocess_exec(
"find","/", "-name", "*.py",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
p2 = await asyncio.create_subprocess_exec(
"wc", "-l",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
output = []
async def stream_reader(stream):
while True:
line = await stream.readline()
if not line:
break
output.append(line.decode())
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(stderr_watch(p1.stderr))
t2 = tg.create_task(stderr_watch(p2.stderr))
t3 = tg.create_task(forward(p1.stdout, p2.stdin))
t4 = tg.create_task(stream_reader(p2.stdout))
except* Exception as eg:
for e in eg.exceptions:
print(e)
pass
else:
return "".join(output)
if __name__ == '__main__':
    # Run the pipeline coroutine to completion and print what it returned.
    print(asyncio.run(main()))
However, I get "RuntimeError: Event loop is closed" if I provoke an error in p2, e.g. by piping to "wc", "-l", "abc". Where is my mistake?
1 Answer 1
The error occurs because the finalizer of an internal object (asyncio.base_subprocess.BaseSubprocessTransport) runs after the event loop has already been closed.
You need to terminate the child processes on error and then await them before the event loop shuts down, like this:
async def main():
p1 = await asyncio.create_subprocess_exec(
...
)
p2 = await asyncio.create_subprocess_exec(
...
)
...
try:
async with asyncio.TaskGroup() as tg:
...
except* Exception as eg:
...
for p in (p1, p2):
try:
p.terminate()
except Exception:
pass
for p in (p1, p2):
await p.communicate()