Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 0720aa1

Browse files
support for gpu queue
1 parent a17de8e commit 0720aa1

File tree

3 files changed

+74
-7
lines changed

3 files changed

+74
-7
lines changed

‎nipype/pipeline/engine/nodes.py‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,10 @@ def update(self, **opts):
821821
"""Update inputs"""
822822
self.inputs.update(**opts)
823823

824+
def is_gpu_node(self):
    """Return whether this node's interface requests GPU execution.

    A node counts as a GPU node when its inputs expose a truthy
    ``use_cuda`` or ``use_gpu`` trait; absent traits count as False.
    """
    for gpu_flag in ("use_cuda", "use_gpu"):
        if getattr(self.inputs, gpu_flag, False):
            return True
    return False
827+
824828

825829
class JoinNode(Node):
826830
"""Wraps interface objects that join inputs into a list.

‎nipype/pipeline/plugins/multiproc.py‎

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class MultiProcPlugin(DistributedPluginBase):
100100
101101
- non_daemon: boolean flag to execute as non-daemon processes
102102
- n_procs: maximum number of threads to be executed in parallel
103+
- n_gpu_procs: maximum number of GPU threads to be executed in parallel
103104
- memory_gb: maximum memory (in GB) that can be used at once.
104105
- raise_insufficient: raise error if the requested resources for
105106
a node over the maximum `n_procs` and/or `memory_gb`
@@ -130,10 +131,22 @@ def __init__(self, plugin_args=None):
130131
)
131132
self.raise_insufficient = self.plugin_args.get("raise_insufficient", True)
132133

134+
# GPU found on system
135+
self.n_gpus_visible = MultiProcPlugin.gpu_count()
136+
# proc per GPU set by user
137+
self.n_gpu_procs = plugin_args.get('n_gpu_procs', self.n_gpus_visible)
138+
139+
# total no. of processes allowed on all gpus
140+
if self.n_gpu_procs > self.n_gpus_visible:
141+
logger.info(
142+
'Total number of GPUs proc requested (%d) exceeds the available number of GPUs (%d) on the system. Using requested GPU slots at your own risk!' % (
143+
self.n_gpu_procs, self.n_gpus_visible))
144+
133145
# Instantiate different thread pools for non-daemon processes
134146
logger.debug(
135-
"[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)",
147+
"[MultiProc] Starting (n_procs=%d, n_gpu_procs=%d, mem_gb=%0.2f, cwd=%s)",
136148
self.processors,
149+
self.n_gpu_procs,
137150
self.memory_gb,
138151
self._cwd,
139152
)
@@ -184,9 +197,12 @@ def _prerun_check(self, graph):
184197
"""Check if any node exceeds the available resources"""
185198
tasks_mem_gb = []
186199
tasks_num_th = []
200+
tasks_gpu_th = []
187201
for node in graph.nodes():
188202
tasks_mem_gb.append(node.mem_gb)
189203
tasks_num_th.append(node.n_procs)
204+
if node.is_gpu_node():
205+
tasks_gpu_th.append(node.n_procs)
190206

191207
if np.any(np.array(tasks_mem_gb) > self.memory_gb):
192208
logger.warning(
@@ -203,6 +219,12 @@ def _prerun_check(self, graph):
203219
)
204220
if self.raise_insufficient:
205221
raise RuntimeError("Insufficient resources available for job")
222+
if np.any(np.array(tasks_gpu_th) > self.n_gpu_procs):
223+
logger.warning(
224+
'Nodes demand more GPU than allowed (%d).',
225+
self.n_gpu_procs)
226+
if self.raise_insufficient:
227+
raise RuntimeError('Insufficient GPU resources available for job')
206228

207229
def _postrun_check(self):
208230
self.pool.shutdown()
@@ -213,11 +235,14 @@ def _check_resources(self, running_tasks):
213235
"""
214236
free_memory_gb = self.memory_gb
215237
free_processors = self.processors
238+
free_gpu_slots = self.n_gpu_procs
216239
for _, jobid in running_tasks:
217240
free_memory_gb -= min(self.procs[jobid].mem_gb, free_memory_gb)
218241
free_processors -= min(self.procs[jobid].n_procs, free_processors)
242+
if self.procs[jobid].is_gpu_node():
243+
free_gpu_slots -= min(self.procs[jobid].n_procs, free_gpu_slots)
219244

220-
return free_memory_gb, free_processors
245+
return free_memory_gb, free_processors, free_gpu_slots
221246

