Distributed Publish Subscribe in Cluster
You are viewing the documentation for the new actor APIs. To view the Akka Classic documentation, see Classic Distributed Publish Subscribe.
Module info
The distributed publish subscribe topic API is available and usable with the core akka-actor-typed module. However, it is only distributed when used in a clustered application.
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
- Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
Additionally, add the dependency as below.
- sbt
val AkkaVersion = "2.9.5"
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-bom_${scala.binary.version}</artifactId>
      <version>2.9.5</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.9.5")
  implementation "com.typesafe.akka:akka-cluster-typed_${versions.ScalaBinary}"
}
The Topic Registry
Distributed publish subscribe is achieved by representing each pub sub topic with an actor, akka.actor.typed.pubsub.Topic.
The topic actor needs to run on each node where subscribers will live or that wants to publish messages to the topic.
Topics can be looked up in the PubSub registry. This way, the same topic is represented by the same actor for a whole actor system. If the topic has not yet been started, it is started and returned; if it is already running, the existing ActorRef is returned.
- Scala
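import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.pubsub.Topic
import akka.actor.typed.pubsub.PubSub

Behaviors.setup { context =>
  // Look up (or start) the topic actor for "my-topic" in this actor system
  val pubSub = PubSub(context.system)
  val topic: ActorRef[Topic.Command[Message]] = pubSub.topic[Message]("my-topic")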
- Java
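import akka.actor.typed.ActorRef;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.pubsub.PubSub;
import akka.actor.typed.pubsub.Topic;
import java.time.Duration;

Behaviors.setup(
    context -> {
      // Look up (or start) the topic actor for "my-topic" in this actor system
      PubSub pubSub = PubSub.get(context.getSystem());
      ActorRef<Topic.Command<Message>> topic = pubSub.topic(Message.class, "my-topic");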
A topic is identified by its name. If a topic with that name has already been started with a different message type, looking it up leads to an exception.
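The get-or-start behaviour of the registry, including the failure on a mismatched message type, can be pictured with a small plain-Java model. This is only an illustrative sketch and not how Akka implements PubSub: TopicRegistryModel and TopicHandle are hypothetical names, and the use of IllegalArgumentException is an assumption, since the text does not say which exception type is thrown.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: these types are hypothetical and are not Akka APIs.
final class TopicRegistryModel {

    // Stand-in for a running topic actor.
    static final class TopicHandle {
        final String name;
        final Class<?> messageType;

        TopicHandle(String name, Class<?> messageType) {
            this.name = name;
            this.messageType = messageType;
        }
    }

    // Topics are keyed by name only, so the name is the identity.
    private final Map<String, TopicHandle> topics = new HashMap<>();

    TopicHandle topic(Class<?> messageType, String name) {
        TopicHandle existing = topics.get(name);
        if (existing == null) {
            // Not started yet: start it and return it.
            TopicHandle started = new TopicHandle(name, messageType);
            topics.put(name, started);
            return started;
        }
        if (!existing.messageType.equals(messageType)) {
            // Same name, different message type: fail (exception type is an assumption).
            throw new IllegalArgumentException(
                "Topic '" + name + "' already started for " + existing.messageType.getName());
        }
        // Already running: return the existing handle.
        return existing;
    }
}
```

In this model, two lookups of "my-topic" with the same message class return the same handle, while a third lookup with another class throws.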
Local actors can then subscribe to the topic (and unsubscribe from it) via messages defined in Topic:
- Scala
topic ! Topic.Subscribe(subscriberActor)
topic ! Topic.Unsubscribe(subscriberActor)
- Java
topic.tell(Topic.subscribe(subscriberActor));
topic.tell(Topic.unsubscribe(subscriberActor));
And publish messages to the topic:
- Scala
topic ! Topic.Publish(Message("Hello Subscribers!"))
- Java
topic.tell(Topic.publish(new Message("Hello Subscribers!")));
Published messages are only delivered to other nodes if the topic is started there and has local subscribers registered with it. The message is deduplicated so that even if there are multiple subscribers on a node, the message is only passed over the network to that node once.
It is possible to define a TTL (time to live) for the local topic actor. If it has no local subscribers and no messages passing through for the given time period, it is stopped and removed from the registry:
- Scala
import scala.concurrent.duration._

val topicWithTtl = pubSub.topic[Message]("my-topic", 3.minutes)
- Java
ActorRef<Topic.Command<Message>> topicWithTtl = pubSub.topic(Message.class, "my-ttl-topic", Duration.ofMinutes(3));
The Topic Actor
The topic actor can also be started and managed manually. This means that multiple actors for the same topic can be started on the same node. In that case, a message published to the topic on another cluster node is sent over the network once per active topic actor that has any local subscribers:
- Scala
import akka.actor.typed.pubsub.Topic

Behaviors.setup { context =>
  val topic = context.spawn(Topic[Message]("my-topic"), "MyTopic")
- Java
import akka.actor.typed.pubsub.Topic;
import java.time.Duration;

Behaviors.setup(
    context -> {
      ActorRef<Topic.Command<Message>> topic =
          context.spawn(Topic.create(Message.class, "my-topic"), "MyTopic");
Pub Sub Scalability
Each topic is represented by one Receptionist service key. This means that the number of topics scales to thousands or tens of thousands, but higher numbers of topics will require custom solutions. It also means that a very high turnover of unique topics will not work well; for such use cases a custom solution is advised.
The topic actor acts as a proxy and delegates to the local subscribers, handling deduplication so that a published message is only sent once to a node regardless of how many subscribers there are to the topic on that node.
When a topic actor has no subscribers for a topic, it deregisters itself from the receptionist, meaning that published messages for the topic will not be sent to it.
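The fan-out rules above can be made concrete with a small counting model in plain Java. It is a sketch for illustration only and does not use or reproduce Akka's implementation: TopicFanOutModel, the node names and the two counters are all hypothetical. It assumes one topic actor per node and counts one simulated network send per remote node that currently has subscribers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Akka APIs.
final class TopicFanOutModel {

    // Node name -> local subscribers on that node. A node is only present
    // ("registered") while it has at least one subscriber.
    private final Map<String, List<String>> subscribersByNode = new LinkedHashMap<>();

    int networkSends = 0;    // messages that crossed the simulated network
    int localDeliveries = 0; // messages handed to individual subscribers

    void subscribe(String node, String subscriber) {
        subscribersByNode.computeIfAbsent(node, n -> new ArrayList<>()).add(subscriber);
    }

    void unsubscribe(String node, String subscriber) {
        List<String> subscribers = subscribersByNode.get(node);
        if (subscribers == null) {
            return;
        }
        subscribers.remove(subscriber);
        if (subscribers.isEmpty()) {
            // No subscribers left: the node "deregisters" and gets no more publishes.
            subscribersByNode.remove(node);
        }
    }

    void publish(String fromNode, String message) {
        for (Map.Entry<String, List<String>> entry : subscribersByNode.entrySet()) {
            if (!entry.getKey().equals(fromNode)) {
                // One network send per remote node, regardless of subscriber count.
                networkSends++;
            }
            // The topic actor on that node then fans out to its local subscribers.
            localDeliveries += entry.getValue().size();
        }
    }
}
```

With three subscribers on one remote node and one on another, a single publish in this model produces two network sends and four local deliveries. After the lone subscriber on the second node unsubscribes, the next publish adds only one more network send.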
Delivery Guarantee
As described in Message Delivery Reliability of Akka, the message delivery guarantee of distributed pub sub is at-most-once delivery. In other words, messages can be lost over the wire. In addition, the registry of nodes that have subscribers is eventually consistent, meaning that after an actor subscribes on one node there is a short delay before the subscription is known on other nodes and messages are published to it.
If you are looking for an at-least-once delivery guarantee, we recommend Alpakka Kafka.