1

I am having difficulties understanding logic of optimizing multip-rocessing in Python 3.11.

Context:

I am running on Ubuntu 22.04, x86 12 cores / 32 threads, 128 GB memory

Concerns:

(Please refer to code and result below).
Both multiprocessing function using a local df (using map+partial or starmap) take a lot more time than multiprocessing using the global df.

It seemed ok to me but ...

... by activating the sleep(10) in both fun, I noticed that every spawned process takes around 1.2 GB of memory (using system monitor in Ubuntu), so I guess the df must have been duplicated even when declared global.

My question (eventually :-))

  • Why multi-processings using local df take so much time compared to the one using a global df if memory is copied in each process (whether it is local or global) ?
  • Or maybe system monitor in Ubuntu report the memory accessible to a process and not necessarily its intrinsic footprint ? (shared memory would then appear duplicated in several processes and the sum of all these processes could be greater than my system memory) ?

Thank you for your enlightenment

Code

from time import sleep
from multiprocessing import Pool
from functools import partial
from time import time
import numpy as np
import pandas as pd
def _fun_local(df, imax: int):
 # sleep(10)
 df_temp = df[:imax]
 return df_temp
def _fun_global(imax: int):
 global mydf
 # sleep(10)
 df_temp = mydf[:imax]
 return df_temp
def dummy():
 # Create a 1 GB df
 global mydf
 n_rows = 13_421_773
 n_cols = 10
 data = np.random.rand(n_rows, n_cols)
 mydf = pd.DataFrame(data, columns=[f'col_{i}' for i in range(n_cols)])
 # check memory footprint
 print('mydf', mydf.memory_usage(deep=True).sum() / 1024 / 1024)
 imaxs = range(1, 33)
 # With local df, call function with partial
 start = time()
 with Pool() as pool:
 fun = partial(_fun_local, mydf)
 results_partial = pool.map(fun, imaxs)
 print(f'local-partial took: {time()-start}')
 # With local df, call function with starmap
 start = time()
 with Pool() as pool:
 results_starmap = pool.starmap(_fun_local, [(mydf, imax) for imax in imaxs])
 print(f'local-starmap took: {time()-start}')
 # With global mydf
 start = time()
 with Pool() as pool:
 results_global = pool.map(_fun_global, imaxs)
 print(f'global took: {time()-start}')
 return results_local, results_global
if __name__ == '__main__':
 results = dummy()

Result:

mydf (MB): 1024.0001411437988
local-partial took: 89.05407881736755
local-starmap took: 88.06274890899658
global took: 0.09803605079650879
Ahmed AEK
23.2k3 gold badges19 silver badges50 bronze badges
asked Jan 28, 2025 at 15:51
2
  • Note, that in _fun_global the global statement is pointless Commented Jan 28, 2025 at 15:57
  • However, when you pass any objedt as an argument using pool.map that gets copied an additional time! So in this case, an additional imaxs times Commented Jan 28, 2025 at 16:00

1 Answer 1

1

When you fork a child process (in this case you are forking multiprocessing.count() processes) it is true that the child process inherits the memory of the forking process. But copy-on-write semantics is used such that when that inherited memory is modified, it is first copied resulting in increased memory utilization. Even though the child process is not explicitly updating the global dataframe, once it is referenced it gets copied because Python is using reference counts for memory management and the reference count for the dataframe will get incremented. Now consider the following code.

from multiprocessing import Pool
import os
import time
SLEEP = False
some_data = [0, 1, 2, 3, 4, 5, 6, 7]
def worker(i):
 if SLEEP:
 time.sleep(.5)
 return some_data[i], os.getpid()
def main():
 with Pool(8) as pool:
 results = pool.map(worker, range(8))
 print('results =', results)
if __name__ == '__main__':
 main()

Prints:

results = [(0, 35988), (1, 35988), (2, 35988), (3, 35988), (4, 35988), (5, 35988), (6, 35988), (7, 35988)]

We see that it a single pool process (PID=35988) processed all 8 submitted tasks. This is because the task was incredibly short running resulting in a single pool process being able to pull from the pool's task queue all 8 tasks before the other pool processes finished starting and attempted to process tasks. This also means that global some_data was only referenced by a single pool process and therefore only copied once.

If now we change SLEEP to True, the output is now:

results = [(0, 45324), (1, 4828), (2, 19760), (3, 8420), (4, 41944), (5, 58220), (6, 46340), (7, 21628)]

Each of the 8 pool processes processed one task and therefore some_data was copied 8 times.

What Does This Mean?

