Event Bus (Java)

Event Bus (Java)

Originally conceived as a way to send messages to groups of actors, the EventBus has been generalized into a set of composable traits implementing a simple interface:

  • public boolean subscribe(S subscriber, C classifier) subscribes the given subscriber to events with the given classifier
  • public boolean unsubscribe(S subscriber, C classifier) undoes a specific subscription
  • public void unsubscribe(S subscriber) undoes all subscriptions for the given subscriber
  • public void publish(E event) publishes an event, which first is classified according to the specific bus (see Classifiers) and then published to all subscribers for the obtained classifier

This mechanism is used in different places within Akka, e.g. the DeathWatch and the Event Stream. Implementations can make use of the specific building blocks presented below.

An event bus must define the following three abstract types:

  • E is the type of all events published on that bus
  • S is the type of subscribers allowed to register on that event bus
  • C defines the classifier to be used in selecting subscribers for dispatching events

The traits below are still generic in these types, but they need to be defined for any concrete implementation.

Classifiers

The classifiers presented here are part of the Akka distribution, but rolling your own in case you do not find a perfect match is not difficult, check the implementation of the existing ones on github.

Lookup Classification

The simplest classification is just to extract an arbitrary classifier from each event and maintaining a set of subscribers for each possible classifier. This can be compared to tuning in on a radio station. The abstract class LookupEventBus is still generic in that it abstracts over how to compare subscribers and how exactly to classify. The necessary methods to be implemented are the following:

  • public C classify(E event) is used for extracting the classifier from the incoming events.
  • public int compareSubscribers(S a, S b) must define a partial order over the subscribers, expressed as expected from java.lang.Comparable.compare.
  • public void publish(E event, S subscriber) will be invoked for each event for all subscribers which registered themselves for the event’s classifier.
  • public int mapSize determines the initial size of the index data structure used internally (i.e. the expected number of different classifiers).

This classifier is efficient in case no subscribers exist for a particular event.

Subchannel Classification

If classifiers form a hierarchy and it is desired that subscription be possible not only at the leaf nodes, this classification may be just the right one. It can be compared to tuning in on (possibly multiple) radio channels by genre. This classification has been developed for the case where the classifier is just the JVM class of the event and subscribers may be interested in subscribing to all subclasses of a certain class, but it may be used with any classifier hierarchy. The abstract members needed by this classifier are

  • public Subclassification[C] subclassification provides an object providing isEqual(a: Classifier, b: Classifier) and isSubclass(a: Classifier, b: Classifier) to be consumed by the other methods of this classifier; this method is called on various occasions, it should be implemented so that it always returns the same object for performance reasons.
  • public C classify(E event) is used for extracting the classifier from the incoming events.
  • public void publish(E event, S subscriber) will be invoked for each event for all subscribers which registered themselves for the event’s classifier.

This classifier is also efficient in case no subscribers are found for an event, but it uses conventional locking to synchronize an internal classifier cache, hence it is not well-suited to use cases in which subscriptions change with very high frequency (keep in mind that “opening” a classifier by sending the first message will also have to re-check all previous subscriptions).

Scanning Classification

The previous classifier was built for multi-classifier subscriptions which are strictly hierarchical, this classifier is useful if there are overlapping classifiers which cover various parts of the event space without forming a hierarchy. It can be compared to tuning in on (possibly multiple) radio stations by geographical reachability (for old-school radio-wave transmission). The abstract members for this classifier are:

  • public int compareClassifiers(C a, C b) is needed for determining matching classifiers and storing them in an ordered collection.
  • public int compareSubscribers(S a, S b) is needed for storing subscribers in an ordered collection.
  • public boolean matches(C classifier, E event) determines whether a given classifier shall match a given event; it is invoked for each subscription for all received events, hence the name of the classifier.
  • public void publish(E event, S subscriber) will be invoked for each event for all subscribers which registered themselves for a classifier matching this event.

This classifier takes always a time which is proportional to the number of subscriptions, independent of how many actually match.

Actor Classification

This classification has been developed specifically for implementing DeathWatch: subscribers as well as classifiers are of type ActorRef. The abstract members are

  • public ActorRef classify(E event) is used for extracting the classifier from the incoming events.
  • public int mapSize determines the initial size of the index data structure used internally (i.e. the expected number of different classifiers).

This classifier is still is generic in the event type, and it is efficient for all use cases.

Event Stream

The event stream is the main event bus of each actor system: it is used for carrying log messages and Dead Letters and may be used by the user code for other purposes as well. It uses Subchannel Classification which enables registering to related sets of channels (as is used for RemoteLifeCycleMessage). The following example demonstrates how a simple subscription works. Given a simple actor:

import akka.actor.Props;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.actor.DeadLetter;
public static class DeadLetterActor extends UntypedActor {
  public void onReceive(Object message) {
    if (message instanceof DeadLetter) {
      System.out.println(message);
    }
  }
}

it can be subscribed like this:

final ActorSystem system = ActorSystem.create("DeadLetters");
final ActorRef actor = system.actorOf(new Props(DeadLetterActor.class));
system.eventStream().subscribe(actor, DeadLetter.class);

Default Handlers

Upon start-up the actor system creates and subscribes actors to the event stream for logging: these are the handlers which are configured for example in application.conf:

akka {
  event-handlers = ["akka.event.Logging$DefaultLogger"]
}

The handlers listed here by fully-qualified class name will be subscribed to all log event classes with priority higher than or equal to the configured log-level and their subscriptions are kept in sync when changing the log-level at runtime:

system.eventStream.setLogLevel(Logging.DebugLevel());

This means that log events for a level which will not be logged are typically not dispatched at all (unless manual subscriptions to the respective event class have been done)

Dead Letters

As described at Stopping actors, messages queued when an actor terminates or sent after its death are re-routed to the dead letter mailbox, which by default will publish the messages wrapped in DeadLetter. This wrapper holds the original sender, receiver and message of the envelope which was redirected.

Other Uses

The event stream is always there and ready to be used, just publish your own events (it accepts Object) and subscribe listeners to the corresponding JVM classes.

Contents