
I am bulk recompiling edits to a C-like language that uses an external compiler.exe. The compiler takes the file paths as arguments and does the processing; nothing needs to be synchronized. I simply want to speed up the de/compilation of multiple files by using workers: when a worker finishes, it should be recycled and the executable started again with the next set of arguments.
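For a single file the call is just the executable plus the input and output paths, roughly like this (the compiler.exe name and the bare two-argument form are stand-ins based on my description; the real tool may take extra switches):

# One compile is one external process; the executable name and argument
# order here are placeholders for the real compiler's CLI.
import subprocess

subprocess.run(['compiler.exe', 'script.bin.c', 'script.bin'], check=True)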

I was able to create some working code, but I believe there is a much more Pythonic way to do this. I can't seem to find information on the right high-level functions to bulk start, await, and restart external processes with n workers while a generator is not exhausted.

I created a simple C# console app to simulate the compiler, along with some fake files to be processed by the fake compiler. It can be downloaded here.

To run a test, create a folder to work out of, place the fake_files folder and the Python script into your working folder, then run the script.

using System;
using System.Threading;

namespace ConsoleApp3
{
    class Program
    {
        static void Main(string[] args)
        {
            String InFile = args[0];
            String OutFile = args[1];
            Console.WriteLine($"Compiling file: {InFile}");
            Int32 ProcessingTime = new Random().Next(3, 25);
            Thread.Sleep(ProcessingTime * 1000);
            Console.WriteLine($"Compiling complete! Outfile: {OutFile}");
        }
    }
}

import asyncio
import functools
import logging
import string
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Generator

# create logger
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
fh = logging.FileHandler('debug.log', mode='w')
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s][%(name)s][%(levelname)s]: %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
log.addHandler(ch)
log.addHandler(fh)

SCRIPT_COMP_EXE = './fake_files/fake_compiler.exe'


async def signal_end_of_generator(complete_event: asyncio.Future):
    """This function waits for the tasks that are still compiling
    and awaiting on subprocess.run inside the executor.
    I did not see a built-in async way to do this, only sync with shutdown."""
    for t in asyncio.all_tasks():
        if t.get_coro().__name__ == 'recompile_script_async':
            await t
    complete_event.set_result(True)


async def recompile_script_async(file_generator: Generator,
                                 loop: asyncio.AbstractEventLoop,
                                 pool: ThreadPoolExecutor,
                                 complete_event: asyncio.Future,
                                 stop_iteration: asyncio.Event,
                                 worker: int):
    # Check file iterator
    try:
        fqp = next(file_generator)
    except StopIteration:
        if not stop_iteration.is_set():
            stop_iteration.set()
            loop.create_task(signal_end_of_generator(complete_event))
        return

    log.debug(f'Worker {worker}, has started.')

    # Compile
    outfile = Path(str(fqp).removesuffix('.c'))
    result = await loop.run_in_executor(pool, functools.partial(
        subprocess.run,
        [SCRIPT_COMP_EXE, '-c', fqp, outfile],
        capture_output=True, text=False
    ))

    # Recycle Worker
    log.debug(f'Worker {worker}, has completed.')
    if not stop_iteration.is_set():
        log.debug(f'Recycling Worker {worker}.')
        loop.create_task(
            recompile_script_async(file_generator, loop, pool, complete_event, stop_iteration, worker))


def recompile_scripts_parallel(root, max_workers=4):
    log.info('Recompiling Scripts in parallel...')
    loop = asyncio.new_event_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        stop_iteration = asyncio.Event()
        complete_event = loop.create_future()
        file_generator = Path(root).rglob('*.bin.c')
        for worker in range(0, max_workers):
            log.debug(f'Starting parallel worker: {worker}')
            loop.create_task(
                recompile_script_async(file_generator, loop, pool, complete_event, stop_iteration, worker))
        loop.run_until_complete(complete_event)


if __name__ == '__main__':
    recompile_scripts_parallel('./fake_files')

For example, the way I use task recursion to start a new task, the way I await the in-flight processes once the generator is exhausted, and the low-level loop access all seem like a lot of low-level code to achieve this.
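To illustrate, this is roughly the shape I was hoping to find, sketched with asyncio's own subprocess support plus a semaphore instead of run_in_executor and subprocess.run. It is untested, it eagerly creates one task per file rather than lazily pulling from the generator, and the helper names are placeholders rather than part of my script:

import asyncio
from pathlib import Path

SCRIPT_COMP_EXE = './fake_files/fake_compiler.exe'


async def compile_one(sem: asyncio.Semaphore, fqp: Path):
    outfile = Path(str(fqp).removesuffix('.c'))
    async with sem:  # at most max_workers compiles run at once
        proc = await asyncio.create_subprocess_exec(
            SCRIPT_COMP_EXE, str(fqp), str(outfile),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        await proc.communicate()  # wait for the compiler process to exit


async def compile_all(root: str, max_workers: int = 4):
    sem = asyncio.Semaphore(max_workers)
    tasks = [compile_one(sem, fqp) for fqp in Path(root).rglob('*.bin.c')]
    await asyncio.gather(*tasks)


# asyncio.run(compile_all('./fake_files'))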

asked Oct 15, 2022 at 5:27
  • Have you considered multiprocessing instead of asyncio? Commented Oct 15, 2022 at 6:47
  • The main reason I avoided ProcessPoolExecutor and multiprocessing is that they fork the entire Python script and start a new Python interpreter. I need to await the compilation processes of the external tool and then continue in one instance of the Python script. Commented Oct 15, 2022 at 6:55
  • The other reason is that most of multiprocessing seems to be about message queues and synchronization between processes. I don't need the processes synchronized or to share any data; I just need another instance to start up with the next set of data whenever one finishes. Using a set number of worker tasks that recursively start the next task was the best I could come up with. Ideally, though, there would be some high-level API that takes a generator and a function and keeps n workers alive while the generator gets processed (roughly the shape sketched below these comments). Commented Oct 15, 2022 at 7:20
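That last comment describes the shape I am after: a callable plus an iterable of files, with a fixed pool of workers pulling through them. A rough sketch of that shape with concurrent.futures.Executor.map (the compile_file helper is hypothetical and just wraps the same compiler invocation as my script):

from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import subprocess


def compile_file(fqp: Path):
    # hypothetical wrapper around the same compiler invocation as above
    outfile = Path(str(fqp).removesuffix('.c'))
    subprocess.run(['./fake_files/fake_compiler.exe', str(fqp), str(outfile)],
                   capture_output=True)


with ThreadPoolExecutor(max_workers=4) as pool:
    # map() submits every file but runs at most max_workers compiles at a time
    list(pool.map(compile_file, Path('./fake_files').rglob('*.bin.c')))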

1 Answer


Combining the concurrent.futures and subprocess libraries seems to be the shortest synchronous way to achieve the same results. This spawns n threads, each calling subprocess.run, which waits on its subprocess to finish. The ThreadPoolExecutor automatically queues any submitted requests beyond max_workers, only spawning up to max_workers threads to work through the submitted requests.

import logging
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

logging.basicConfig(format='[%(asctime)s][%(name)s][%(thread)s][%(levelname)s]: %(message)s', level=logging.DEBUG)


def launch_compiler(file):
    outfile = Path(str(file).removesuffix('.c'))
    logging.info(f"Compiling: {file}")
    subprocess.run(['./fake_files/fake_compiler.exe', file, outfile], capture_output=True, text=False)
    logging.info(f"finished: {file}")


def compile_with_workers(max_workers):
    # When ThreadPoolExecutor is used as a context manager,
    # .shutdown is called on exit, which blocks until all background threads complete
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for fqp in Path('./fake_files').rglob('*.bin.c'):
            pool.submit(launch_compiler, fqp)
    # Execution will block until the Executor completes all submitted requests
    logging.info('All processes finished.')


if __name__ == '__main__':
    compile_with_workers(8)
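If you also want compiler failures to surface instead of being silently swallowed, one possible variant is to keep the futures and inspect them as they complete. This assumes the compiler signals failure with a non-zero exit code, so launch_compiler would also need check=True added to its subprocess.run call:

from concurrent.futures import ThreadPoolExecutor, as_completed


def compile_with_workers_checked(max_workers):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map each future back to its file so failures can be reported
        futures = {pool.submit(launch_compiler, fqp): fqp
                   for fqp in Path('./fake_files').rglob('*.bin.c')}
        for future in as_completed(futures):
            try:
                future.result()  # re-raises any exception from launch_compiler
            except subprocess.CalledProcessError:
                logging.error(f'Compile failed for {futures[future]}')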
answered Oct 15, 2022 at 10:05