Your _fun_global worker function is also short running if it is not sleeping and probably only a single pool process is processing all submitted tasks resulting in the dataframe being copied only once. But if you do sleep, then each pool process will get to process a task and the dataframe will be copied N times where N is the size of the pool (os.cpu_count()).

But even when the inherited memory must be copied, this is a relatively fast operation compared to the situation where you are passing a local dataframe as an argument, in which case the copying is done by using pickle to first serialize the dataframe in the main process and pickle again to de-serialize the dataframe in the child process.

Summary

  1. Using a local dataframe is is slower because copying a dataframe using pickle is slower than just doing a byte-for-byte copy.
  2. Using a global dataframe will result in lower memory utilization if your worker function is short-running so that a single pool process handles all submitted tasks resulting in the copy-on-write occurring only once.

Use Shared Memory To Reduce Memory Utilization

You can reduce memory by sharing a single copy of the dataframe among the processes:

  1. Construct your data numpy array.
  2. Copy a flattened version of the data array into a shared memory array shared_array.
  3. Construct a dataframe mydf based on the shared array.
  4. The processing pool is created specifying an initializer function that is executed once for each pool process. The initializer is passed the shared array from which it can reconstruct the sharable datframe.
from time import sleep
import multiprocessing
import numpy as np
import pandas as pd
def np_array_to_shared_array(np_array, lock=False):
 """Construct a sharable array from a numpy array.
 Specify lock=True if multiple processes might be updating the same
 array element."""
 shared_array = multiprocessing.Array('B', np_array.nbytes, lock=lock)
 buffer = shared_array.get_obj() if lock else shared_array
 arr = np.frombuffer(buffer, np_array.dtype)
 arr[:] = np_array.flatten(order='C')
 return shared_array
def shared_array_to_np_array(shared_array, shape, dtype):
 """Reconstruct a numpy array from a shared array."""
 buffer = (
 shared_array.get_obj() if getattr(shared_array, 'get_obj', None)
 else shared_array
 )
 return np.ndarray(shape, dtype=dtype, buffer=buffer)
def df_from_shared_array(shared_array: multiprocessing.Array, shape: tuple[int], dtype: str):
 """Construct the dataframe based on a sharable array."""
 np_array = shared_array_to_np_array(shared_array, shape, dtype)
 return pd.DataFrame(np_array, columns=[f'col_{i}' for i in range(shape[1])])
def init_pool(shared_array: multiprocessing.Array, shape: tuple[int], dtype: str):
 """This is executed for each pool process and for each creates a global variable
 mydf."""
 
 global mydf
 mydf = df_from_shared_array(shared_array, shape, dtype)
def _fun(imax: int):
 sleep(1)
 return mydf[:imax]
def make_data():
 n_rows = 13_421_773
 n_cols = 10
 data = np.random.rand(n_rows, n_cols)
 shared_array = np_array_to_shared_array(data)
 mydf = df_from_shared_array(shared_array, data.shape, data.dtype)
 shape, dtype = data.shape, data.dtype
 del data # We no longer need this
 return shared_array, shape, dtype, mydf
def dummy():
 shared_array, shape, dtype, mydf = make_data()
 imaxs = range(1, shape[1] + 1)
 with multiprocessing.Pool(
 shape[1],
 initializer=init_pool,
 initargs=(shared_array, shape, dtype)
 ) as pool:
 results = pool.map(_fun, imaxs)
 for result in results:
 print(result, end='\n\n')
if __name__ == '__main__':
 dummy()
answered Jan 29, 2025 at 12:45
Sign up to request clarification or add additional context in comments.

3 Comments

Thank you for the in-depth explanation. Just to make sure: I think you misspelled "copy-on-write" ? Given byte-for-byte copy is so much faster sans pickling/unpickling, why is it not used when using a local dataframe ? I guess it cannot be used, any idea why ?
When a child process is forked and an object reference causes a copy-on-write, it's virtual address does not change, only the underlying physical memory and only for pages that have been modified (so the entire object is not necessarily copied). But when you make an explicit copy of any object for passing to a process, by definition the copy has to have a new virtual address and what is needed when passing an object to another address space is some sort of "deep copy" which cannot be accomplished by a byte for byte copy. (more...)
Finally, pickle allows you to control how the serialization/de-serialization is performed for any specific class. So let's say an object has a reference to some other object. That reference might be meaningless in another address space, for example a reference to an open stream of some sort. Serialization might require that the stream reference not be serialized and instead on de-serialization the stream will get re-created. A class can add methods __getstate__ and __setstate__ that override the default serialization and de-serialization logic.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.