Commit 44551d3

Merge pull request #3642 from mauriliogenovese/enh/cuda_support
support for gpu queue
2 parents: 0c67bb3 + 59862c8

File tree: 5 files changed (+87, -7 lines)

nipype/info.py

Lines changed: 1 addition & 0 deletions

@@ -149,6 +149,7 @@ def get_nipype_gitversion():
     "acres",
     "etelemetry>=0.3.1",
     "looseversion!=1.2",
+    "gputil>=1.4.0",
     "puremagic",
 ]
 

nipype/pipeline/engine/nodes.py

Lines changed: 5 additions & 0 deletions

@@ -820,6 +820,11 @@ def update(self, **opts):
         """Update inputs"""
         self.inputs.update(**opts)
 
+    def is_gpu_node(self):
+        return bool(getattr(self.inputs, 'use_cuda', False)) or bool(
+            getattr(self.inputs, 'use_gpu', False)
+        )
+
 
 class JoinNode(Node):
     """Wraps interface objects that join inputs into a list.

nipype/pipeline/plugins/multiproc.py

Lines changed: 52 additions & 7 deletions

@@ -21,6 +21,7 @@
 from ...utils.profiler import get_system_total_memory_gb
 from ..engine import MapNode
 from .base import DistributedPluginBase
+from .tools import gpu_count
 
 try:
     from textwrap import indent
@@ -100,6 +101,7 @@ class MultiProcPlugin(DistributedPluginBase):
 
     - non_daemon: boolean flag to execute as non-daemon processes
     - n_procs: maximum number of threads to be executed in parallel
+    - n_gpu_procs: maximum number of GPU threads to be executed in parallel
     - memory_gb: maximum memory (in GB) that can be used at once.
     - raise_insufficient: raise error if the requested resources for
       a node over the maximum `n_procs` and/or `memory_gb`
@@ -130,10 +132,24 @@ def __init__(self, plugin_args=None):
         )
         self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)
 
+        # GPU found on system
+        self.n_gpus_visible = gpu_count()
+        # proc per GPU set by user
+        self.n_gpu_procs = self.plugin_args.get('n_gpu_procs', self.n_gpus_visible)
+
+        # total no. of processes allowed on all gpus
+        if self.n_gpu_procs > self.n_gpus_visible:
+            logger.info(
+                'Total number of GPUs proc requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!',
+                self.n_gpu_procs,
+                self.n_gpus_visible,
+            )
+
         # Instantiate different thread pools for non-daemon processes
         logger.debug(
-            "[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
+            "[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
             self.processors,
+            self.n_gpu_procs,
             self.memory_gb,
             self._cwd,
         )
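In user code, the new `n_gpu_procs` plugin argument caps GPU concurrency the same way `n_procs` caps CPU threads; when omitted it defaults to the detected GPU count. A usage sketch (the workflow itself is hypothetical; only the plugin argument keys come from this commit):

    import nipype.pipeline.engine as pe

    wf = pe.Workflow(name="gpu_wf")
    # ... add nodes; GPU-bound ones set a use_cuda/use_gpu input ...
    wf.run(
        plugin="MultiProc",
        plugin_args={
            "n_procs": 8,      # CPU thread budget
            "n_gpu_procs": 2,  # GPU slot budget; defaults to gpu_count()
        },
    )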
@@ -184,9 +200,12 @@ def _prerun_check(self, graph):
         """Check if any node exceeds the available resources"""
         tasks_mem_gb = []
         tasks_num_th = []
+        tasks_gpu_th = []
         for node in graph.nodes():
             tasks_mem_gb.append(node.mem_gb)
             tasks_num_th.append(node.n_procs)
+            if node.is_gpu_node():
+                tasks_gpu_th.append(node.n_procs)
 
         if np.any(np.array(tasks_mem_gb) > self.memory_gb):
             logger.warning(
@@ -203,6 +222,10 @@
             )
             if self.raise_insufficient:
                 raise RuntimeError("Insufficient resources available for job")
+        if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
+            logger.warning('Nodes demand more GPU than allowed (%d).', self.n_gpu_procs)
+            if self.raise_insufficient:
+                raise RuntimeError('Insufficient GPU resources available for job')
 
     def _postrun_check(self):
         self.pool.shutdown()
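The pre-run check mirrors the existing memory/thread checks: each GPU node's `n_procs` is compared against the total GPU slot budget, and by default an oversubscribed graph fails fast. The check in isolation (the values here are made up):

    import numpy as np

    n_gpu_procs = 1     # total GPU slots
    tasks_gpu_th = [2]  # one GPU node requesting n_procs=2

    if np.any(np.array(tasks_gpu_th) > n_gpu_procs):
        raise RuntimeError('Insufficient GPU resources available for job')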
@@ -213,11 +236,14 @@ def _check_resources(self, running_tasks):
         """
         free_memory_gb = self.memory_gb
         free_processors = self.processors
+        free_gpu_slots = self.n_gpu_procs
         for _, jobid in running_tasks:
             free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
             free_processors -= min(self.procs[jobid].n_procs, free_processors)
+            if self.procs[jobid].is_gpu_node():
+                free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)
 
-        return free_memory_gb, free_processors
+        return free_memory_gb, free_processors, free_gpu_slots
 
     def _send_procs_to_workers(self, updatehash=False, graph=None):
         """
@@ -232,7 +258,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         )
 
         # Check available resources by summing all threads and memory used
-        free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
+        free_memory_gb, free_processors, free_gpu_slots = self._check_resources(
+            self.pending_tasks
+        )
 
         stats = (
             len(self.pending_tasks),
@@ -241,6 +269,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             self.memory_gb,
             free_processors,
             self.processors,
+            free_gpu_slots,
+            self.n_gpu_procs,
         )
         if self._stats != stats:
             tasks_list_msg = ""
@@ -256,13 +286,15 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                     tasks_list_msg = indent(tasks_list_msg, " " * 21)
             logger.info(
                 "[MultiProc] Running %d tasks, and %d jobs ready. Free "
-                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
+                "memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slot:%d/%d.%s",
                 len(self.pending_tasks),
                 len(jobids),
                 free_memory_gb,
                 self.memory_gb,
                 free_processors,
                 self.processors,
+                free_gpu_slots,
+                self.n_gpu_procs,
                 tasks_list_msg,
             )
             self._stats = stats
@@ -304,28 +336,39 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
             # Check requirements of this job
             next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
             next_job_th = min(self.procs[jobid].n_procs, self.processors)
+            next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)
+
+            is_gpu_node = self.procs[jobid].is_gpu_node()
 
             # If node does not fit, skip at this moment
-            if next_job_th > free_processors or next_job_gb > free_memory_gb:
+            if (
+                next_job_th > free_processors
+                or next_job_gb > free_memory_gb
+                or (is_gpu_node and next_job_gpu_th > free_gpu_slots)
+            ):
                 logger.debug(
-                    "Cannot allocate job %d (%0.2fGB, %d threads).",
+                    "Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
                     jobid,
                     next_job_gb,
                     next_job_th,
+                    next_job_gpu_th,
                 )
                 continue
 
             free_memory_gb -= next_job_gb
             free_processors -= next_job_th
+            if is_gpu_node:
+                free_gpu_slots -= next_job_gpu_th
             logger.debug(
                 "Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
-                "%0.2fGB, %d threads.",
+                "%0.2fGB, %d threads, %d GPU slots.",
                 self.procs[jobid].fullname,
                 jobid,
                 next_job_gb,
                 next_job_th,
                 free_memory_gb,
                 free_processors,
+                free_gpu_slots,
             )
 
             # change job status in appropriate queues
@@ -355,6 +398,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 self._remove_node_dirs()
                 free_memory_gb += next_job_gb
                 free_processors += next_job_th
+                if is_gpu_node:
+                    free_gpu_slots += next_job_gpu_th
             # Display stats next loop
             self._stats = None
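The scheduling loop applies the GPU constraint only to GPU nodes: a CPU-only job never blocks on free GPU slots, and slots debited at allocation are credited back on the local-execution path. The fit test reduces to a pure function; a sketch under those assumptions (not nipype API):

    def can_allocate(job, free_procs, free_mem_gb, free_gpu_slots):
        """Return True if the job fits the free CPU/memory/GPU budget."""
        if job["n_procs"] > free_procs or job["mem_gb"] > free_mem_gb:
            return False
        # GPU slots constrain GPU nodes only
        if job["is_gpu"] and job["n_procs"] > free_gpu_slots:
            return False
        return True

    job = {"n_procs": 2, "mem_gb": 1.0, "is_gpu": True}
    print(can_allocate(job, free_procs=4, free_mem_gb=8.0, free_gpu_slots=1))  # False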

nipype/pipeline/plugins/tests/test_multiproc.py

Lines changed: 19 additions & 0 deletions

@@ -56,6 +56,7 @@ def test_run_multiproc(tmpdir):
 class InputSpecSingleNode(nib.TraitedSpec):
     input1 = nib.traits.Int(desc="a random int")
     input2 = nib.traits.Int(desc="a random int")
+    use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes")
 
 
 class OutputSpecSingleNode(nib.TraitedSpec):
@@ -117,6 +118,24 @@ def test_no_more_threads_than_specified(tmpdir):
     pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})
 
 
+def test_no_more_gpu_threads_than_specified(tmpdir):
+    tmpdir.chdir()
+
+    pipe = pe.Workflow(name="pipe")
+    n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=2)
+    n1.inputs.use_gpu = True
+    n1.inputs.input1 = 4
+    pipe.add_nodes([n1])
+
+    max_threads = 2
+    max_gpu = 1
+    with pytest.raises(RuntimeError):
+        pipe.run(
+            plugin="MultiProc",
+            plugin_args={"n_procs": max_threads, 'n_gpu_procs': max_gpu},
+        )
+
+
 @pytest.mark.skipif(
     sys.version_info >= (3, 8), reason="multiprocessing issues in Python 3.8"
 )

nipype/pipeline/plugins/tools.py

Lines changed: 10 additions & 0 deletions

@@ -175,3 +175,13 @@ def create_pyscript(node, updatehash=False, store_exception=True):
     with open(pyscript, "w") as fp:
         fp.writelines(cmdstr)
     return pyscript
+
+
+def gpu_count():
+    n_gpus = 1
+    try:
+        import GPUtil
+    except ImportError:
+        return 1
+    else:
+        return len(GPUtil.getGPUs())
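`gpu_count()` treats GPUtil as optional: when the import fails, the plugin assumes a single GPU slot, so GPU nodes remain schedulable on systems that never installed the new dependency. Usage is just:

    from nipype.pipeline.plugins.tools import gpu_count

    print(gpu_count())  # GPUs visible to GPUtil, or 1 if GPUtil is absent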