222247
def _send_procs_to_workers(self, updatehash=False, graph=None):
223248
"""
@@ -232,7 +257,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
232257
)
233258

234259
# Check available resources by summing all threads and memory used
235-
free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
260+
free_memory_gb, free_processors, free_gpu_slots = self._check_resources(self.pending_tasks)
236261

237262
stats = (
238263
len(self.pending_tasks),
@@ -241,6 +266,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
241266
self.memory_gb,
242267
free_processors,
243268
self.processors,
269+
free_gpu_slots,
270+
self.n_gpu_procs
244271
)
245272
if self._stats != stats:
246273
tasks_list_msg = ""
@@ -256,13 +283,15 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
256283
tasks_list_msg = indent(tasks_list_msg, " " * 21)
257284
logger.info(
258285
"[MultiProc] Running %d tasks, and %d jobs ready. Free "
259-
"memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s",
286+
"memory (GB): %0.2f/%0.2f, Free processors: %d/%d, Free GPU slot:%d/%d.%s",
260287
len(self.pending_tasks),
261288
len(jobids),
262289
free_memory_gb,
263290
self.memory_gb,
264291
free_processors,
265292
self.processors,
293+
free_gpu_slots,
294+
self.n_gpu_procs,
266295
tasks_list_msg,
267296
)
268297
self._stats = stats
@@ -304,28 +333,36 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
304333
# Check requirements of this job
305334
next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
306335
next_job_th = min(self.procs[jobid].n_procs, self.processors)
336+
next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)
337+
338+
is_gpu_node = self.procs[jobid].is_gpu_node()
307339

308340
# If node does not fit, skip at this moment
309-
if next_job_th > free_processors or next_job_gb > free_memory_gb:
341+
if (next_job_th > free_processors or next_job_gb > free_memory_gb
342+
or (is_gpu_node and next_job_gpu_th > free_gpu_slots)):
310343
logger.debug(
311-
"Cannot allocate job %d (%0.2fGB, %d threads).",
344+
"Cannot allocate job %d (%0.2fGB, %d threads, %d GPU slots).",
312345
jobid,
313346
next_job_gb,
314347
next_job_th,
348+
next_job_gpu_th,
315349
)
316350
continue
317351

318352
free_memory_gb -= next_job_gb
319353
free_processors -= next_job_th
354+
if is_gpu_node:
355+
free_gpu_slots -= next_job_gpu_th
320356
logger.debug(
321357
"Allocating %s ID=%d (%0.2fGB, %d threads). Free: "
322-
"%0.2fGB, %d threads.",
358+
"%0.2fGB, %d threads, %d GPU slots.",
323359
self.procs[jobid].fullname,
324360
jobid,
325361
next_job_gb,
326362
next_job_th,
327363
free_memory_gb,
328364
free_processors,
365+
free_gpu_slots,
329366
)
330367

331368
# change job status in appropriate queues
@@ -352,6 +389,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
352389
self._remove_node_dirs()
353390
free_memory_gb += next_job_gb
354391
free_processors += next_job_th
392+
if is_gpu_node:
393+
# NOTE(review): suspected bug — this is the job-completion cleanup path, where
# the other resources are RETURNED to the pool ('free_memory_gb += next_job_gb',
# 'free_processors += next_job_th' just above). Subtracting here double-books
# the GPU slot and will progressively starve the GPU queue; this almost
# certainly should be '+=' — confirm against the allocation path above.
free_gpu_slots -= next_job_gpu_th
355394
# Display stats next loop
356395
self._stats = None
357396

@@ -379,3 +418,12 @@ def _sort_jobs(self, jobids, scheduler="tsort"):
379418
key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs),
380419
)
381420
return jobids
421+
422+
@staticmethod
423+
def gpu_count():
424+
n_gpus = 1
425+
try:
426+
import GPUtil
427+
return len(GPUtil.getGPUs())
428+
except ImportError:
429+
return n_gpus

‎nipype/pipeline/plugins/tests/test_multiproc.py‎

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def test_run_multiproc(tmpdir):
5656
class InputSpecSingleNode(nib.TraitedSpec):
    """Input spec for the single-node test interface."""

    input1 = nib.traits.Int(desc="a random int")
    input2 = nib.traits.Int(desc="a random int")
    # Optional flag so tests can mark a node as occupying a GPU slot
    # (read by Node.is_gpu_node via the 'use_gpu' trait).
    use_gpu = nib.traits.Bool(False, mandatory=False, desc="boolean for GPU nodes")
5960

6061

6162
class OutputSpecSingleNode(nib.TraitedSpec):
@@ -116,6 +117,20 @@ def test_no_more_threads_than_specified(tmpdir):
116117
with pytest.raises(RuntimeError):
117118
pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})
118119

120+
def test_no_more_gpu_threads_than_specified(tmpdir):
    """MultiProc must refuse a graph whose GPU demand exceeds ``n_gpu_procs``.

    A single GPU node requesting 2 procs against a 1-slot GPU budget should
    trigger the _prerun_check insufficiency error.
    """
    tmpdir.chdir()

    pipe = pe.Workflow(name="pipe")
    n1 = pe.Node(SingleNodeTestInterface(), name="n1", n_procs=2)
    n1.inputs.use_gpu = True
    n1.inputs.input1 = 4
    pipe.add_nodes([n1])

    max_threads = 2
    max_gpu = 1
    with pytest.raises(RuntimeError):
        pipe.run(
            plugin="MultiProc",
            plugin_args={"n_procs": max_threads, "n_gpu_procs": max_gpu},
        )
133+
119134

120135
@pytest.mark.skipif(
121136
sys.version_info >= (3, 8), reason="multiprocessing issues in Python 3.8"

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /