I have producer threads A, B and C producing three different types of events: Events A, B and C respectively. The consumer thread can process only one Event B at a time, whereas it can process any number of Events A and C at the same time.
Event class:
package codility.question;
public class Event {
private String type;
public Event(String repository) {
super();
this.type = repository;
}
@Override
public String toString() {
return "Event [repository=" + type + "]";
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
ProducerConsumer class:
package codility.question;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumer {
private static final String SPECIAL_EVENT_TYPE_B = "B";
private static int CAPACITY = 10;
public static void main(String[] args) {
final BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>(CAPACITY);
final Lock lock = new ReentrantLock();
Thread eventSchedulerAlpha = new Thread("Event A") {
public void run() {
try {
Event event = new Event("A");
queue.put(event);
// thread will block here
System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
eventSchedulerAlpha.start();
Thread eventSchedulerBeta = new Thread("Event B") {
public void run() {
try {
Event event = new Event(SPECIAL_EVENT_TYPE_B);
queue.put(event);
// thread will block here
System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
eventSchedulerBeta.start();
Thread eventSchedulerKappa = new Thread("Event C") {
public void run() {
try {
Event event = new Event("C");
queue.put(event);
// thread will block here
System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
eventSchedulerKappa.start();
Thread builder = new Thread("Builder") {
public void run() {
System.out.println("Started the Builder");
try {
Event processPR = null;
while(queue.size()>0) {
Event pr = queue.peek();
if(pr!=null && SPECIAL_EVENT_TYPE_B.equals(pr.getType())) {
lock.lock();
processPR = queue.take();
processEvents(processPR);
lock.unlock();
} else {
processPR = queue.take();
processEvents(processPR);
}
// thread will block here
System.out.printf("[%s] consumed event : %s %n", Thread.currentThread().getName(), pr.toString());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
builder.start();
}
public static void processEvents(Event pr) {
System.out.println("The build process BEGINS for" + pr.toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("The build process ENDS for" + pr.toString());
}
}
1 Answer
- Use enums instead of a set of string constants
You can modify your Event class to use an enum for the event type. This gives you type safety, and you no longer need String comparisons:
public class Event {
public enum EventType {
A, SPECIAL_EVENT_TYPE_B, C;
}
private EventType type;
...
}
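As a rough sketch of what the enum buys you, the check for the special type becomes a plain identity comparison. The class name EventTypeDemo and the helper requiresExclusiveProcessing are made up for this illustration and are not part of your code:

```java
// Hypothetical demo class; the enum mirrors the one suggested above.
class EventTypeDemo {
    enum EventType { A, SPECIAL_EVENT_TYPE_B, C }

    // Hypothetical helper: true for the one type that must be processed one at a time.
    // Enum constants are singletons, so == is safe and also tolerates null.
    static boolean requiresExclusiveProcessing(EventType type) {
        return type == EventType.SPECIAL_EVENT_TYPE_B;
    }

    public static void main(String[] args) {
        for (EventType type : EventType.values()) {
            System.out.println(type + " exclusive? " + requiresExclusiveProcessing(type));
        }
    }
}
```

A typo such as EventType.D is rejected by the compiler, whereas a mistyped String constant would only fail silently at run time.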
- Define a class for your Producers
You are creating three producer threads with identical code in their run methods. Create one producer class and reuse it for all three.
- Use final for your constants
The capacity is never reassigned, so declare it final. Instead of:
private static int CAPACITY = 10;
This should be:
private static final int CAPACITY = 10;
- Your consumer is not processing the Events in new threads, so it only processes one event at a time. The consumer should be able to process any number of A and C Events concurrently, and one B Event at a time.
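One way to convince yourself that this requirement holds is to count how many B events are in progress at once. The following is only a sketch under my own assumptions: events are reduced to plain strings, the work is a 50 ms sleep, and the names ExclusiveBDemo, process and peakConcurrentB are invented for the illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: every event runs on a pool thread, but B events share one lock.
class ExclusiveBDemo {
    private final Lock bLock = new ReentrantLock();
    private final AtomicInteger activeB = new AtomicInteger();
    private final AtomicInteger peakB = new AtomicInteger();

    void process(String type) {
        if ("B".equals(type)) {
            bLock.lock(); // acquire before try, so finally only unlocks a held lock
            try {
                // Record how many B events are running right now.
                peakB.accumulateAndGet(activeB.incrementAndGet(), Math::max);
                work();
                activeB.decrementAndGet();
            } finally {
                bLock.unlock();
            }
        } else {
            work(); // A and C run without any coordination
        }
    }

    private static void work() {
        try {
            Thread.sleep(50); // stand-in for the real build step
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs all events concurrently and returns the highest number of simultaneous B events.
    static int peakConcurrentB(List<String> types) {
        ExclusiveBDemo demo = new ExclusiveBDemo();
        ExecutorService pool = Executors.newCachedThreadPool();
        for (String type : types) {
            pool.execute(() -> demo.process(type));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return demo.peakB.get();
    }

    public static void main(String[] args) {
        int peak = peakConcurrentB(Arrays.asList("A", "B", "B", "C", "B"));
        System.out.println("Peak concurrent B events: " + peak);
    }
}
```

With the lock in place the reported peak stays at 1 even though three B events were submitted; removing the lock would let it rise above 1.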
All together, your code could be:
public class Event {
public enum EventType {
A, SPECIAL_EVENT_TYPE_B, C;
}
private EventType type;
public Event(EventType repository) {
super();
this.type = repository;
}
@Override
public String toString() {
return "Event [repository=" + type + "]";
}
public EventType getType() {
return type;
}
public void setType(EventType type) {
this.type = type;
}
}
public class ProducerConsumer {
private static final int CAPACITY = 10;
public static void main(String[] args) {
final BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>(CAPACITY);
new Thread(new Producer(queue, Event.EventType.A), "Event A").start();
new Thread(new Producer(queue, Event.EventType.SPECIAL_EVENT_TYPE_B), "Event B").start();
new Thread(new Producer(queue, Event.EventType.C), "Event C").start();
new Thread(new Consumer(queue),"Builder").start();
}
}
public class Producer implements Runnable {
private BlockingQueue<Event> queue;
private Event.EventType eventType;
public Producer(BlockingQueue<Event> queue, Event.EventType eventType) {
this.queue = queue;
this.eventType = eventType;
}
public void run() {
try {
Event event = new Event(eventType);
queue.put(event);
// thread will block here
System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Consumer implements Runnable {
BlockingQueue<Event> queue;
final Lock lock = new ReentrantLock();
public Consumer(BlockingQueue<Event> queue) {
this.queue = queue;
}
public void run() {
System.out.println("Started the Builder");
try {
Executor executor = Executors.newCachedThreadPool();
while(queue.size() > 0) {
Event event = queue.take();
executor.execute(new Runnable() {
@Override
public void run() {
if(event.getType() == Event.EventType.SPECIAL_EVENT_TYPE_B) {
// Acquire the lock before entering try, so finally never unlocks a lock that was not acquired
lock.lock();
try {
processEvents(event);
} finally {
lock.unlock();
}
} else {
processEvents(event);
}
System.out.printf("[%s] consumed event : %s %n", "Executor Thread", event.toString());
}
});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void processEvents(Event pr) {
System.out.println("The build process BEGINS for" + pr.toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("The build process ENDS for" + pr.toString());
}
}
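One caveat about the loop condition: queue.size() > 0 is checked once when the consumer starts, so if the Builder thread runs before any producer has published, the loop may end immediately without processing anything. A possible alternative, sketched here with plain strings instead of Event objects and with a made-up class name DrainDemo and an arbitrary idle timeout, is to wait for elements with poll and stop only after the queue has stayed empty for a while:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: drain a queue until it has been idle for idleMillis.
class DrainDemo {
    static List<String> drain(BlockingQueue<String> queue, long idleMillis) {
        List<String> seen = new ArrayList<>();
        try {
            String item;
            // poll waits up to idleMillis for an element and returns null on timeout
            while ((item = queue.poll(idleMillis, TimeUnit.MILLISECONDS)) != null) {
                seen.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return seen;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
        // The producer deliberately starts late; a size() check would have given up already.
        new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.put("A");
                queue.put("B");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        System.out.println(drain(queue, 1000)); // prints [A, B]
    }
}
```

In the Consumer above you would probably also want to declare the pool as an ExecutorService rather than an Executor, so that shutdown() can be called once the loop ends and the JVM does not wait for idle pool threads to time out.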
The processEvents() function should start a new thread to do the work so that we can see a "build A" and "build C" happening at the same time, in accordance with your problem description. Also, if the work were done in a background thread, we could see how you would handle the situation of two Bs. Right now everything is done serially so there is never any potential problem.