1
\$\begingroup\$

I have Producer Threads A, B and C producing 3 different types of events Events A, B and C respectively. The Consumer thread can only process only one Event B at a time, where as it can process any number of Events A & C at a point of time.

Event class:

package codility.question;
public class Event {
 private String type;
 public Event(String repository) {
 super();
 this.type = repository;
 }
 @Override
 public String toString() {
 return "Event [repository=" + type + "]";
 }
 public String getType() {
 return type;
 }
 public void setType(String type) {
 this.type = type;
 }
}

ProducerConsumer class:

package codility.question;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumer {
 private static final String SPECIAL_EVENT_TYPE_B = "B";
 private static int CAPACITY = 10;
 public static void main(String[] args) {
 final BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>(CAPACITY);
 final Lock lock = new ReentrantLock();
 Thread eventSchedulerAlpha = new Thread("Event A") {
 public void run() {
 try {
 Event event = new Event("A");
 queue.put(event);
 // thread will block here
 System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event.toString());
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 };
 eventSchedulerAlpha.start();
 Thread eventSchedulerBeta = new Thread("Event B") {
 public void run() {
 try {
 Event event = new Event(SPECIAL_EVENT_TYPE_B);
 queue.put(event);
 // thread will block here
 System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event.toString());
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 };
 eventSchedulerBeta.start();
 Thread eventSchedulerKappa = new Thread("Event C") {
 public void run() {
 try {
 Event event = new Event("C");
 queue.put(event);
 // thread will block here
 System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event.toString());
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 };
 eventSchedulerKappa.start();
 Thread builder = new Thread("Builder") {
 public void run() {
 System.out.println("Started the Builder");
 try {
 Event processPR = null;
 while(queue.size()>0) {
 Event pr = queue.peek();
 if(pr!=null && SPECIAL_EVENT_TYPE_B.equals(pr.getType())) {
 lock.lock();
 processPR = queue.take();
 processEvents(processPR);
 lock.unlock();
 } else {
 processPR = queue.take();
 processEvents(processPR);
 }
 // thread will block here
 System.out.printf("[%s] consumed event : %s %n", Thread.currentThread().getName(), pr.toString());
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 };
 builder.start();
 }
 public static void processEvents(Event pr) {
 System.out.println("The build process BEGINS for" + pr.toString());
 try {
 Thread.sleep(5000);
 } catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 System.out.println("The build process ENDS for" + pr.toString());
 }
}
Jamal
35.2k13 gold badges134 silver badges238 bronze badges
asked Jun 29, 2016 at 0:57
\$\endgroup\$
2
  • 2
    \$\begingroup\$ I feel like your simulation is unrealistic because 1) There are only 3 events ever, and 2) events A and C are not really consumed "in parallel". I feel like your processEvents() function should start a new thread to do the work so that we can see a "build A" and "build C" happening at the same time, in accordance with your problem description. Also, if the work were done in a background thread, we could see how you would handle the situation of two Bs. Right now everything is done serially so there is never any potential problem. \$\endgroup\$ Commented Jun 29, 2016 at 2:55
  • \$\begingroup\$ true, my solution is wrong for my problem..i think the solution below does that by using an executor service. \$\endgroup\$ Commented Jun 29, 2016 at 17:42

1 Answer 1

3
\$\begingroup\$
  • Use enums instead of a set of string constants

You can modify your event class to make use of enums for the event type. This way you obtain type safety and don't need to use String comparations:

public class Event {
 public enum EventType {
 A, SPECIAL_EVENT_TYPE_B, C;
 } 
 private EventType type;
 ...
}
  • Define a class for your Producers

You are creating three producers threads with the same code for the run method. Create a new class and reuse the same code.

  • Use final for your constants:

    private static int CAPACITY = 10;

This should be:

private static final int CAPACITY = 10;
  • Your producer is not consuming the Events in new threads, so it is only processing an event at a time. The consumer should be able to process any number of A and C Events and one B Event at a time.

All together, your code could be:

public class Event {
 public enum EventType {
 A, SPECIAL_EVENT_TYPE_B, C;
 }
 private EventType type;
 public Event(EventType repository) {
 super();
 this.type = repository;
 }
 @Override
 public String toString() {
 return "Event [repository=" + type + "]";
 }
 public EventType getType() {
 return type;
 }
 public void setType(EventType type) {
 this.type = type;
 }
}

public class ProducerConsumer {
 private static int CAPACITY = 10;
 public static void main(String[] args) {
 final BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>(CAPACITY);
 new Thread(new Producer(queue, Event.EventType.A), "Event A").start();
 new Thread(new Producer(queue, Event.EventType.SPECIAL_EVENT_TYPE_B), "Event B").start();
 new Thread(new Producer(queue, Event.EventType.C), "Event C").start();
 new Thread(new Consumer(queue),"Builder").start();
 }
}

public class Producer implements Runnable {
 private BlockingQueue<Event> queue;
 private Event.EventType eventType;
 public Producer(BlockingQueue<Event> queue, Event.EventType eventType) {
 this.queue = queue;
 this.eventType = eventType;
 }
 public void run() {
 try {
 Event event = new Event(eventType);
 queue.put(event);
 // thread will block here
 System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event.toString());
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}

public class Consumer implements Runnable {
 BlockingQueue<Event> queue;
 final Lock lock = new ReentrantLock();
 public Consumer(BlockingQueue<Event> queue) {
 this.queue = queue;
 }
 public void run() {
 System.out.println("Started the Builder");
 try {
 Executor executor = Executors.newCachedThreadPool();
 while(queue.size() > 0) {
 Event event = queue.take();
 executor.execute(new Runnable() {
 @Override
 public void run() {
 if(event.getType() == Event.EventType.SPECIAL_EVENT_TYPE_B) {
 try {
 lock.lock();
 processEvents(event);
 } finally {
 lock.unlock();
 }
 } else {
 processEvents(event);
 } 
 System.out.printf("[%s] consumed event : %s %n", "Executor Thread", event.toString());
 }
 });
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 } 
 public void processEvents(Event pr) { 
 System.out.println("The build process BEGINS for" + pr.toString());
 try {
 Thread.sleep(5000);
 } catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 System.out.println("The build process ENDS for" + pr.toString());
 } 
}
answered Jun 29, 2016 at 10:28
\$\endgroup\$
0

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.