The worker processes spawned by concurrent.futures.ProcessPoolExecutor (docs) inherit the priority of their main/parent process, which is to be expected. For certain jobs I am looking for a "clean" way to start them with the lowest possible priority, without interfering with the main/parent process.
So far I am using psutil (see here) and simply let an initializer function change the priority of each newly spawned worker process "from within":
import os
from concurrent.futures import ProcessPoolExecutor
from time import sleep

import psutil

def set_priority(value: int):
    # runs once in each newly spawned worker process
    p = psutil.Process(os.getpid())
    p.nice(value)

def task(item: int) -> int:
    sleep(2)
    return item ** 2

with ProcessPoolExecutor(
    max_workers=2,
    initializer=set_priority,
    initargs=(19,),  # 19 == highest niceness, i.e. lowest priority, on Unix
) as executor:
    jobs = [executor.submit(task, item) for item in range(10)]
    data = [job.result() for job in jobs]

print(data)
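As an aside, on Unix the psutil dependency is not strictly needed for this step: os.nice from the standard library adds an increment to the niceness of the calling process. A minimal sketch of the same initializer (set_priority_stdlib is just an illustrative name):

import os

def set_priority_stdlib(increment: int):
    # Unix only: raises the niceness (lowers the priority) of the calling
    # worker; lowering the niceness again would require elevated privileges
    os.nice(increment)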
Following up on the above, I considered deriving a ProcessPoolExecutorNice class from ProcessPoolExecutor with all of the magic hidden inside, generalized to the point where a user can still pass a custom initializer function.
A draft Unix-only solution looks as follows. wrapper is a closure over the enclosing namespace and therefore not picklable, which, as far as I understand the subject, is a requirement for making this work on Windows:
import os
from concurrent.futures import ProcessPoolExecutor

import psutil

class ProcessPoolExecutorNice(ProcessPoolExecutor):

    def __init__(self, *args, nice: int = 0, **kwargs):
        assert len(args) <= 4  # HACK not future-proof
        args = list(args)  # tuples have no `pop`
        # extract initargs/initializer, whether passed positionally or by keyword
        initargs = args.pop(3) if len(args) == 4 else kwargs.pop('initargs', tuple())
        initializer = args.pop(2) if len(args) == 3 else kwargs.pop('initializer', None)
        def wrapper():
            self._set_priority(nice)
            if initializer is not None:
                initializer(*initargs)
        super().__init__(*args, initializer=wrapper, **kwargs)

    @staticmethod
    def _set_priority(value: int):
        p = psutil.Process(os.getpid())
        p.nice(value)
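One idea I sketched for side-stepping the closure is a module-level callable class whose instances carry nice, initializer and initargs as attributes. Such instances should pickle fine as long as the user's initializer itself is picklable, which the spawn start method requires anyway. Untested on Windows, where psutil.Process.nice additionally expects a priority-class constant rather than a Unix nice value (_NiceInitializer is just an illustrative name):

import os

import psutil

class _NiceInitializer:
    # picklable stand-in for the closure-based wrapper: state lives in
    # instance attributes instead of an enclosing namespace
    def __init__(self, nice, initializer, initargs):
        self._nice = nice
        self._initializer = initializer
        self._initargs = initargs
    def __call__(self):
        psutil.Process(os.getpid()).nice(self._nice)
        if self._initializer is not None:
            self._initializer(*self._initargs)

# inside ProcessPoolExecutorNice.__init__, the last line would become:
# super().__init__(*args, initializer=_NiceInitializer(nice, initializer, initargs), **kwargs)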
It can be tested as follows:
from time import sleep

A = [-1]

def prepare():
    A[0] = os.getpid()

def task(item: int) -> int:
    sleep(2)
    return item + A[0]

with ProcessPoolExecutorNice(
    max_workers=2,
    initializer=prepare,
    nice=19,
) as executor:
    jobs = [executor.submit(task, item) for item in range(10)]
    data = [job.result() for job in jobs]

print(data)
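Note that the test above implicitly relies on the fork start method. On spawn-based platforms (Windows, and macOS by default since Python 3.8), pool creation would additionally have to sit behind a main guard, roughly:

if __name__ == '__main__':
    # without the guard, each worker re-imports the module and would
    # re-execute the pool creation at import time
    with ProcessPoolExecutorNice(max_workers=2, initializer=prepare, nice=19) as executor:
        jobs = [executor.submit(task, item) for item in range(10)]
        data = [job.result() for job in jobs]
    print(data)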
Is there a better, more elegant way of doing this, perhaps at the level of the ProcessPoolExecutor object itself? I looked through the docs and am not sure whether I am overlooking something.
My juggling of the initargs and initializer arguments in ProcessPoolExecutorNice's constructor can surely be improved significantly; another recent question of mine was aiming at that.
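For illustration, one direction that might clean this up is to bind the caller's arguments against the actual constructor signature via inspect.signature, so positional versus keyword passing no longer matters (split_initializer is just an illustrative helper, not part of the stdlib):

import inspect
from concurrent.futures import ProcessPoolExecutor

def split_initializer(*args, **kwargs):
    # bind against the real signature, then pull out initializer and
    # initargs regardless of how they were passed
    bound = inspect.signature(ProcessPoolExecutor).bind_partial(*args, **kwargs)
    params = dict(bound.arguments)
    initializer = params.pop('initializer', None)
    initargs = params.pop('initargs', ())
    return params, initializer, initargs

# inside ProcessPoolExecutorNice.__init__:
# params, initializer, initargs = split_initializer(*args, **kwargs)
# super().__init__(**params, initializer=wrapper)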
More to the point, wrapper is the main obstacle when porting this to Windows. Ideas on how to approach something like this?