1
+ package com .interview .multithreaded ;
2
+
3
+ import com .google .common .collect .Lists ;
4
+
5
+ import java .util .ArrayList ;
6
+ import java .util .HashMap ;
7
+ import java .util .List ;
8
+ import java .util .Map ;
9
+ import java .util .concurrent .CompletableFuture ;
10
+ import java .util .concurrent .Executor ;
11
+ import java .util .concurrent .ExecutorService ;
12
+ import java .util .concurrent .Executors ;
13
+
14
+ /**
15
+ * Given a Task with list of its dependencies and execute method. Run the task such that dependencies are executed first.
16
+ * You are given x number of threads. Increase parallelism as much as possible.
17
+ */
18
+ public class DependencyTaskExecutor {
19
+
20
+ Map <String , CompletableFuture <Void >> taskTracker = new HashMap <>();
21
+ void scheduleTask (List <Task > tasks , int threads ) {
22
+ ExecutorService executor = Executors .newFixedThreadPool (threads );
23
+ CompletableFuture <Void > future = CompletableFuture .completedFuture (null );
24
+ for (Task task : tasks ) {
25
+ future = future .thenAcceptBothAsync (scheduleTaskUtil (task , executor ), (a , b ) -> {}, executor );
26
+ }
27
+ future .thenRunAsync (() -> {System .out .println ("All tasks done. Closing executor" ); executor .shutdown ();});
28
+ }
29
+
30
+ CompletableFuture <Void > scheduleTaskUtil (Task task , Executor executor ) {
31
+ CompletableFuture <Void > f = taskTracker .get (task .name ());
32
+ if (f != null ) {
33
+ return f ;
34
+ }
35
+ if (task .dependencies ().isEmpty ()) {
36
+ CompletableFuture <Void > future = CompletableFuture .runAsync (() -> task .execute (), executor );
37
+ taskTracker .put (task .name (), future );
38
+ return future ;
39
+ }
40
+ CompletableFuture <Void > future = null ;
41
+ for (Task upstreamTask : task .dependencies ()) {
42
+ if (future == null ) {
43
+ future = scheduleTaskUtil (upstreamTask , executor );
44
+ } else {
45
+ future = future .thenAcceptBothAsync (scheduleTaskUtil (upstreamTask , executor ), (a , b ) -> {
46
+ }, executor );
47
+ }
48
+ }
49
+ future = future .thenRunAsync (() -> task .execute (), executor );
50
+ taskTracker .put (task .name (), future );
51
+ return future ;
52
+ }
53
+
54
+ public static void main (String args []) {
55
+ DependencyTaskExecutor taskExecutor = new DependencyTaskExecutor ();
56
+ SimpleSleepTask a = new SimpleSleepTask ("a" , 2000 );
57
+ SimpleSleepTask b = new SimpleSleepTask ("b" , 4000 );
58
+ SimpleSleepTask c = new SimpleSleepTask ("c" , 6000 );
59
+ SimpleSleepTask d = new SimpleSleepTask ("d" , 3000 );
60
+ SimpleSleepTask x = new SimpleSleepTask ("x" , 4000 );
61
+ SimpleSleepTask y = new SimpleSleepTask ("y" , 6000 );
62
+ SimpleSleepTask z = new SimpleSleepTask ("z" , 3000 );
63
+
64
+ d .addDependency (b );
65
+ d .addDependency (c );
66
+ c .addDependency (a );
67
+ b .addDependency (a );
68
+ x .addDependency (y );
69
+ x .addDependency (z );
70
+ y .addDependency (a );
71
+ taskExecutor .scheduleTask (Lists .newArrayList (a , b , c , d , x , y , z ), 3 );
72
+ }
73
+ }
74
+
75
+ interface Task {
76
+ String name ();
77
+ List <Task > dependencies ();
78
+ void execute ();
79
+ }
80
+
81
+ class SimpleSleepTask implements Task {
82
+ String name ;
83
+ int sleepTimeInMillis ;
84
+ List <Task > dependencies = new ArrayList <>();
85
+ SimpleSleepTask (String name , int sleepTimeInMillis ) {
86
+ this .name = name ;
87
+ this .sleepTimeInMillis = sleepTimeInMillis ;
88
+ }
89
+
90
+ void addDependency (Task task ) {
91
+ dependencies .add (task );
92
+ }
93
+
94
+ @ Override
95
+ public String name () {
96
+ return name ;
97
+ }
98
+
99
+ @ Override
100
+ public List <Task > dependencies () {
101
+ return dependencies ;
102
+ }
103
+
104
+ @ Override
105
+ public void execute () {
106
+ try {
107
+ System .out .println ("Starting sleep for task " + name );
108
+ Thread .sleep (sleepTimeInMillis );
109
+ System .out .println ("Ending sleep for task " + name );
110
+ } catch (InterruptedException e ) {
111
+ e .printStackTrace ();
112
+ }
113
+ }
114
+ }
0 commit comments