Distributed Publish Subscribe in Cluster

You are viewing the documentation for the new actor APIs. To view the Akka Classic documentation, see Classic Distributed Publish Subscribe.

Module info

The distributed publish subscribe topic API is available and usable with the core akka-actor-typed module. However, it is only distributed when used in a clustered application.

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependency as below.

sbt
val AkkaVersion = "2.9.2"
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-bom_${scala.binary.version}</artifactId>
      <version>2.9.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.9.2")

  implementation "com.typesafe.akka:akka-cluster-typed_${versions.ScalaBinary}"
}

The Topic Registry

Distributed publish subscribe is achieved by representing each pub sub topic with an actor, akka.actor.typed.pubsub.Topic.

The topic actor needs to run on each node that hosts subscribers or that publishes messages to the topic.

Topics can be looked up in the PubSub registry. This way the same topic is represented by the same actor for a whole actor system. If the topic has not yet been started, it is started and returned. If it is already running, the existing ActorRef is returned.

Scala
import akka.actor.typed.pubsub.Topic
import akka.actor.typed.pubsub.PubSub

Behaviors.setup { context =>
  val pubSub = PubSub(context.system)

  val topic: ActorRef[Topic.Command[Message]] = pubSub.topic[Message]("my-topic")
Java
import akka.actor.typed.pubsub.PubSub;
import akka.actor.typed.pubsub.Topic;

import java.time.Duration;

Behaviors.setup(
    context -> {
      PubSub pubSub = PubSub.get(context.getSystem());

      ActorRef<Topic.Command<Message>> topic =
          pubSub.topic(Message.class, "my-topic");

A topic is identified by its name. Looking up a name that has already been started with a different message type leads to an exception.
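
The lookup rules (one topic per name, started on first lookup, a second message type rejected) can be pictured with a small stand-alone model. This is only a sketch in plain Java and not Akka's implementation: `TopicRegistrySketch` and `TopicHandle` are hypothetical names, and `IllegalArgumentException` is an assumed stand-in, since the exception type Akka throws is not stated here.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a per-actor-system registry keyed by topic name (not Akka code).
final class TopicRegistrySketch {
  private final Map<String, TopicHandle<?>> topics = new HashMap<>();

  // Returns the existing handle for the name, or "starts" a new one on first lookup.
  @SuppressWarnings("unchecked")
  synchronized <T> TopicHandle<T> topic(Class<T> messageClass, String name) {
    TopicHandle<?> existing = topics.get(name);
    if (existing == null) {
      TopicHandle<T> started = new TopicHandle<>(name, messageClass);
      topics.put(name, started);
      return started;
    }
    if (!existing.messageClass.equals(messageClass)) {
      // Assumption: the exception type is illustrative only.
      throw new IllegalArgumentException(
          "Topic '" + name + "' already started for " + existing.messageClass.getName());
    }
    return (TopicHandle<T>) existing;
  }

  public static void main(String[] args) {
    TopicRegistrySketch registry = new TopicRegistrySketch();
    TopicHandle<String> first = registry.topic(String.class, "my-topic");
    TopicHandle<String> second = registry.topic(String.class, "my-topic");
    System.out.println("same handle: " + (first == second));
    try {
      registry.topic(Integer.class, "my-topic"); // same name, different message type
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}

// Hypothetical stand-in for an ActorRef<Topic.Command<T>>; not an Akka type.
final class TopicHandle<T> {
  final String name;
  final Class<T> messageClass;

  TopicHandle(String name, Class<T> messageClass) {
    this.name = name;
    this.messageClass = messageClass;
  }
}
```

In the model, the name alone is the key, which is why the second lookup with `Integer.class` cannot create a separate topic and has to fail instead.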

Local actors can then subscribe to the topic (and unsubscribe from it) via messages defined in Topic:

Scala
topic ! Topic.Subscribe(subscriberActor)

topic ! Topic.Unsubscribe(subscriberActor)
Java
topic.tell(Topic.subscribe(subscriberActor));

topic.tell(Topic.unsubscribe(subscriberActor));

And publish messages to the topic:

Scala
topic ! Topic.Publish(Message("Hello Subscribers!"))
Java
topic.tell(Topic.publish(new Message("Hello Subscribers!")));

Published messages are only delivered to another node if the topic is started there and has at least one local subscriber registered with it. The message is deduplicated so that even if there are multiple subscribers on a node, the message is only passed over the network to that node once.

It is possible to define a TTL (time to live) for the local topic actor. If it has no local subscribers and no messages passing through for the given time period, it is stopped and removed from the registry:

Scala
val topicWithTtl = pubSub.topic[Message]("my-topic", 3.minutes)
Java
ActorRef<Topic.Command<Message>> topicWithTtl =
    pubSub.topic(Message.class, "my-ttl-topic", Duration.ofMinutes(3));

The Topic Actor

The topic actor can also be started and managed manually. This means that multiple actors for the same topic can be started on the same node. Messages published to a topic on other cluster nodes will be sent between the nodes once per active topic actor that has any local subscribers:

Scala
import akka.actor.typed.pubsub.Topic

Behaviors.setup { context =>
  val topic = context.spawn(Topic[Message]("my-topic"), "MyTopic")
Java
import akka.actor.typed.pubsub.Topic;

import java.time.Duration;

Behaviors.setup(
    context -> {
      ActorRef<Topic.Command<Message>> topic =
          context.spawn(Topic.create(Message.class, "my-topic"), "MyTopic");

Pub Sub Scalability

Each topic is represented by one Receptionist service key. This means that the number of topics scales to thousands or tens of thousands, but higher numbers of topics will require custom solutions. It also means that a very high turnover of unique topics will not work well, and for such use cases a custom solution is advised.

The topic actor acts as a proxy and delegates to the local subscribers, handling deduplication so that a published message is only sent once to a node regardless of how many subscribers there are to the topic on that node.

When a topic actor has no subscribers for a topic, it deregisters itself from the receptionist, meaning that published messages for the topic are not sent to it.
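
The fan-out rule can be illustrated with a small stand-alone model that counts how many messages cross the network for one publish. It is a sketch in plain Java, not Akka code: `FanOutSketch`, `Delivery`, and the map from node name to local subscriber count are hypothetical, and the model assumes one topic actor per node and that subscribers on the publishing node are also served by their local topic actor.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of per-node deduplicated fan-out (not Akka's implementation).
final class FanOutSketch {
  // subscribersPerNode maps a node name to the number of local subscribers on that node.
  static Delivery publish(String publishingNode, Map<String, Integer> subscribersPerNode) {
    int networkSends = 0;
    int subscriberDeliveries = 0;
    for (Map.Entry<String, Integer> entry : subscribersPerNode.entrySet()) {
      int subscribers = entry.getValue();
      if (subscribers == 0) {
        continue; // the topic actor there has deregistered, so nothing is sent to it
      }
      if (!entry.getKey().equals(publishingNode)) {
        networkSends++; // exactly one message per remote node, however many subscribers it has
      }
      subscriberDeliveries += subscribers; // the node's topic actor forwards to each local subscriber
    }
    return new Delivery(networkSends, subscriberDeliveries);
  }

  public static void main(String[] args) {
    Map<String, Integer> subscribersPerNode = new LinkedHashMap<>();
    subscribersPerNode.put("node-a", 1);
    subscribersPerNode.put("node-b", 3);
    subscribersPerNode.put("node-c", 0);
    Delivery delivery = publish("node-a", subscribersPerNode);
    System.out.println("network sends: " + delivery.networkSends);
    System.out.println("subscriber deliveries: " + delivery.subscriberDeliveries);
  }
}

// Outcome of publishing one message in this toy model (hypothetical type, not an Akka class).
final class Delivery {
  final int networkSends;         // messages that cross the network
  final int subscriberDeliveries; // messages handed to individual subscribers

  Delivery(int networkSends, int subscriberDeliveries) {
    this.networkSends = networkSends;
    this.subscriberDeliveries = subscriberDeliveries;
  }
}
```

With the sample values in `main`, only `node-b` receives a network message even though it has three subscribers, and `node-c` receives nothing because it has none.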

Delivery Guarantee

As described in Message Delivery Reliability, the delivery guarantee in distributed pub sub mode is at-most-once. In other words, messages can be lost over the wire. In addition, the registry of nodes that have subscribers is eventually consistent, which means that after subscribing an actor on one node there is a short delay before the subscription is known on other nodes and messages are published to it.
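
Because delivery is at-most-once, a subscriber that needs to know whether it missed something has to detect that itself. One possible approach, sketched here in plain Java, is for the publisher to attach an increasing sequence number to each message and for the subscriber to track gaps. `GapDetector` is a hypothetical helper and not part of Akka. The sketch assumes a single publisher per topic and that its messages arrive in order. It also starts counting from the first message actually seen, since a new subscription takes a short while to become known on other nodes.

```java
// Tracks gaps in a stream of sequence numbers from a single publisher (hypothetical helper).
final class GapDetector {
  private long nextExpected = -1; // -1 until the first message has been observed
  private long missedTotal = 0;

  // Records an observed sequence number and returns how many messages were skipped before it.
  long observe(long seqNr) {
    if (nextExpected < 0) {
      // First message seen: earlier ones may predate the subscription, so they are not counted.
      nextExpected = seqNr + 1;
      return 0;
    }
    if (seqNr < nextExpected) {
      return 0; // late or duplicate message, ignored in this sketch
    }
    long gap = seqNr - nextExpected;
    missedTotal += gap;
    nextExpected = seqNr + 1;
    return gap;
  }

  long missedTotal() {
    return missedTotal;
  }

  public static void main(String[] args) {
    GapDetector detector = new GapDetector();
    long[] observed = {5, 6, 9, 10};
    for (long seqNr : observed) {
      System.out.println("seqNr " + seqNr + " skipped " + detector.observe(seqNr));
    }
    System.out.println("missed in total: " + detector.missedTotal());
  }
}
```

Detecting a gap does not recover the lost messages. What the subscriber does next (ignore, log, or request a resend through some other channel) is an application decision outside the scope of the topic API.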

If you are looking for an at-least-once delivery guarantee, we recommend Alpakka Kafka.
