6
\$\begingroup\$

This question is a follow-up to my previous question, "My EventBus system", and incorporates most points from @rolfl's answer.

It includes, but is not limited to:

  • Usage of Collections.synchronizedSet over manual synchronized { } on trivial methods.
  • Minimal locking
  • High performance code, but not going into micro-optimizations if it harms the readability of the code.

The code:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Event { }

public interface EventBus {
    void registerListenersOfObject(final Object callbackObject);

    <T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener);

    void executeEvent(final Object event);

    void removeListenersOfObject(final Object callbackObject);

    <T> void removeListener(final Class<T> eventClass, final Consumer<? extends T> eventListener);

    void removeAllListenersOfEvent(final Class<?> eventClass);

    void removeAllListeners();
}

public class SimpleEventBus implements EventBus {
    private final static Set<EventHandler> EMPTY_SET = new HashSet<>();
    private final static EventHandler[] EMPTY_ARRAY = new EventHandler[0];

    private final ConcurrentMap<Class<?>, Set<EventHandler>> eventMapping = new ConcurrentHashMap<>();
    private final Class<?> eventClassConstraint;

    public SimpleEventBus() {
        this(Object.class);
    }

    public SimpleEventBus(final Class<?> eventClassConstraint) {
        this.eventClassConstraint = Objects.requireNonNull(eventClassConstraint);
    }

    @Override
    public void registerListenersOfObject(final Object callbackObject) {
        Arrays.stream(callbackObject.getClass().getMethods())
            .filter(this::isEligibleMethod)
            .forEach(method -> {
                Class<?> eventClass = method.getParameterTypes()[0];
                addEventHandler(eventClass, new MethodEventHandler(method, callbackObject, eventClass));
            });
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener) {
        Objects.requireNonNull(eventClass);
        Objects.requireNonNull(eventListener);
        if (eventClassConstraint.isAssignableFrom(eventClass)) {
            addEventHandler(eventClass, new ConsumerEventHandler((Consumer<Object>)eventListener));
        }
    }

    @Override
    public void executeEvent(final Object event) {
        if (eventClassConstraint.isAssignableFrom(event.getClass())) {
            getCopyOfEventHandlers(event.getClass()).forEach(eventHandler -> eventHandler.invoke(event));
        }
    }

    @Override
    public void removeListenersOfObject(final Object callbackObject) {
        Arrays.stream(callbackObject.getClass().getMethods())
            .filter(this::isEligibleMethod)
            .forEach(method -> {
                Class<?> eventClass = method.getParameterTypes()[0];
                removeEventHandler(eventClass, new MethodEventHandler(method, callbackObject, eventClass));
            });
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> void removeListener(final Class<T> eventClass, final Consumer<? extends T> eventListener) {
        Objects.requireNonNull(eventClass);
        Objects.requireNonNull(eventListener);
        if (eventClassConstraint.isAssignableFrom(eventClass)) {
            removeEventHandler(eventClass, new ConsumerEventHandler((Consumer<Object>)eventListener));
        }
    }

    @Override
    public void removeAllListenersOfEvent(final Class<?> eventClass) {
        eventMapping.remove(Objects.requireNonNull(eventClass));
    }

    @Override
    public void removeAllListeners() {
        eventMapping.clear();
    }

    private boolean isEligibleMethod(final Method method) {
        return (method.getAnnotation(Event.class) != null
            && method.getReturnType().equals(void.class)
            && method.getParameterCount() == 1
            && eventClassConstraint.isAssignableFrom(method.getParameterTypes()[0]));
    }

    private void addEventHandler(final Class<?> eventClass, final EventHandler eventHandler) {
        Objects.requireNonNull(eventClass);
        Objects.requireNonNull(eventHandler);
        eventMapping.putIfAbsent(eventClass, Collections.synchronizedSet(new HashSet<>()));
        eventMapping.get(eventClass).add(eventHandler);
    }

    private void removeEventHandler(final Class<?> eventClass, final EventHandler eventHandler) {
        Objects.requireNonNull(eventClass);
        Objects.requireNonNull(eventHandler);
        eventMapping.getOrDefault(eventClass, EMPTY_SET).remove(eventHandler);
    }

    private Stream<EventHandler> getCopyOfEventHandlers(final Class<?> eventClass) {
        Set<EventHandler> eventHandlers = eventMapping.get(Objects.requireNonNull(eventClass));
        return (eventHandlers == null)
            ? Stream.empty()
            : Arrays.stream(eventHandlers.toArray(EMPTY_ARRAY));
    }

    private static interface EventHandler {
        void invoke(final Object event);
    }

    private static class MethodEventHandler implements EventHandler {
        private final Method method;
        private final Object callbackObject;
        private final Class<?> eventClass;

        public MethodEventHandler(final Method method, final Object object, final Class<?> eventClass) {
            this.method = Objects.requireNonNull(method);
            this.callbackObject = Objects.requireNonNull(object);
            this.eventClass = Objects.requireNonNull(eventClass);
        }

        @Override
        public void invoke(final Object event) {
            try {
                method.setAccessible(true);
                method.invoke(callbackObject, Objects.requireNonNull(event));
            } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
                throw new RuntimeException(ex);
            }
        }

        @Override
        public int hashCode() {
            int hash = 7;
            hash = 71 * hash + Objects.hashCode(this.method);
            hash = 71 * hash + Objects.hashCode(this.callbackObject);
            hash = 71 * hash + Objects.hashCode(this.eventClass);
            return hash;
        }

        @Override
        public boolean equals(final Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final MethodEventHandler other = (MethodEventHandler)obj;
            if (!Objects.equals(this.method, other.method)) {
                return false;
            }
            if (!Objects.equals(this.callbackObject, other.callbackObject)) {
                return false;
            }
            if (!Objects.equals(this.eventClass, other.eventClass)) {
                return false;
            }
            return true;
        }
    }

    private static class ConsumerEventHandler implements EventHandler {
        private final Consumer<Object> eventListener;

        public ConsumerEventHandler(final Consumer<Object> consumer) {
            this.eventListener = Objects.requireNonNull(consumer);
        }

        @Override
        public void invoke(final Object event) {
            eventListener.accept(Objects.requireNonNull(event));
        }

        @Override
        public int hashCode() {
            int hash = 5;
            hash = 19 * hash + Objects.hashCode(this.eventListener);
            return hash;
        }

        @Override
        public boolean equals(final Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final ConsumerEventHandler other = (ConsumerEventHandler)obj;
            if (!Objects.equals(this.eventListener, other.eventListener)) {
                return false;
            }
            return true;
        }
    }
}

The unit tests of the old version are still valid and also still pass.

asked May 5, 2014 at 11:37
\$\endgroup\$

2 Answers

5
\$\begingroup\$

There are two bugs related to the type system.

Currently, if all three types are the same (the first argument of registerListener, the type parameter of the registered Consumer, and the runtime type of the event passed to executeEvent), it works as expected.

@Test
public void worksAsExpected() {
    String expected = "WORKS";
    AtomicReference<String> actual = new AtomicReference<>(null);
    EventBus bus = new SimpleEventBus(String.class);
    bus.registerListener(String.class, (String s) -> { actual.set(s); });
    bus.executeEvent(expected);
    assertEquals(expected, actual.get());
}

In the case below, I register a listener for events of type Number and execute an event of type Number, but the handler is never executed:

@Test
public void shouldHaveWorkedButDoesNot() {
    Number expected = 1;
    AtomicReference<Number> actual = new AtomicReference<>(null);
    EventBus bus = new SimpleEventBus(Number.class);
    bus.registerListener(Number.class, (Number n) -> { actual.set(n); });
    bus.executeEvent(expected);
    assertEquals(expected, actual.get());
}

This happens because getCopyOfEventHandlers only returns the handlers registered for the exact runtime type of the executed event (here Integer, since the literal 1 is boxed to an Integer). It should return the handlers registered for the runtime type or any supertype thereof.

To fix this, getCopyOfEventHandlers should be something like:

private Set<EventHandler> getEventHandlersFor(final Class<?> eventClass) {
    return eventMapping.entrySet().stream()
        .filter(entry -> entry.getKey().isAssignableFrom(eventClass))
        .flatMap(entry -> entry.getValue().stream())
        .collect(Collectors.toSet());
}
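One caveat if the per-class sets are still created with Collections.synchronizedSet: its Javadoc requires the caller to hold the set's lock while traversing it, and that includes streaming. The following is a minimal sketch of the same supertype lookup that copies each matching set under its own lock. The class SupertypeLookup, its methods, and the use of plain Strings as stand-ins for EventHandler are assumptions made for illustration, not part of the reviewed code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the event mapping; handlers are Strings for brevity.
final class SupertypeLookup {
    private final Map<Class<?>, Set<String>> mapping = new ConcurrentHashMap<>();

    void add(Class<?> eventClass, String handler) {
        mapping.computeIfAbsent(eventClass, k -> Collections.synchronizedSet(new HashSet<>()))
               .add(handler);
    }

    // Collects handlers registered for eventClass itself or any of its supertypes.
    List<String> handlersFor(Class<?> eventClass) {
        List<String> result = new ArrayList<>();
        mapping.forEach((registeredClass, handlers) -> {
            if (registeredClass.isAssignableFrom(eventClass)) {
                // Hold the synchronized set's lock while reading it,
                // so the copy is taken atomically with respect to writers.
                synchronized (handlers) {
                    result.addAll(handlers);
                }
            }
        });
        return result;
    }
}
```

With handlers registered for Object, Number and String, handlersFor(Integer.class) returns the Object and Number handlers but not the String one.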

The second type problem is this:

When I register a handler for Numbers, it will run for any Number. Some might be Floats, others might be AtomicLongs. If I register a Consumer<AtomicInteger> to handle Numbers and pass in a Float, which is a Number, I will get a runtime exception (a ClassCastException, once the lookup bug above is fixed so that the handler is actually invoked).

@Test
public void shouldNotCompileButDoes() {
    EventBus bus = new SimpleEventBus(Number.class);
    // Not all Numbers are AtomicIntegers so this is a type error
    bus.registerListener(Number.class,
        (AtomicInteger n) -> { System.out.println(n.get()); });
    Number someNumber = 4f;
    bus.executeEvent(someNumber); // e.g. Float does not have .get()
}
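To see the failure in isolation, here is a small sketch that reproduces only the unchecked cast from registerListener and then feeds the listener a Float. The class UncheckedCastDemo and its methods erase and failsAtRuntime are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical demo class; mirrors the cast done in registerListener.
final class UncheckedCastDemo {
    // Same shape as the reviewed signature: the compiler accepts any
    // Consumer of a subtype of T, and the cast hides the real parameter type.
    @SuppressWarnings("unchecked")
    static <T> Consumer<Object> erase(Class<T> eventClass, Consumer<? extends T> listener) {
        return (Consumer<Object>) listener;
    }

    // Returns true if dispatching a Float to the AtomicInteger listener
    // fails with a ClassCastException.
    static boolean failsAtRuntime() {
        Consumer<AtomicInteger> listener = n -> System.out.println(n.get());
        Consumer<Object> erased = erase(Number.class, listener); // compiles
        try {
            erased.accept(4f); // a Float is a Number, but not an AtomicInteger
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }
}
```

The registration compiles without complaint, and the mismatch only surfaces when an event of the wrong subtype is dispatched.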

Conversely, if I register a Consumer<Object> to handle Numbers and pass in any Float, it will always run. In fact, a Consumer<Object> can obviously accept any object.

@Test
public void shouldCompileButDoesNot() {
    EventBus bus = new SimpleEventBus(Number.class);
    // All Numbers are Objects this would work
    // bus.registerListener(Number.class,
    //     (Object o) -> { System.out.println(o); });
    Number someNumber = 4f;
    bus.executeEvent(someNumber);
}

To fix this, replace Consumer<? extends T> eventListener in registerListener's signature with Consumer<? super T>. In fact, because T appears only as a parameter type in the declaration of Consumer<T>, a parameter of type Consumer<? extends T> should either be a type error or could be replaced by plain Consumer<?>. I suspect the latter might be the case for removeListener.
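As a rough sketch of what the Consumer<? super T> signature buys you: if each registration keeps its Class<T> token, dispatch can use Class.isInstance and Class.cast, so no unchecked cast is needed at all. The class TypedBusSketch and its nested Registration type are assumptions for illustration, and the flat list is a simplification of the map-based storage in the reviewed code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified bus; not the reviewed SimpleEventBus.
final class TypedBusSketch {
    // Pairs a class token with a listener that accepts T or any supertype of T.
    private static final class Registration<T> {
        private final Class<T> eventClass;
        private final Consumer<? super T> listener;

        Registration(Class<T> eventClass, Consumer<? super T> listener) {
            this.eventClass = eventClass;
            this.listener = listener;
        }

        void dispatch(Object event) {
            if (eventClass.isInstance(event)) {
                // Checked cast via the class token instead of an unchecked generic cast.
                listener.accept(eventClass.cast(event));
            }
        }
    }

    private final List<Registration<?>> registrations = new CopyOnWriteArrayList<>();

    <T> void registerListener(Class<T> eventClass, Consumer<? super T> listener) {
        registrations.add(new Registration<>(eventClass, listener));
    }

    void executeEvent(Object event) {
        registrations.forEach(r -> r.dispatch(event));
    }
}
```

With this signature, a Consumer<Object> or Consumer<Number> can be registered for Number.class, while a Consumer<AtomicInteger> is rejected at compile time.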

answered May 6, 2014 at 9:49
\$\endgroup\$
4
\$\begingroup\$

While synchronization is correct now, Collections.synchronizedSet(new HashSet<>()) is still a fairly coarse-grained way to synchronize. You can replace it with Collections.newSetFromMap(new ConcurrentHashMap<>()) to benefit from the better locking strategy of ConcurrentHashMap.
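A side benefit worth illustrating: a set backed by ConcurrentHashMap has weakly consistent iterators, so it can be modified while it is being traversed without a ConcurrentModificationException and without external locking. The sketch below uses a hypothetical ConcurrentSetDemo class with String elements standing in for handlers.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; Strings stand in for event handlers.
final class ConcurrentSetDemo {
    static Set<String> newConcurrentSet() {
        return Collections.newSetFromMap(new ConcurrentHashMap<>());
    }

    // Removes every element while iterating and returns the remaining size.
    static int removeWhileIterating() {
        Set<String> handlers = newConcurrentSet();
        handlers.add("a");
        handlers.add("b");
        handlers.add("c");
        for (String handler : handlers) {
            // Safe here; the same pattern on a plain HashSet risks a
            // ConcurrentModificationException.
            handlers.remove(handler);
        }
        return handlers.size();
    }
}
```

For an event bus, this means a handler could unregister itself during dispatch without the bus having to copy the set first, although copying may still be preferable if a stable snapshot is wanted.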

Getting a value from a Map, or inserting one if it is absent, is easier in Java 8 thanks to lambdas. Using Map.computeIfAbsent(), you can substitute:

eventMapping.putIfAbsent(eventClass, Collections.newSetFromMap(new ConcurrentHashMap<>()));
eventMapping.get(eventClass).add(eventHandler);

by

eventMapping.computeIfAbsent(eventClass, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())).add(eventHandler);
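The difference is more than cosmetic. putIfAbsent evaluates its argument on every call, so a fresh set is allocated even when the key is already present, and the separate get() could return null if another thread removes the key in between. The sketch below counts allocations to show the first point; MultimapDemo, its methods, and the setsCreated counter are hypothetical names, with Strings standing in for handlers.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class comparing the two insertion styles.
final class MultimapDemo {
    final AtomicInteger setsCreated = new AtomicInteger();
    private final ConcurrentMap<Class<?>, Set<String>> mapping = new ConcurrentHashMap<>();

    private Set<String> newSet() {
        setsCreated.incrementAndGet(); // count every allocation
        return Collections.newSetFromMap(new ConcurrentHashMap<>());
    }

    // Two steps: the set is allocated eagerly on every call.
    void addWithPutIfAbsent(Class<?> key, String handler) {
        mapping.putIfAbsent(key, newSet());
        mapping.get(key).add(handler);
    }

    // One step: the set is allocated only when the key is absent.
    void addWithComputeIfAbsent(Class<?> key, String handler) {
        mapping.computeIfAbsent(key, k -> newSet()).add(handler);
    }

    int count(Class<?> key) {
        Set<String> handlers = mapping.get(key);
        return handlers == null ? 0 : handlers.size();
    }
}
```

Adding two handlers for the same key allocates two sets with addWithPutIfAbsent but only one with addWithComputeIfAbsent; both end up with two handlers stored.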
answered May 5, 2014 at 22:29
\$\endgroup\$
1
  • \$\begingroup\$ Thanks a lot! This greatly clarified my code. There really should have been more focus on computeIfAbsent (in the docs/tutorials/highlights) as this makes using multimaps a lot easier. Also I decided to obtain a set as the following: private final static Supplier<Set<EventHandler>> CONCURRENT_SET_SUPPLIER = () -> Collections.newSetFromMap(new ConcurrentHashMap<>()); \$\endgroup\$ Commented May 7, 2014 at 8:44
