I decided to roll out my own EventBus system which is intended to be thread-safe.
Hence a review should focus extra on thread safety apart from all regular concerns.
The EventBus
can work in two ways:
- You can register events and listeners directly on the
EventBus
. - You can the methods, of a specific object, that are single argument void-methods annotated with
@Event
.
First the code, then the unit tests below:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Event { }
public interface EventBus {
void registerListenersOfObject(final Object callbackObject);
<T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener);
void executeEvent(final Object event);
void removeListenersOfObject(final Object callbackObject);
<T> void removeListener(final Class<T> eventClass, final Consumer<? extends T> eventListener);
void removeAllListenersOfEvent(final Class<?> eventClass);
void removeAllListeners();
}
public class SimpleEventBus implements EventBus {
private final static Set<EventHandler> EMPTY_SET = new HashSet<>();
private final ConcurrentMap<Class<?>, Set<EventHandler>> eventMapping = new ConcurrentHashMap<>();
private final Class<?> classConstraint;
public SimpleEventBus() {
this(Object.class);
}
public SimpleEventBus(final Class<?> eventClassConstraint) {
this.classConstraint = Objects.requireNonNull(eventClassConstraint);
}
@Override
public void registerListenersOfObject(final Object callbackObject) {
Arrays.stream(callbackObject.getClass().getMethods())
.filter(method -> (method.getAnnotation(Event.class) != null))
.filter(method -> method.getReturnType().equals(void.class))
.filter(method -> method.getParameterCount() == 1)
.forEach(method -> {
Class<?> clazz = method.getParameterTypes()[0];
if (!classConstraint.isAssignableFrom(clazz)) {
return;
}
synchronized (eventMapping) {
eventMapping.putIfAbsent(clazz, new HashSet<>());
eventMapping.get(clazz).add(new MethodEventHandler(method, callbackObject, clazz));
}
});
}
@Override
@SuppressWarnings("unchecked")
public <T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener) {
Objects.requireNonNull(eventClass);
Objects.requireNonNull(eventListener);
if (!classConstraint.isAssignableFrom(eventClass)) {
return;
}
synchronized(eventMapping) {
eventMapping.putIfAbsent(eventClass, new HashSet<>());
eventMapping.get(eventClass).add(new ConsumerEventHandler((Consumer<Object>)eventListener));
}
}
@Override
public void executeEvent(final Object event) {
if (classConstraint.isAssignableFrom(event.getClass())) {
eventMapping.getOrDefault(event.getClass(), EMPTY_SET).forEach(eventHandler -> eventHandler.invoke(event));
}
}
@Override
public void removeListenersOfObject(final Object callbackObject) {
Arrays.stream(callbackObject.getClass().getMethods())
.filter(method -> (method.getAnnotation(Event.class) != null))
.filter(method -> method.getReturnType().equals(void.class))
.filter(method -> method.getParameterCount() == 1)
.forEach(method -> {
Class<?> clazz = method.getParameterTypes()[0];
if (classConstraint.isAssignableFrom(clazz)) {
eventMapping.getOrDefault(clazz, EMPTY_SET).remove(new MethodEventHandler(method, callbackObject, clazz));
}
});
}
@Override
@SuppressWarnings("unchecked")
public <T> void removeListener(final Class<T> eventClass, final Consumer<? extends T> eventListener) {
Objects.requireNonNull(eventClass);
Objects.requireNonNull(eventListener);
if (classConstraint.isAssignableFrom(eventClass)) {
eventMapping.getOrDefault(eventClass, EMPTY_SET).remove(new ConsumerEventHandler((Consumer<Object>)eventListener));
}
}
@Override
public void removeAllListenersOfEvent(final Class<?> eventClass) {
Objects.requireNonNull(eventClass);
eventMapping.remove(eventClass);
}
@Override
public void removeAllListeners() {
eventMapping.clear();
}
private static interface EventHandler {
void invoke(final Object event);
}
private static class MethodEventHandler implements EventHandler {
private final Method method;
private final Object callbackObject;
private final Class<?> eventClass;
public MethodEventHandler(final Method method, final Object object, final Class<?> eventClass) {
this.method = Objects.requireNonNull(method);
this.callbackObject = Objects.requireNonNull(object);
this.eventClass = Objects.requireNonNull(eventClass);
}
@Override
public void invoke(final Object event) {
try {
method.setAccessible(true);
method.invoke(callbackObject, Objects.requireNonNull(event));
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new RuntimeException(ex);
}
}
@Override
public int hashCode() {
int hash = 7;
hash = 71 * hash + Objects.hashCode(this.method);
hash = 71 * hash + Objects.hashCode(this.callbackObject);
hash = 71 * hash + Objects.hashCode(this.eventClass);
return hash;
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final MethodEventHandler other = (MethodEventHandler)obj;
if (!Objects.equals(this.method, other.method)) {
return false;
}
if (!Objects.equals(this.callbackObject, other.callbackObject)) {
return false;
}
if (!Objects.equals(this.eventClass, other.eventClass)) {
return false;
}
return true;
}
}
private static class ConsumerEventHandler implements EventHandler {
private final Consumer<Object> eventListener;
public ConsumerEventHandler(final Consumer<Object> consumer) {
this.eventListener = Objects.requireNonNull(consumer);
}
@Override
public void invoke(final Object event) {
eventListener.accept(Objects.requireNonNull(event));
}
@Override
public int hashCode() {
int hash = 5;
hash = 19 * hash + Objects.hashCode(this.eventListener);
return hash;
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final ConsumerEventHandler other = (ConsumerEventHandler)obj;
if (!Objects.equals(this.eventListener, other.eventListener)) {
return false;
}
return true;
}
}
}
public class SimpleEventBusTest {
static {
assertTrue(true);
}
private AtomicInteger alphaCounter;
private AtomicInteger betaCounter;
private AtomicInteger gammaCounter;
@Before
public void before() {
alphaCounter = new AtomicInteger(0);
betaCounter = new AtomicInteger(0);
gammaCounter = new AtomicInteger(0);
}
private Stream<AtomicInteger> counters() {
return Stream.of(alphaCounter, betaCounter, gammaCounter);
}
@Test
public void testConstructor() {
EventBus eventBus = new SimpleEventBus();
eventBus.registerListenersOfObject(new Object() {
@Event
public void onAlphaEvent(final AlphaEvent alphaEvent) {
alphaCounter.incrementAndGet();
}
});
eventBus.executeEvent(new AlphaEvent());
assertEquals(1, alphaCounter.get());
}
@Test
public void testConstructorWithEventClassConstraint() {
EventBus eventBus = new SimpleEventBus(BetaEvent.class);
eventBus.registerListenersOfObject(new Object() {
@Event
public void onAlphaEvent(final AlphaEvent alphaEvent) {
alphaCounter.incrementAndGet();
}
});
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.executeEvent(new AlphaEvent());
assertEquals(0, alphaCounter.get());
}
@Test
public void testRegisterListenersOfObject() {
EventBus eventBus = new SimpleEventBus();
eventBus.registerListenersOfObject(new Object() {
@Event
public void onAlphaEvent1(final AlphaEvent alphaEvent) {
alphaCounter.incrementAndGet();
}
@Event
public void onAlphaEvent2(final AlphaEvent alphaEvent) {
alphaCounter.incrementAndGet();
}
@Event
public void onAlphaEvent3(final AlphaEvent alphaEvent) {
alphaCounter.incrementAndGet();
}
@Event
public void onBetaEvent1(final BetaEvent betaEvent) {
betaCounter.incrementAndGet();
}
@Event
public void onBetaEvent2(final BetaEvent betaEvent) {
betaCounter.incrementAndGet();
}
@Event
public void onGammaEvent(final GammaEvent gammaEvent) {
gammaCounter.incrementAndGet();
}
});
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(2, betaCounter.get());
assertEquals(1, gammaCounter.get());
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(6, alphaCounter.get());
assertEquals(4, betaCounter.get());
assertEquals(2, gammaCounter.get());
}
@Test
public void testRegisterListener() {
EventBus eventBus = new SimpleEventBus();
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.registerListener(BetaEvent.class, betaEvent -> betaCounter.incrementAndGet());
eventBus.registerListener(BetaEvent.class, betaEvent -> betaCounter.incrementAndGet());
eventBus.registerListener(GammaEvent.class, gammaEvent -> gammaCounter.incrementAndGet());
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(2, betaCounter.get());
assertEquals(1, gammaCounter.get());
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(6, alphaCounter.get());
assertEquals(4, betaCounter.get());
assertEquals(2, gammaCounter.get());
}
@Test
public void testExecuteEvent() {
EventBus eventBus = new SimpleEventBus();
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.executeEvent(new AlphaEvent());
assertEquals(1, alphaCounter.get());
}
@Test
public void testExecuteEventSameInstance() {
AlphaEvent specificAlphaEvent = new AlphaEvent();
EventBus eventBus = new SimpleEventBus();
eventBus.registerListener(AlphaEvent.class, alphaEvent -> assertTrue(alphaEvent == specificAlphaEvent));
}
@Test
public void testRemoveListenersOfObject() {
EventBus eventBus = new SimpleEventBus();
Object object1 = new Object() {
@Event
public void onAlphaEvent(final AlphaEvent alphaEvent) {
alphaCounter.incrementAndGet();
}
@Event
public void onBetaEvent(final BetaEvent betaEvent) {
betaCounter.incrementAndGet();
}
@Event
public void onGammaEvent(final GammaEvent gammaEvent) {
gammaCounter.incrementAndGet();
}
};
Object object2 = new Object() {
@Event
public void onAlphaEvent(final AlphaEvent alphaEvent) {
alphaCounter.incrementAndGet();
}
@Event
public void onBetaEvent(final BetaEvent betaEvent) {
betaCounter.incrementAndGet();
}
@Event
public void onGammaEvent(final GammaEvent gammaEvent) {
gammaCounter.incrementAndGet();
}
};
eventBus.registerListenersOfObject(object1);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
counters().allMatch(counter -> counter.get() == 1);
eventBus.registerListenersOfObject(object2);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
counters().allMatch(counter -> counter.get() == 3);
eventBus.removeListenersOfObject(object2);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
counters().allMatch(counter -> counter.get() == 4);
eventBus.removeListenersOfObject(object1);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
counters().allMatch(counter -> counter.get() == 4);
}
@Test
public void testRemoveListener() {
EventBus eventBus = new SimpleEventBus();
Consumer<AlphaEvent> alphaEventListener = alphaEvent -> alphaCounter.incrementAndGet();
Consumer<BetaEvent> betaEventListener = betaEvent -> betaCounter.incrementAndGet();
Consumer<GammaEvent> gammaEventListener = gammaEvent -> gammaCounter.incrementAndGet();
eventBus.registerListener(AlphaEvent.class, alphaEventListener);
eventBus.registerListener(BetaEvent.class, betaEventListener);
eventBus.registerListener(GammaEvent.class, gammaEventListener);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(1, alphaCounter.get());
assertEquals(1, betaCounter.get());
assertEquals(1, gammaCounter.get());
eventBus.removeListener(GammaEvent.class, gammaEventListener);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(2, alphaCounter.get());
assertEquals(2, betaCounter.get());
assertEquals(1, gammaCounter.get());
eventBus.removeListener(BetaEvent.class, betaEventListener);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(2, betaCounter.get());
assertEquals(1, gammaCounter.get());
eventBus.removeListener(AlphaEvent.class, alphaEventListener);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(2, betaCounter.get());
assertEquals(1, gammaCounter.get());
}
@Test
public void testRemoveAllListenersOfEvent() {
EventBus eventBus = new SimpleEventBus();
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.registerListener(BetaEvent.class, betaEvent -> betaCounter.incrementAndGet());
eventBus.registerListener(BetaEvent.class, betaEvent -> betaCounter.incrementAndGet());
eventBus.registerListener(GammaEvent.class, gammaEvent -> gammaCounter.incrementAndGet());
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(2, betaCounter.get());
assertEquals(1, gammaCounter.get());
eventBus.removeAllListenersOfEvent(AlphaEvent.class);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(4, betaCounter.get());
assertEquals(2, gammaCounter.get());
eventBus.removeAllListenersOfEvent(BetaEvent.class);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(4, betaCounter.get());
assertEquals(3, gammaCounter.get());
eventBus.removeAllListenersOfEvent(GammaEvent.class);
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(4, betaCounter.get());
assertEquals(3, gammaCounter.get());
}
@Test
public void testRemoveAllListeners() {
EventBus eventBus = new SimpleEventBus();
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.registerListener(AlphaEvent.class, alphaEvent -> alphaCounter.incrementAndGet());
eventBus.registerListener(BetaEvent.class, betaEvent -> betaCounter.incrementAndGet());
eventBus.registerListener(BetaEvent.class, betaEvent -> betaCounter.incrementAndGet());
eventBus.registerListener(GammaEvent.class, gammaEvent -> gammaCounter.incrementAndGet());
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(2, betaCounter.get());
assertEquals(1, gammaCounter.get());
eventBus.removeAllListeners();
eventBus.executeEvent(new AlphaEvent());
eventBus.executeEvent(new BetaEvent());
eventBus.executeEvent(new GammaEvent());
assertEquals(3, alphaCounter.get());
assertEquals(2, betaCounter.get());
assertEquals(1, gammaCounter.get());
}
private static class AlphaEvent { }
private static class BetaEvent { }
private static class GammaEvent { }
}
1 Answer 1
The synchronization in the code is in some places overly broad, and in others, it is absent where it is needed.
synchronizing on eventMapping
in your registerListenersOfObject
method means that only one thread can be accessing the eventMapping
instance at any one time. This defeats using the ConcurrentHashMap
concept entirely (where only a small portion of the map is locked and other portions are available for other threads). The granularity of this lock is overly broad.
Inside that lock, you add data to (and potentially create) a HashSet<EventHandler>
instance. This HashSet is then used in other methods, but without any synchronization. Those other methods may have issues with concurrency because they are not included in any synchronization at all.
@Override public void executeEvent(final Object event) { if (classConstraint.isAssignableFrom(event.getClass())) { eventMapping.getOrDefault(event.getClass(), EMPTY_SET).forEach(eventHandler -> eventHandler.invoke(event)); } }
in the above code, while performing the forEach, any of the following things are possible (and other things as well, I am sure):
- data could be added to the Set you are streaming, and that data may, or may not be included in the stream.
- the stream could throw a ConcurrentModificationException
- the steam could end early (and some data may not be processed at all.
- ......
Consider the following code in the SimpleEventBus
. This code handles adding and using event handlers (though removing handlers needs to be fixed as well)....
private final void includeEventHandler(final Class<?> clazz, final EventHandler handler) {
Set<EventHandler> existing = eventMapping.get(clazz);
if (existing == null) {
final Set<EventHandler> created = new HashSet<>();
// optimistically assume that we are the first thread for this particular class.
existing = eventMapping.putIfAbsent(clazz, created);
if (existing == null) {
// we are the first thread to add one for this clazz
existing = created;
}
}
synchronized (existing) {
existing.add(handler);
}
}
private final EventHandler[] getEventHandlers(final Class<?> clazz) {
Set<EventHandler> handlers = eventMapping.get(clazz);
if (handlers == null) {
return new EventHandler[0];
}
synchronized(handlers) {
return handlers.toArray(new EventHandler[handlers.size()]);
}
}
@Override
public void registerListenersOfObject(final Object callbackObject) {
Arrays.stream(callbackObject.getClass().getMethods())
.filter(method -> (method.getAnnotation(Event.class) != null))
.filter(method -> method.getReturnType().equals(void.class))
.filter(method -> method.getParameterCount() == 1)
.forEach(method -> {
Class<?> clazz = method.getParameterTypes()[0];
if (!classConstraint.isAssignableFrom(clazz)) {
return;
}
includeEventHandler(clazz, new MethodEventHandler(method, callbackObject, clazz));
});
}
@Override
@SuppressWarnings("unchecked")
public <T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener) {
Objects.requireNonNull(eventClass);
Objects.requireNonNull(eventListener);
if (!classConstraint.isAssignableFrom(eventClass)) {
return;
}
includeEventHandler(eventClass, new ConsumerEventHandler((Consumer<Object>)eventListener));
}
@Override
public void executeEvent(final Object event) {
if (classConstraint.isAssignableFrom(event.getClass())) {
Arrays.stream(getEventHandlers(event.getClass())).forEach(eventHandler -> eventHandler.invoke(event));
}
}
The above code uses the ConcurrentHashMap in a way that is minimally locked. It uses an optimistic process for creating a new HashSet only when it is likely going to be used (instead of creating, and throwing it away almost all the time). It also makes sure that, if one is created in a different thread, and our optimism was proven wrong, that we use the one that other threads are using.
Then, for the actual HashSet, it synchronizes on the whole set, and all operations are completely isolated from other threads.
This is OK, because, the only time there will be thread blocking, is when two threads are accessing the event handlers for a single Class.... which is likely to be uncommon.
Note, that the getHandlers creates a defensive copy of the Set, so that iteration has a consistent copy of the data, and that there does not need to be any locking during the iteration.
Edit: To remove unnecessary work in the code, I would actually recommend the following:
private final EventHandler[] getEventHandlers(final Class<?> clazz) {
Set<EventHandler> handlers = eventMapping.get(clazz);
if (handlers == null) {
return null;
}
synchronized(handlers) {
return handlers.toArray(new EventHandler[handlers.size()]);
}
}
@Override
public void executeEvent(final Object event) {
if (classConstraint.isAssignableFrom(event.getClass())) {
EventHandler[] handlers = getEventHandlers(event.getClass());
if (handlers != null) {
Arrays.stream(handlers).forEach(eventHandler -> eventHandler.invoke(event));
}
}
}
Explore related questions
See similar questions with these tags.