Bulk recompiling edited files by running external processes with a pool of 4 workers using asyncio
I am bulk recompiling edited files in a C-like language that uses an external compiler.exe. The compiler takes the file paths as arguments and does all the processing; nothing needs to be synchronized. The goal is simply to speed up the de/compilation of multiple files by using workers: when a worker finishes, it should be recycled and the executable started again with the next set of arguments.
I was able to create some working code, but I believe there is a much more Pythonic way to do this. I can't seem to find information on the right high-level functions to bulk start, await, and restart external processes with n workers while a generator is not yet exhausted.
I created a simple C# console app to simulate the compiler, along with some fake files to be processed by the fake compiler. It can be downloaded here.
To run a test, create a folder to work out of, place the fake_files folder and the Python script into your working folder, then run the script.
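The fake input files are just placeholders whose contents are never read by the fake compiler (it only echoes the paths it is given), so if the download is unavailable they can be generated with a short hypothetical helper like this:

from pathlib import Path


def make_fake_files(root='./fake_files', count=10):
    """Create empty *.bin.c placeholder files; only the file paths matter to the fake compiler."""
    folder = Path(root)
    folder.mkdir(parents=True, exist_ok=True)
    for i in range(count):
        (folder / f'script_{i:03d}.bin.c').touch()


if __name__ == '__main__':
    make_fake_files()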
using System;
using System.Threading;

namespace ConsoleApp3
{
    class Program
    {
        static void Main(string[] args)
        {
            String InFile = args[0];
            String OutFile = args[1];
            Console.WriteLine($"Compiling file: {InFile}");
            // Simulate a compile that takes several (3-24) seconds
            Int32 ProcessingTime = new Random().Next(3, 25);
            Thread.Sleep(ProcessingTime * 1000);
            Console.WriteLine($"Compiling complete! Outfile: {OutFile}");
        }
    }
}
import asyncio
import functools
import logging
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Generator
# create logger
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
fh = logging.FileHandler('debug.log', mode='w')
fh.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s][%(name)s][%(levelname)s]: %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
log.addHandler(ch)
log.addHandler(fh)
SCRIPT_COMP_EXE = './fake_files/fake_compiler.exe'
async def signal_end_of_generator(complete_event: asyncio.Future):
    """Wait for the tasks that are still compiling and are awaiting
    subprocess.run inside the executor.
    I did not see a built-in async way to do this, only sync with shutdown."""
    for t in asyncio.all_tasks():
        if t.get_coro().__name__ == 'recompile_script_async':
            await t
    complete_event.set_result(True)
async def recompile_script_async(file_generator: Generator,
                                 loop: asyncio.AbstractEventLoop,
                                 pool: ThreadPoolExecutor,
                                 complete_event: asyncio.Future,
                                 stop_iteration: asyncio.Event,
                                 worker: int):
    # Check file iterator
    try:
        fqp = next(file_generator)
    except StopIteration:
        if not stop_iteration.is_set():
            stop_iteration.set()
            loop.create_task(signal_end_of_generator(complete_event))
        return
    log.debug(f'Worker {worker}, has started.')
    # Compile
    outfile = Path(str(fqp).removesuffix('.c'))
    result = await loop.run_in_executor(pool, functools.partial(
        subprocess.run,
        [SCRIPT_COMP_EXE, '-c', fqp, outfile],
        capture_output=True, text=False
    ))
    # Recycle Worker
    log.debug(f'Worker {worker}, has completed.')
    if not stop_iteration.is_set():
        log.debug(f'Recycling Worker {worker}.')
        loop.create_task(
            recompile_script_async(file_generator, loop, pool, complete_event, stop_iteration, worker))
def recompile_scripts_parallel(root, max_workers=4):
    log.info('Recompiling Scripts in parallel...')
    loop = asyncio.new_event_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        stop_iteration = asyncio.Event()
        complete_event = loop.create_future()
        file_generator = Path(root).rglob('*.bin.c')
        for worker in range(0, max_workers):
            log.debug(f'Starting parallel worker: {worker}')
            loop.create_task(
                recompile_script_async(file_generator, loop, pool, complete_event, stop_iteration, worker))
        loop.run_until_complete(complete_event)


if __name__ == '__main__':
    recompile_scripts_parallel('./fake_files')
For example, the task recursion where each finished task starts a new one, the way I safely await the in-flight processes once the generator is exhausted, and the direct event-loop access all seem like a lot of low-level code to achieve this.
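For illustration, something like the following is roughly the kind of higher-level version I have in mind, using asyncio.Semaphore to cap concurrency and asyncio.create_subprocess_exec to launch the compiler (just a sketch, not tested against the fake compiler; the arguments follow its infile/outfile signature):

import asyncio
from pathlib import Path

SCRIPT_COMP_EXE = './fake_files/fake_compiler.exe'


async def compile_one(fqp: Path, sem: asyncio.Semaphore):
    outfile = Path(str(fqp).removesuffix('.c'))
    async with sem:  # at most max_workers subprocesses run at once
        proc = await asyncio.create_subprocess_exec(
            SCRIPT_COMP_EXE, str(fqp), str(outfile),
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        await proc.communicate()


async def compile_all(root: str, max_workers: int = 4):
    sem = asyncio.Semaphore(max_workers)
    tasks = [asyncio.create_task(compile_one(fqp, sem))
             for fqp in Path(root).rglob('*.bin.c')]
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(compile_all('./fake_files'))

Here the semaphore would replace the manual worker recycling: every file gets a task up front, but only max_workers compiler processes run at any one time.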
1 Answer
Combining the concurrent.futures and subprocess libraries seems to be the shortest synchronous way to achieve the same results. This spawns n threads that each call subprocess.run, which waits on its subprocess to finish. The ThreadPoolExecutor automatically queues up any submitted requests beyond its max_workers, spawning at most max_workers threads to work through the submitted requests.
import logging
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

logging.basicConfig(format='[%(asctime)s][%(name)s][%(thread)s][%(levelname)s]: %(message)s', level=logging.DEBUG)


def launch_compiler(file):
    outfile = Path(str(file).removesuffix('.c'))
    logging.info(f"Compiling: {file}")
    subprocess.run(['./fake_files/fake_compiler.exe', file, outfile], capture_output=True, text=False)
    logging.info(f"finished: {file}")


def compile_with_workers(max_workers):
    # When ThreadPoolExecutor is used as a context manager,
    # .shutdown is called on exit, which blocks until all background threads complete
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for fqp in Path('./fake_files').rglob('*.bin.c'):
            pool.submit(launch_compiler, fqp)
    # Execution will block until the Executor completes all submitted requests
    logging.info('All processes finished.')


if __name__ == '__main__':
    compile_with_workers(8)
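If you also want failures to surface, a variant of the same idea (a sketch, assuming the real compiler reports errors through a non-zero exit code) can keep the futures and check each result as it completes:

import logging
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

logging.basicConfig(level=logging.INFO)


def launch_compiler(file):
    outfile = Path(str(file).removesuffix('.c'))
    # check=True raises CalledProcessError if the compiler exits with a non-zero code
    return subprocess.run(['./fake_files/fake_compiler.exe', file, outfile],
                          capture_output=True, check=True)


def compile_with_workers(max_workers=8):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(launch_compiler, fqp): fqp
                   for fqp in Path('./fake_files').rglob('*.bin.c')}
        for future in as_completed(futures):
            fqp = futures[future]
            try:
                future.result()
                logging.info(f'finished: {fqp}')
            except subprocess.CalledProcessError as err:
                logging.error(f'failed: {fqp} (exit code {err.returncode})')


if __name__ == '__main__':
    compile_with_workers()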
Comment: multiprocessing instead of asyncio?