I am trying to pipe multiple Linux commands and abort if there is an error. With Popen the communicate() method waits for all commands to finish. This is why I am trying asyncio now.

I have the following MWE working as expected if there is no error:

import asyncio


async def forward(src, dest):
    """Read data from src and write it to dest."""
    while True:
        chunk = await src.read(4096)
        if not chunk:
            dest.write_eof()
            await dest.drain()
            break
        dest.write(chunk)
        await dest.drain()


async def stderr_watch(stream):
    """Raise if the process wrote anything to stderr."""
    err = await stream.read()
    if err.strip():
        raise RuntimeError(f"stderr: {err.decode()}")


async def main():
    p1 = await asyncio.create_subprocess_exec(
        "find", "/", "-name", "*.py",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    p2 = await asyncio.create_subprocess_exec(
        "wc", "-l",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    output = []

    async def stream_reader(stream):
        while True:
            line = await stream.readline()
            if not line:
                break
            output.append(line.decode())

    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(stderr_watch(p1.stderr))
            t2 = tg.create_task(stderr_watch(p2.stderr))
            t3 = tg.create_task(forward(p1.stdout, p2.stdin))
            t4 = tg.create_task(stream_reader(p2.stdout))
    except* Exception as eg:
        for e in eg.exceptions:
            print(e)
        pass
    else:
        return "".join(output)


if __name__ == '__main__':
    output = asyncio.run(main())
    print(output)

However, I get a RuntimeError: Event loop is closed if I trigger an error in p2, e.g. by piping to "wc", "-l", "abc". Where is my mistake?

asked yesterday

1 Answer

The error occurs because the finalizer of an internal object (asyncio.base_subprocess.BaseSubprocessTransport) runs after the event loop has been closed. You need to terminate the child processes on error and then await them while the loop is still running, like this:

async def main():
    p1 = await asyncio.create_subprocess_exec(
        ...
    )
    p2 = await asyncio.create_subprocess_exec(
        ...
    )
    ...
    try:
        async with asyncio.TaskGroup() as tg:
            ...
    except* Exception as eg:
        ...
        for p in (p1, p2):
            try:
                p.terminate()
            except Exception:
                pass
        for p in (p1, p2):
            await p.communicate()
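
If it helps to see the cleanup step in isolation, here is a minimal, self-contained sketch of the same idea. The helper name terminate_and_reap and the demo coroutine are hypothetical (they are not part of asyncio), and the sleeping Python child is only a stand-in for a long-running pipeline stage; it assumes nothing else is still reading from the child's pipes when the helper runs.

```python
import asyncio
import sys


async def terminate_and_reap(*procs):
    """Terminate children that are still running, then wait for each to exit.

    Hypothetical helper: the name is illustrative, not an asyncio API.
    """
    for p in procs:
        if p.returncode is None:  # child has not exited yet
            try:
                p.terminate()
            except ProcessLookupError:
                pass  # child exited between the check and the signal
    for p in procs:
        # communicate() drains the remaining pipes and waits for the process,
        # so the subprocess is reaped while the event loop is still running.
        await p.communicate()


async def demo():
    # Stand-in for a long-running stage of the pipeline (assumption).
    child = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        raise RuntimeError("simulated failure in another stage")
    except RuntimeError:
        await terminate_and_reap(child)
    return child.returncode


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the child is awaited before asyncio.run() returns, its return code is already set when the loop shuts down, so no subprocess is left running for a finalizer to clean up after the loop has closed.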
answered 12 hours ago