This question is a follow-up to my previous question, My EventBus system, and incorporates most points from @rolfl's answer.
It includes, but is not limited to:
- Usage of Collections.synchronizedSet over a manual synchronized { } block on trivial methods.
- Minimal locking
- High performance code, but not going into micro-optimizations if it harms the readability of the code.
The code:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Event { }
public interface EventBus {
void registerListenersOfObject(final Object callbackObject);
<T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener);
void executeEvent(final Object event);
void removeListenersOfObject(final Object callbackObject);
<T> void removeListener(final Class<T> eventClass, final Consumer<? extends T> eventListener);
void removeAllListenersOfEvent(final Class<?> eventClass);
void removeAllListeners();
}
public class SimpleEventBus implements EventBus {
private final static Set<EventHandler> EMPTY_SET = new HashSet<>();
private final static EventHandler[] EMPTY_ARRAY = new EventHandler[0];
private final ConcurrentMap<Class<?>, Set<EventHandler>> eventMapping = new ConcurrentHashMap<>();
private final Class<?> eventClassConstraint;
public SimpleEventBus() {
this(Object.class);
}
public SimpleEventBus(final Class<?> eventClassConstraint) {
this.eventClassConstraint = Objects.requireNonNull(eventClassConstraint);
}
@Override
public void registerListenersOfObject(final Object callbackObject) {
Arrays.stream(callbackObject.getClass().getMethods())
.filter(this::isEligibleMethod)
.forEach(method -> {
Class<?> eventClass = method.getParameterTypes()[0];
addEventHandler(eventClass, new MethodEventHandler(method, callbackObject, eventClass));
});
}
@Override
@SuppressWarnings("unchecked")
public <T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener) {
Objects.requireNonNull(eventClass);
Objects.requireNonNull(eventListener);
if (eventClassConstraint.isAssignableFrom(eventClass)) {
addEventHandler(eventClass, new ConsumerEventHandler((Consumer<Object>)eventListener));
}
}
@Override
public void executeEvent(final Object event) {
if (eventClassConstraint.isAssignableFrom(event.getClass())) {
getCopyOfEventHandlers(event.getClass()).forEach(eventHandler -> eventHandler.invoke(event));
}
}
@Override
public void removeListenersOfObject(final Object callbackObject) {
Arrays.stream(callbackObject.getClass().getMethods())
.filter(this::isEligibleMethod)
.forEach(method -> {
Class<?> eventClass = method.getParameterTypes()[0];
removeEventHandler(eventClass, new MethodEventHandler(method, callbackObject, eventClass));
});
}
@Override
@SuppressWarnings("unchecked")
public <T> void removeListener(final Class<T> eventClass, final Consumer<? extends T> eventListener) {
Objects.requireNonNull(eventClass);
Objects.requireNonNull(eventListener);
if (eventClassConstraint.isAssignableFrom(eventClass)) {
removeEventHandler(eventClass, new ConsumerEventHandler((Consumer<Object>)eventListener));
}
}
@Override
public void removeAllListenersOfEvent(final Class<?> eventClass) {
eventMapping.remove(Objects.requireNonNull(eventClass));
}
@Override
public void removeAllListeners() {
eventMapping.clear();
}
private boolean isEligibleMethod(final Method method) {
return (method.getAnnotation(Event.class) != null
&& method.getReturnType().equals(void.class)
&& method.getParameterCount() == 1
&& eventClassConstraint.isAssignableFrom(method.getParameterTypes()[0]));
}
private void addEventHandler(final Class<?> eventClass, final EventHandler eventHandler) {
Objects.requireNonNull(eventClass);
Objects.requireNonNull(eventHandler);
eventMapping.putIfAbsent(eventClass, Collections.synchronizedSet(new HashSet<>()));
eventMapping.get(eventClass).add(eventHandler);
}
private void removeEventHandler(final Class<?> eventClass, final EventHandler eventHandler) {
Objects.requireNonNull(eventClass);
Objects.requireNonNull(eventHandler);
eventMapping.getOrDefault(eventClass, EMPTY_SET).remove(eventHandler);
}
private Stream<EventHandler> getCopyOfEventHandlers(final Class<?> eventClass) {
Set<EventHandler> eventHandlers = eventMapping.get(Objects.requireNonNull(eventClass));
return (eventHandlers == null)
? Stream.empty()
: Arrays.stream(eventHandlers.toArray(EMPTY_ARRAY));
}
private static interface EventHandler {
void invoke(final Object event);
}
private static class MethodEventHandler implements EventHandler {
private final Method method;
private final Object callbackObject;
private final Class<?> eventClass;
public MethodEventHandler(final Method method, final Object object, final Class<?> eventClass) {
this.method = Objects.requireNonNull(method);
this.callbackObject = Objects.requireNonNull(object);
this.eventClass = Objects.requireNonNull(eventClass);
}
@Override
public void invoke(final Object event) {
try {
method.setAccessible(true);
method.invoke(callbackObject, Objects.requireNonNull(event));
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new RuntimeException(ex);
}
}
@Override
public int hashCode() {
int hash = 7;
hash = 71 * hash + Objects.hashCode(this.method);
hash = 71 * hash + Objects.hashCode(this.callbackObject);
hash = 71 * hash + Objects.hashCode(this.eventClass);
return hash;
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final MethodEventHandler other = (MethodEventHandler)obj;
if (!Objects.equals(this.method, other.method)) {
return false;
}
if (!Objects.equals(this.callbackObject, other.callbackObject)) {
return false;
}
if (!Objects.equals(this.eventClass, other.eventClass)) {
return false;
}
return true;
}
}
private static class ConsumerEventHandler implements EventHandler {
private final Consumer<Object> eventListener;
public ConsumerEventHandler(final Consumer<Object> consumer) {
this.eventListener = Objects.requireNonNull(consumer);
}
@Override
public void invoke(final Object event) {
eventListener.accept(Objects.requireNonNull(event));
}
@Override
public int hashCode() {
int hash = 5;
hash = 19 * hash + Objects.hashCode(this.eventListener);
return hash;
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final ConsumerEventHandler other = (ConsumerEventHandler)obj;
if (!Objects.equals(this.eventListener, other.eventListener)) {
return false;
}
return true;
}
}
}
The unit tests of the old version are still valid and also still pass.
2 Answers
There are two bugs related to the type system.
Currently, if all three types are the same (registerListener's first parameter, the registered Consumer's type parameter, and the runtime type of the event passed to executeEvent), it works as expected:
@Test
public void worksAsExpected() {
String expected = "WORKS";
AtomicReference<String> actual = new AtomicReference<>(null);
EventBus bus = new SimpleEventBus(String.class);
bus.registerListener(String.class, (String s)-> {actual.set(s);});
bus.executeEvent(expected);
assertEquals(expected, actual.get());
}
In the case below I register a listener for events of type Number and execute an event of type Number, but it fails to execute the handler:
@Test
public void shouldHaveWorkedButDoesNot() {
Number expected = 1;
AtomicReference<Number> actual = new AtomicReference<>(null);
EventBus bus = new SimpleEventBus(Number.class);
bus.registerListener(Number.class, (Number n)-> {actual.set(n);});
bus.executeEvent(expected);
assertEquals(expected, actual.get());
}
This happens because getCopyOfEventHandlers only gets the handlers registered for the runtime type of the executed event (here Integer, because Number expected = 1 is boxed to an Integer). It should get the handlers registered for the runtime type or any supertype thereof.
To fix this, getCopyOfEventHandlers should be something like:
private Set<EventHandler> getEventHandlersFor(final Class<?> eventClass) {
return eventMapping.entrySet().stream()
.filter(entry -> entry.getKey().isAssignableFrom(eventClass))
.flatMap(entry -> entry.getValue().stream())
.collect(Collectors.toSet());
}
The second type problem is this: when I register a handler for Numbers, it will run for any Number. Some might be Floats, others might be AtomicLongs. If I register a Consumer<AtomicInteger> to handle Numbers and pass in a Float, which is a Number, I will get a runtime exception.
@Test
public void shouldNotCompileButDoes() {
EventBus bus = new SimpleEventBus(Number.class);
// Not all Numbers are AtomicIntegers so this is a type error
bus.registerListener(Number.class,
(AtomicInteger n)-> {System.out.println(n.get());});
Number someNumber = 4f;
bus.executeEvent(someNumber); // e.g. Float does not have .get()
}
Conversely, if I register a Consumer<Object> to handle Numbers and pass in any Float, it will always run. In fact, a Consumer<Object> can obviously accept any object.
@Test
public void shouldCompileButDoesNot() {
EventBus bus = new SimpleEventBus(Number.class);
// All Numbers are Objects this would work
// bus.registerListener(Number.class,
// (Object o)-> {System.out.println(o);});
Number someNumber = 4f;
bus.executeEvent(someNumber);
}
To fix this, you should replace Consumer<? extends T> eventListener in registerListener's signature with Consumer<? super T>. In fact, because T only appears as a parameter type in the declaration of Consumer<T>, a parameter of type Consumer<? extends T> should either be a type error or be replaceable by just Consumer<?>. I suspect the latter might be the case for removeListener.
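The effect of the Consumer<? super T> bound can be seen in a minimal sketch. TypedRegistry and its methods are hypothetical names made up for this illustration; this is not the SimpleEventBus from the question, just a stripped-down registry under the assumption that listeners are dispatched by an isInstance check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, stripped-down registry for illustration only.
final class TypedRegistry {
    private final List<Consumer<Object>> listeners = new ArrayList<>();

    // Consumer<? super T>: the listener must be able to accept every T.
    public <T> void register(Class<T> eventClass, Consumer<? super T> listener) {
        listeners.add(event -> {
            // Only dispatch events that really are instances of T;
            // Class.cast then makes the call type-safe without unchecked casts.
            if (eventClass.isInstance(event)) {
                listener.accept(eventClass.cast(event));
            }
        });
    }

    public void execute(Object event) {
        listeners.forEach(l -> l.accept(event));
    }

    public static void main(String[] args) {
        TypedRegistry registry = new TypedRegistry();
        List<Object> seen = new ArrayList<>();

        Consumer<Object> anyObject = seen::add;
        registry.register(Number.class, anyObject); // fine: Object is a supertype of Number

        // Consumer<AtomicInteger> atomicOnly = n -> System.out.println(n.get());
        // registry.register(Number.class, atomicOnly); // rejected at compile time

        registry.execute(4f);             // a Float is a Number, so it is delivered
        registry.execute("not a number"); // filtered out by the isInstance check
        System.out.println(seen.size());
    }
}
```

With this signature the Consumer<Object> case compiles and the Consumer<AtomicInteger> case is rejected by the compiler, which is the behaviour the two tests above ask for.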
While synchronization is correct now, Collections.synchronizedSet(new HashSet<>()) is still a fairly coarse-grained way to synchronize. You can replace it with Collections.newSetFromMap(new ConcurrentHashMap<>()) to benefit from the better locking strategy of ConcurrentHashMap.
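As a rough illustration of that replacement, here is a small sketch. ConcurrentSetSketch and newConcurrentSet are hypothetical names introduced for this example. It shows one practical consequence of backing the set with a ConcurrentHashMap: its iterators are weakly consistent, so the set can be modified while it is being traversed without a ConcurrentModificationException.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper class for illustration only.
final class ConcurrentSetSketch {
    // A Set view backed by a ConcurrentHashMap, as suggested above.
    public static <E> Set<E> newConcurrentSet() {
        return Collections.newSetFromMap(new ConcurrentHashMap<>());
    }

    public static void main(String[] args) {
        Set<String> handlers = newConcurrentSet();
        handlers.add("first");
        handlers.add("second");

        // Removing elements during iteration is allowed here; a plain
        // HashSet would throw ConcurrentModificationException instead.
        for (String handler : handlers) {
            handlers.remove(handler);
        }
        System.out.println(handlers.isEmpty());
    }
}
```

Since Java 8, ConcurrentHashMap.newKeySet() is another way to obtain such a set; either should work for this purpose.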
Getting a value from a Map, or entering one if absent, is made easier by lambdas in Java 8. Using Map.computeIfAbsent() you can replace:
eventMapping.putIfAbsent(eventClass, Collections.newSetFromMap(new ConcurrentHashMap<>()));
eventMapping.get(eventClass).add(eventHandler);
with
eventMapping.computeIfAbsent(eventClass, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())).add(eventHandler);
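A self-contained sketch of the same pattern, used as a small concurrent multimap, might look like the following. MultimapSketch, add and count are hypothetical names chosen for this example and are not part of the question's code. Because computeIfAbsent returns the value that is in the map after the call, there is no separate get that could return null if another thread removed the mapping in between.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical multimap-style wrapper for illustration only.
final class MultimapSketch {
    private final ConcurrentMap<Class<?>, Set<Object>> mapping = new ConcurrentHashMap<>();

    // Creates the per-key set on first use, then adds the value to it.
    public boolean add(Class<?> key, Object value) {
        return mapping
                .computeIfAbsent(key, k -> Collections.newSetFromMap(new ConcurrentHashMap<>()))
                .add(value);
    }

    // Number of values stored under the key, or 0 if the key is unknown.
    public int count(Class<?> key) {
        Set<Object> values = mapping.get(key);
        return values == null ? 0 : values.size();
    }

    public static void main(String[] args) {
        MultimapSketch multimap = new MultimapSketch();
        multimap.add(String.class, "a");
        multimap.add(String.class, "b");
        multimap.add(String.class, "a"); // duplicate, ignored by the set
        System.out.println(multimap.count(String.class));
    }
}
```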
Comment:
Thanks a lot! This greatly clarified my code. There really should have been more focus on computeIfAbsent (in the docs/tutorials/highlights), as this makes using multimaps a lot easier. Also, I decided to obtain a set as follows:
private final static Supplier<Set<EventHandler>> CONCURRENT_SET_SUPPLIER = () -> Collections.newSetFromMap(new ConcurrentHashMap<>());
skiwi, May 7, 2014 at 8:44