Distributed Publish Subscribe in Cluster

For the Akka Classic documentation of this feature see Classic Distributed Publish Subscribe. Until the new API is available (see #26338), Classic Pub Sub can be used from the new actor API through the .toClassic adapters.

Module info

Until the new Distributed Publish Subscribe API is available (see #26338), you can use Classic Distributed Publish Subscribe alongside the new Cluster and actor APIs. To do this, add the following dependency to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-tools" % "2.6.1+148-a614f0be"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-tools_2.12</artifactId>
  <version>2.6.1+148-a614f0be</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-tools_2.12', version: '2.6.1+148-a614f0be'
}

Add the new Cluster API if you don’t already have it in an existing Cluster application:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % "2.6.1+148-a614f0be"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-typed_2.12</artifactId>
  <version>2.6.1+148-a614f0be</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-typed_2.12', version: '2.6.1+148-a614f0be'
}

Project Info: Akka Cluster (typed)
Artifact: com.typesafe.akka, akka-cluster-typed, 2.6.1+148-a614f0be
JDK versions: Adopt OpenJDK 8, Adopt OpenJDK 11
Scala versions: 2.12.10, 2.13.1
JPMS module name: akka.cluster.typed
Readiness level: Since 2.6.0, 2019-11-06
Home page: https://akka.io/
Release notes: akka.io blog
Issues: Github issues
Sources: https://github.com/akka/akka

Sample project

Until the new API from #26338 is available, this simple example shows how to use Classic Distributed Publish Subscribe with the new Cluster API.

The DistributedPubSub extension

The mediator can either be started and accessed with the akka.cluster.pubsub.DistributedPubSub extension as shown below, or started as an ordinary actor, see the full Akka Classic documentation Classic Distributed PubSub Extension.

Scala
val mediator = DistributedPubSub(context.system).mediator
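
As a minimal sketch of where that line typically lives, the following typed behavior looks up the mediator during setup. The MediatorAccess object and its Ping message are hypothetical names used only for this illustration. The explicit .toClassic conversion of the typed system is one conservative way to reach the classic extension, and the sketch assumes the node is configured with the cluster actor provider.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.pubsub.DistributedPubSub

object MediatorAccess {
  // Hypothetical protocol, present only so the behavior has a message type.
  sealed trait Command
  case object Ping extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      // The extension is looked up on the classic ActorSystem backing the typed one.
      // Assumption: akka.actor.provider is set to "cluster" on this node.
      val mediator: akka.actor.ActorRef =
        DistributedPubSub(context.system.toClassic).mediator

      Behaviors.receiveMessage[Command] {
        case Ping =>
          context.log.info("Mediator available at {}", mediator.path)
          Behaviors.same
      }
    }
}
```

The mediator is a classic ActorRef, so any typed ActorRef handed to it (for example in Subscribe or Put) has to be converted with .toClassic first.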

Actors subscribe to a topic for Pub-Sub mode, or register under their path for point-to-point mode.

Publish

This is the Pub-Sub mode. For the full Akka Classic documentation of this feature see Classic Distributed PubSub Publish.

Subscribers

Subscriber actors can be started on several nodes in the cluster, and all will receive messages published to the “content” topic.

An actor that subscribes to a topic:

Scala
Behaviors.setup[DataEvent] { context =>
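  // the adapter import provides .toClassic for typed ActorRefs
  import akka.actor.typed.scaladsl.adapter._

  // the classic mediator expects classic ActorRefs, so subscribe via the adapter
  mediator ! DistributedPubSubMediator.Subscribe(RegistrationTopic, context.self.toClassic)
  mediator ! DistributedPubSubMediator.Subscribe(IngestionTopic, context.self.toClassic)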

  Behaviors.receiveMessagePartial {
    case dt: DataType if dt.key == key =>
      // do some capacity planning
      // allocate some shards
      // provision a source and sink
      // start a new ML stream...it's a data platform wonderland
      wonderland()

    case IngestionStarted(k, path) if k == key =>
      // simulate data sent from various data sources:
      (1 to 100).foreach { n =>
        mediator ! DistributedPubSubMediator.Send(
          path,
          msg = DataEnvelope(key, s"hello-$key-$n"),
          localAffinity = true)
      }
      andThen(key, mediator)

  }
}

Actors may also be subscribed to a named topic with a group id. For the full feature description see topic groups.
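
A hedged sketch of what a group subscription might look like with the classic messages. The TopicGroups object, its helper names, and the "content" and "workers" values are hypothetical and chosen only for illustration. Subscribe with an Option group and Publish with sendOneMessageToEachGroup are part of the classic DistributedPubSubMediator protocol.

```scala
import akka.actor.ActorRef
import akka.cluster.pubsub.DistributedPubSubMediator

object TopicGroups {
  // Hypothetical topic and group names.
  val Topic = "content"
  val Group = "workers"

  // Builds a subscription that places `subscriber` in the group for the topic.
  def subscribeInGroup(subscriber: ActorRef): DistributedPubSubMediator.Subscribe =
    DistributedPubSubMediator.Subscribe(Topic, Some(Group), subscriber)

  // Builds a publish message that is delivered to one subscriber per group
  // rather than to every subscriber of the topic.
  def publishToEachGroup(payload: Any): DistributedPubSubMediator.Publish =
    DistributedPubSubMediator.Publish(Topic, payload, sendOneMessageToEachGroup = true)
}
```

Both messages would be sent to the local mediator, for example `mediator ! TopicGroups.publishToEachGroup(event)`. Note that, per the classic documentation, the group id is part of the topic identifier, so subscribers without a group do not receive messages published with sendOneMessageToEachGroup = true.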

Publishers

Publishers publish messages to the topic from anywhere in the cluster. Messages are published by sending a DistributedPubSubMediator.Publish message to the local mediator.

An actor that publishes to a topic:

Scala
  Behaviors.setup[AnyRef] { context =>
    import akka.cluster.pubsub.DistributedPubSub
    import akka.cluster.pubsub.DistributedPubSubMediator
    val mediator = DistributedPubSub(context.system).mediator

    var registry: Map[DataKey, DataType] = Map.empty

    def register(key: DataKey, schema: Schema): RegistrationStatus =
      registry.get(key) match {
        case Some(_) =>
          DataTypeExists(key)
        case None =>
          validate(schema) match {
            case Success(vs) =>
              val created = DataType(key, vs, 0)
              registry += (key -> created)

              mediator ! DistributedPubSubMediator.Publish(RegistrationTopic, created)
              RegistrationSuccess(created)
            case Failure(e) =>
              RegistrationFailure(key, e)
          }
      }

    def validate(schema: Schema): Try[Schema] = {
      Success(schema) // validation is stubbed out in this sample
    }

    Behaviors.receiveMessage {
      case RegisterDataType(key, schema, replyTo, onBehalfOf) =>
        val status = register(key, schema)
        replyTo ! RegistrationAck(status, onBehalfOf)
        Behaviors.same
      case _ =>
        Behaviors.unhandled
    }
  }

Behaviors.setup { context =>
  Behaviors.receiveMessagePartial[DataEvent] {
    case e: DataEnvelope if e.key == key =>
      // fanout to:
      // validate, slice, dice, re-route, store raw to blob, store pre-aggregated/timeseries to Cs*, etc.
      context.log.debug("Storing to {}.", sink)
      Behaviors.same

    case StopIngestion(k) if k == key =>
      mediator ! DistributedPubSubMediator.Publish(IngestionTopic, IngestionStopped(key))
      // cleanup and graceful shutdown
      Behaviors.stopped
  }
}

Send

Messages can be sent in point-to-point or broadcast mode. For the full Akka Classic documentation of this feature see Classic Distributed PubSub Send.

First, an actor must register a destination to send to:

Scala
Behaviors.setup { context =>
  // register to the path
  import akka.actor.typed.scaladsl.adapter._
  mediator ! DistributedPubSubMediator.Put(context.self.toClassic)

  idle(dt, mediator)
}

An actor that sends to a registered path:

Scala
// simulate data sent from various data sources:
(1 to 100).foreach { n =>
  mediator ! DistributedPubSubMediator.Send(
    path,
    msg = DataEnvelope(key, s"hello-$key-$n"),
    localAffinity = true)
}
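
The example above covers point-to-point delivery. For the broadcast mode mentioned at the start of this section, the classic mediator also accepts a DistributedPubSubMediator.SendToAll message. A minimal sketch, in which the Broadcast object and its toAllOthers helper are hypothetical names and the path is assumed to be one that actors registered with Put:

```scala
import akka.cluster.pubsub.DistributedPubSubMediator

object Broadcast {
  // Builds a message that is delivered to every actor registered under `path`
  // on all nodes. allButSelf = true skips the sender's own node.
  def toAllOthers(path: String, msg: Any): DistributedPubSubMediator.SendToAll =
    DistributedPubSubMediator.SendToAll(path, msg, allButSelf = true)
}
```

Usage would look like `mediator ! Broadcast.toAllOthers(path, DataEnvelope(key, "hello"))`, reusing the path and DataEnvelope names from the sample above.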

Actors are automatically removed from the registry when they are terminated, or you can explicitly remove entries with DistributedPubSubMediator.Remove.
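
As a sketch of explicit removal: an actor registered with Put is known to the mediator by its path without address information, so the same string identifies the entry to remove. The Deregister object and removeMsg helper are hypothetical names for illustration.

```scala
import akka.actor.ActorPath
import akka.cluster.pubsub.DistributedPubSubMediator

object Deregister {
  // Builds the Remove message for an actor previously registered with Put.
  // The registry key is the actor path stripped of protocol, system and host.
  def removeMsg(registered: ActorPath): DistributedPubSubMediator.Remove =
    DistributedPubSubMediator.Remove(registered.toStringWithoutAddress)
}
```

From inside a typed actor this could be sent as `mediator ! Deregister.removeMsg(context.self.path)`.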

Delivery Guarantee

For the full Akka Classic documentation of this feature see Classic Distributed PubSub Delivery Guarantee.
