0

Task: i have a class that takes some arguments and runs a function in a cycle, that clones a lot of git repositories. I'd like to run it in a parallel, but faced with errors such like

cannot pickle '_thread.RLock' object (when trying multiprocessing) cannot pickle 'ConsoleThreadLocals' object (when trying multiprocessing with dill)

Code structure:

# main.py
import ProcessProjects
if __name__ == '__main__':
 ProcessProjects(
 project_list=['project1', 'project2'], 
 project_dir='projects', 
 repo_type='all',
 clear_before_start=True)
# ProcessProjects.py
import git
import os
import re
import shutil
import params
class ProcessProjects:
 def __init__(self,
 project_dir: str,
 project_list: list[str] = ['all'],
 repo_type: str = 'all',
 clear_before_start: bool = False,):
 self.project_dir = project_dir
 self.project_list = project_list
 self.repo_type = repo_type
 self.clear_before_start = clear_before_start
 self.main()
 def _task(self, project_name):
 _data = params.projects[project_name]
 def _run(repo_type, repo_name, repo_branch, repo_url, repo_dir):
 repo = git.Repo.clone_from(repo_url, repo_dir)
 repo.git.checkout(repo_branch)
 def _get_inventory():
 _repo_url = _data['inventory']['url']
 _repo_name = re.findall(r'7999\/.+\/(.+)\.git', _repo_url)[0]
 _repo_dir = f"{self.project_dir}/{project_name}/inventory"
 _repo_branch = _data['inventory']['branch']
 if os.path.exists(_repo_dir) and os.path.isdir(_repo_dir):
 if self.clear_before_start:
 shutil.rmtree(_repo_dir)
 _run('inventory', _repo_name, _repo_branch, _repo_url, _repo_dir)
 else:
 _run('inventory', _repo_name, _repo_branch, _repo_url, _repo_dir)
 def _get_code():
 for _repo in _data['code']:
 _repo_url = _repo['url']
 _repo_name = re.findall(r'7999\/.+\/(.+)\.git', _repo_url)[0]
 _repo_dir = f"{self.project_dir}/{project_name}/code/{_repo_name}"
 _repo_branch = _repo['branch']
 if os.path.exists(_repo_dir) and os.path.isdir(_repo_dir):
 if self.clear_before_start:
 shutil.rmtree(_repo_dir)
 _run('code', _repo_name, _repo_branch, _repo_url, _repo_dir)
 else:
 _run('code', _repo_name, _repo_branch, _repo_url, _repo_dir)
 if self.project_list[0] == 'all' or project_name in self.project_list:
 if self.repo_type == 'all':
 _get_inventory()
 _get_code()
 elif self.repo_type == 'inventory':
 _get_inventory()
 elif self.repo_type == 'code':
 _get_code()
 def main(self):
 for project in params.projects.keys():
 self._task(project)
# params.py example
projects = {
 'project1': {
 'inventory': {'branch': 'master', 'url': 'ssh://.../inventory.git'},
 'code': [
 {'branch': 'develop', 'url': 'ssh://.../code.git'},
 ]
 },
 'project2': {
 'inventory': {'branch': 'master', 'url': 'ssh://.../inventory.git'},
 'code': [
 {'branch': 'master', 'url': 'ssh://.../code.git'},
 {'branch': 'master', 'url': 'ssh://.../code2.git'},
 ]
 },

This is a code example without multiprocessing. When I call class, I specify, what projects I want to download, where to store it, and what type of repository i want to get.

How can I rewrite main method, to replace cycle

for project in params.projects.keys():
 self._task(project)

and run task in a parallel?

for example

from multiprocessing import Pool
with multiprocessing.Pool() as pool:
 pool.map(self._task, [project for project in params.projects.keys()])

TypeError: cannot pickle '_thread.RLock' object

from multiprocessing import Process
workers = []
for project in params.projects.keys():
 worker = Process(target=self._task, args=[project])
 worker.start()
 workers.append(worker)
for worker in workers:
 worker.join()

TypeError: cannot pickle '_thread.RLock' object

asked Dec 10, 2022 at 10:58
3
  • It seems that that happens because _task method uses self.vars, but i don't understand how to deal with it Commented Dec 10, 2022 at 10:59
  • I question whether you posted the actual code you tried. For one thing, the statement with multiprocessing.Pool() as pool: will throw an exception since you have not imported multiprocessing; you imported Pool from that module so you would need just with Pool() as pool:. Also, in your comments you mention self.vars. I don't see where that is defined. You need to specify your platform (as a tag) and include actual source with multiprocessing along with a stack trace. Commented Dec 10, 2022 at 19:40
  • You are trying to launch Processes while still in the act of constructing an instance of ProcessProjects, since your constructor __init__ calls self.main. Processes need to pickle all the objects they work with, so this might cause a problem. I'm not sure. On general principles I think the main function belongs outside the constructor. Commented Dec 11, 2022 at 11:47

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.