Task: I have a class that takes some arguments and runs a function in a loop that clones a lot of git repositories. I'd like to run it in parallel, but I ran into errors such as:
cannot pickle '_thread.RLock' object (when trying multiprocessing)
cannot pickle 'ConsoleThreadLocals' object (when trying multiprocessing with dill)
Code structure:
# main.py
# Import the class itself. A plain `import ProcessProjects` would bind the
# module object, and calling a module raises
# "TypeError: 'module' object is not callable".
from ProcessProjects import ProcessProjects

if __name__ == '__main__':
    # Constructing the object is what starts the work.
    ProcessProjects(
        project_list=['project1', 'project2'],
        project_dir='projects',
        repo_type='all',
        clear_before_start=True,
    )
# ProcessProjects.py
import git
import os
import re
import shutil
import params
class ProcessProjects:
    """Clone the git repositories configured in ``params.projects``.

    Every project entry is read as a mapping with an ``'inventory'`` mapping
    and a ``'code'`` list of mappings; each of those carries ``'url'`` and
    ``'branch'`` keys. Repositories are cloned to:

    * inventory: ``<project_dir>/<project>/inventory``
    * code:      ``<project_dir>/<project>/code/<repo name>``

    Constructing an instance immediately runs :meth:`main`.
    """

    # Captures the repository name (last path component without ``.git``)
    # from a clone URL. NOTE(review): the pattern requires ``7999/`` in the
    # URL -- presumably the SSH port of the git server; confirm.
    _REPO_NAME_PATTERN = r'7999\/.+\/(.+)\.git'

    def __init__(self,
                 project_dir: str,
                 project_list: "list[str] | None" = None,
                 repo_type: str = 'all',
                 clear_before_start: bool = False,):
        """Store the options and start processing right away.

        Args:
            project_dir: Base directory that receives the clones.
            project_list: Project names to process. ``None`` (the default)
                means ``['all']``; a list whose first element is ``'all'``
                selects every project. An empty list selects nothing.
            repo_type: ``'all'``, ``'inventory'`` or ``'code'``. Any other
                value results in nothing being cloned.
            clear_before_start: When true, an already existing target
                directory is removed and cloned again; when false, existing
                directories are left untouched.
        """
        self.project_dir = project_dir
        # A fresh list per instance: a list literal as the default value
        # would be one object shared by every call.
        self.project_list = ['all'] if project_list is None else project_list
        self.repo_type = repo_type
        self.clear_before_start = clear_before_start
        self.main()

    def _is_selected(self, project_name):
        """Return True when *project_name* should be processed."""
        if not self.project_list:
            # Nothing requested; also avoids IndexError on ``[0]`` below.
            return False
        return (self.project_list[0] == 'all'
                or project_name in self.project_list)

    def _clone(self, repo_url, repo_dir, repo_branch):
        """Clone *repo_url* into *repo_dir* and check out *repo_branch*.

        An existing directory is skipped unless ``clear_before_start`` is
        set, in which case it is deleted first.
        """
        if os.path.isdir(repo_dir):
            if not self.clear_before_start:
                return
            shutil.rmtree(repo_dir)
        repo = git.Repo.clone_from(repo_url, repo_dir)
        repo.git.checkout(repo_branch)

    def _get_inventory(self, project_name, data):
        """Clone the single inventory repository of a project."""
        inventory = data['inventory']
        repo_dir = f"{self.project_dir}/{project_name}/inventory"
        self._clone(inventory['url'], repo_dir, inventory['branch'])

    def _get_code(self, project_name, data):
        """Clone every code repository of a project.

        Raises:
            IndexError: If a URL does not match ``_REPO_NAME_PATTERN``, so no
                directory name can be derived from it.
        """
        for entry in data['code']:
            repo_url = entry['url']
            repo_name = re.findall(self._REPO_NAME_PATTERN, repo_url)[0]
            repo_dir = f"{self.project_dir}/{project_name}/code/{repo_name}"
            self._clone(repo_url, repo_dir, entry['branch'])

    def _task(self, project_name):
        """Clone the requested repository types of one project."""
        if not self._is_selected(project_name):
            return
        data = params.projects[project_name]
        if self.repo_type in ('all', 'inventory'):
            self._get_inventory(project_name, data)
        if self.repo_type in ('all', 'code'):
            self._get_code(project_name, data)

    def main(self):
        """Run :meth:`_task` for every configured project, one by one."""
        for project in params.projects:
            self._task(project)
# params.py example
# Maps each project name to its repositories:
#   'inventory' - one repository, given as {'branch': ..., 'url': ...}
#   'code'      - a list of repositories in the same {'branch', 'url'} form
projects = {
    'project1': {
        'inventory': {'branch': 'master', 'url': 'ssh://.../inventory.git'},
        'code': [
            {'branch': 'develop', 'url': 'ssh://.../code.git'},
        ],
    },
    'project2': {
        'inventory': {'branch': 'master', 'url': 'ssh://.../inventory.git'},
        'code': [
            {'branch': 'master', 'url': 'ssh://.../code.git'},
            {'branch': 'master', 'url': 'ssh://.../code2.git'},
        ],
    },
}
This is a code example without multiprocessing. When I instantiate the class, I specify which projects I want to download, where to store them, and which type of repository I want to get.
How can I rewrite the main method to replace the loop
for project in params.projects.keys():
self._task(project)
and run the tasks in parallel?
For example:
# Attempt 1: hand self._task to a process pool, one call per project name.
# NOTE(review): only `Pool` is imported, yet the `with` line refers to
# `multiprocessing.Pool`; this snippet itself never binds `multiprocessing`.
# NOTE(review): pool.map has to pickle the callable to send it to the workers;
# self._task is a bound method, so presumably the whole instance is pickled
# with it, which would explain the error reported below. Confirm.
from multiprocessing import Pool
with multiprocessing.Pool() as pool:
pool.map(self._task, [project for project in params.projects.keys()])
TypeError: cannot pickle '_thread.RLock' object
# Attempt 2: start one Process per project, then wait for all of them.
# NOTE(review): target=self._task is a bound method. When the start method
# pickles the Process object (e.g. "spawn"), the instance is presumably
# pickled along with it, giving the same error as the Pool attempt. Confirm
# the platform and start method.
from multiprocessing import Process
workers = []
for project in params.projects.keys():
worker = Process(target=self._task, args=[project])
worker.start()
workers.append(worker)
# Join only after every worker has been started, so they run concurrently.
for worker in workers:
worker.join()
TypeError: cannot pickle '_thread.RLock' object
`with multiprocessing.Pool() as pool:` will throw an exception since you have not imported `multiprocessing`; you imported `Pool` from that module, so you would need just `with Pool() as pool:`. Also, in your comments you mention `self.vars`. I don't see where that is defined. You need to specify your platform (as a tag) and include actual source with multiprocessing along with a stack trace.
`__init__` calls `self.main`. Processes need to pickle all the objects they work with, so this might cause a problem. I'm not sure. On general principles I think the main function belongs outside the constructor.