Azure Event Hubs

Azure Event Hubs is a real-time data streaming platform which is proprietary to the Microsoft Azure cloud.

While Event Hubs supports some Apache Kafka APIs, we do not recommend using Alpakka Kafka with Event Hubs. In our experience, it is easy to end up relying on Kafka functionality through Alpakka Kafka that Event Hubs does not fully support.

For information about setting up and managing Event Hubs infrastructure, we refer you to the official documentation.

This connector uses the native/AMQP protocol to interact with Event Hubs.

A note about the nomenclature: “Event Hubs” (plural) is the name of the product. The analogue to a Kafka topic is the singular hub. This documentation uses “Event Hubs hub” to refer to a singular hub and “Event Hubs topics” to refer to the plural (in the interest of not having “Event Hubs hubs”).

The Azure Event Hubs connector provides an Akka Streams source to consume from, and flows/sinks to produce and checkpoint to, Azure Event Hubs topics. It is based on the official Microsoft Azure Java SDK.

Project Info: Azure Event Hubs
Artifact: com.lightbend.akka : akka-stream-azure-eventhubs : 2.0.0
JDK versions: Adopt OpenJDK 11
Scala versions: 2.13.11
JPMS module name: akka.stream.alpakka.azure.eventhubs
Readiness level: Since 2.0, 2023-09-04
Home page: https://doc.akka.io/libraries/akka-enhancements/current/index.html
Sources: https://github.com/lightbend/akka-enhancements

Artifacts

These artifacts are available on request from Lightbend; please contact your sales representative.

Additionally, add the dependencies shown below.

sbt
val AkkaVersion = "2.8.4"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-azure-eventhubs" % "2.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.8.4</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-azure-eventhubs_${scala.binary.version}</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.8.4",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-azure-eventhubs_${versions.ScalaBinary}:2.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows the direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.azure    azure-messaging-eventhubs    5.16.1
com.typesafe.akka    akka-actor-typed_2.13    2.8.4
com.typesafe.akka    akka-stream_2.13    2.8.4
org.scala-lang    scala-library    2.13.11
Dependency tree
com.azure    azure-messaging-eventhubs    5.16.1
    com.azure    azure-core-amqp    2.8.11    The MIT License (MIT)
        com.azure    azure-core    1.44.1    The MIT License (MIT)
            com.azure    azure-json    1.1.0    The MIT License (MIT)
            com.fasterxml.jackson.core    jackson-annotations    2.13.5    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.13.5    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-databind    2.13.5    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-annotations    2.13.5    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-core    2.13.5    The Apache Software License, Version 2.0
            com.fasterxml.jackson.datatype    jackson-datatype-jsr310    2.13.5
                com.fasterxml.jackson.core    jackson-annotations    2.13.5    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-core    2.13.5    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-databind    2.13.5    The Apache Software License, Version 2.0
                    com.fasterxml.jackson.core    jackson-annotations    2.13.5    The Apache Software License, Version 2.0
                    com.fasterxml.jackson.core    jackson-core    2.13.5    The Apache Software License, Version 2.0
            io.projectreactor    reactor-core    3.4.33    Apache License, Version 2.0
                org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.slf4j    slf4j-api    1.7.36
        com.microsoft.azure    qpid-proton-j-extensions    1.2.4    The MIT License (MIT)
            org.apache.qpid    proton-j    0.33.8
            org.slf4j    slf4j-api    1.7.36
        org.apache.qpid    proton-j    0.33.8
    com.azure    azure-core    1.44.1    The MIT License (MIT)
        com.azure    azure-json    1.1.0    The MIT License (MIT)
        com.fasterxml.jackson.core    jackson-annotations    2.13.5    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-core    2.13.5    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-databind    2.13.5    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-annotations    2.13.5    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.13.5    The Apache Software License, Version 2.0
        com.fasterxml.jackson.datatype    jackson-datatype-jsr310    2.13.5
            com.fasterxml.jackson.core    jackson-annotations    2.13.5    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.13.5    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-databind    2.13.5    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-annotations    2.13.5    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-core    2.13.5    The Apache Software License, Version 2.0
        io.projectreactor    reactor-core    3.4.33    Apache License, Version 2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.slf4j    slf4j-api    1.7.36
com.typesafe.akka    akka-actor-typed_2.13    2.8.4    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.8.4    BUSL-1.1
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
            org.scala-lang    scala-library    2.13.11    Apache-2.0
        org.scala-lang    scala-library    2.13.11    Apache-2.0
    com.typesafe.akka    akka-slf4j_2.13    2.8.4    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.8.4    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.11    Apache-2.0
            org.scala-lang    scala-library    2.13.11    Apache-2.0
        org.scala-lang    scala-library    2.13.11    Apache-2.0
        org.slf4j    slf4j-api    1.7.36
    org.scala-lang    scala-library    2.13.11    Apache-2.0
    org.slf4j    slf4j-api    1.7.36
com.typesafe.akka    akka-stream_2.13    2.8.4    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.8.4    BUSL-1.1
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
            org.scala-lang    scala-library    2.13.11    Apache-2.0
        org.scala-lang    scala-library    2.13.11    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.8.4    BUSL-1.1
    com.typesafe    ssl-config-core_2.13    0.6.1    Apache-2.0
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang    scala-library    2.13.11    Apache-2.0
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.11    Apache-2.0
org.scala-lang    scala-library    2.13.11    Apache-2.0

The code snippets in this document assume the following imports:

Scala
import akka.stream.alpakka.azure.eventhubs._
import akka.stream.alpakka.azure.eventhubs.scaladsl._
import akka.stream.scaladsl._
import com.azure.messaging.eventhubs._
import scala.concurrent.duration._
Java
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.alpakka.azure.eventhubs.*;
import akka.stream.alpakka.azure.eventhubs.javadsl.*;
import akka.stream.javadsl.*;
import com.azure.messaging.eventhubs.*;

Connecting to Event Hubs

The stages in this connector delegate connecting to Event Hubs, along with authentication and authorization, to the builders of the underlying clients (e.g. EventHubClientBuilder and EventProcessorClientBuilder). As a developer convenience, if connection strings (embedding the appropriate auth tokens) obtained from the Azure Event Hubs UI are being used, ClientFromConfig can construct client instances from a Config object formatted as:

reference.conf (HOCON)
alpakka.azure.eventhubs.eventhub {
  # The connection string for the event hub
  # (see https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send?tabs=connection-string%2Croles-azure-portal#get-the-connection-string)
  connection-string = ""

  # The name of the particular event hub
  hub-name = ""
}

If not using connection strings, programmatic construction must be used.

Producing to Event Hubs

The producer flows produce to exactly one hub, which is the one to which the provided EventHubProducerAsyncClient has been bound. An EventHubProducerAsyncClient may be constructed programmatically (using an EventHubClientBuilder) or from Config (see “Connecting to Event Hubs”).

The flows consume events wrapped in envelopes constructed using the methods in ProducerMessage. An envelope may contain a pass-through value, which the producer flow emits downstream after successfully publishing the event to the hub. This can be used to ensure at-least-once publishing to Event Hubs (e.g. by passing through a Promise (Scala) or CompletableFuture (Java) to be completed, an offset/sequence number to be committed, or an ActorRef to which an acknowledgement is sent).

Partitioning

Event Hubs topics have 1 or more partitions. The envelopes describe how the event(s) within the envelope should be routed to partitions. The choice of partitioning has implications for ordering/consistency and availability.

  • ProducerMessage.roundRobinPartitioning (default): randomly choose a partition to which the underlying client is able to connect. Use of this partitioning will not guarantee the ordering of produced messages (it will be up to consumers who care about ordering to reconstruct order from the published events). This partitioning is the default in order to match the defaults of the underlying client, but one of the other partitionings may be easier to reason about in many cases.

  • ProducerMessage.explicitPartitioning: explicitly routes events to a specific partition. It is up to the application code to interrogate Event Hubs to obtain valid partition IDs. The advantage of this partitioning over partitionByKey is that it allows the partition used to be stable as partitions are added to the hub, at the cost of making determining which partition to use an application concern.

  • ProducerMessage.partitionByKey: the underlying client will hash the provided string and select a partition based on that hash. As long as partitions are not added, the selected partition for a given key will be stable; however, if partitions are added, the selected partition will likely change.
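
The stability caveat for partitionByKey can be illustrated with a small, self-contained sketch. The hash used here (String.hashCode modulo the partition count) is an assumption made purely for illustration; it is not the hash used by Event Hubs or the Azure SDK, and PartitionByKeySketch and partitionFor are hypothetical names.

```java
// Illustrative only: this is NOT the hashing scheme used by Event Hubs.
final class PartitionByKeySketch {
  private PartitionByKeySketch() {}

  // Maps a partitioning key to a partition index in [0, partitionCount).
  static int partitionFor(String key, int partitionCount) {
    if (partitionCount < 1) {
      throw new IllegalArgumentException("partitionCount must be at least 1");
    }
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(key.hashCode(), partitionCount);
  }

  public static void main(String[] args) {
    String key = "subject-a";
    // The same key and partition count always select the same partition...
    System.out.println(partitionFor(key, 4) == partitionFor(key, 4));
    // ...but the selected partition may differ once a partition is added.
    System.out.println("4 partitions: " + partitionFor(key, 4));
    System.out.println("5 partitions: " + partitionFor(key, 5));
  }
}
```

Any scheme that derives the partition from a hash and the current partition count shares this property, which is why explicitPartitioning may be preferable if partitions are expected to be added.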

Producing multiple events

Zero or more events may be placed into an envelope.

  • ProducerMessage.empty: An envelope with no events. This is perhaps most useful for carrying along the pass-through; it may also be useful as a starting point to progressively fill an envelope through multiple processing stages.

  • ProducerMessage.single: An envelope with a single event.

  • ProducerMessage.multi: An envelope with multiple events (in order). The producer flow may perform multiple produce operations to produce the events and these operations may be retried independently. If an event A is successfully produced, all events preceding A were successfully produced at least once, but this does not preclude such a preceding event from also being produced again (due to retry).

  • ProducerMessage.batch: An envelope with events which will be produced atomically, in one produce operation (either all events were produced or none were). The total size of the events plus Event Hubs metadata must not exceed the maximum size allowed by the topic. Note that the stage does not perform any deduplication: in the presence of retries and restarts, a batch could be produced multiple times.

It is possible to add events to an existing envelope, though care must be taken not to exceed the batch-size limit for the Event Hubs topic being produced to. For batch envelopes, the batch-size limit applies to the total size of the events plus Event Hubs metadata, while for multi (and single) envelopes, the limit applies to individual events (plus the metadata associated with producing a single event). The batch-size limit is obtained by the underlying client based on the provisioned tier for the hub. It is generally advised in event-driven systems to keep one’s events “fairly small”: if events are no larger than 1 KiB, batches of hundreds of events are allowed.

All events in a multi envelope share the same partitioning. With roundRobinPartitioning, each individual produce operation is routed independently, so the combination of multi and roundRobinPartitioning does not guarantee that all events in the envelope go to the same partition. For envelopes with many events, this also means that the success of the envelope does not depend on the availability of any particular partition.

As events in a batch envelope are produced in a single operation, they are routed to the same partition. However, if the underlying client incorrectly reports an attempted operation as failed (e.g. a network partition prevented the acknowledgement from the target partition from being received) and roundRobinPartitioning is in use, the retry may successfully produce the batch to a different partition.

If producing multiple events as a semantic unit (e.g. if projecting events from Akka Persistence where such events have a correlation ID to the command which gave rise to them) and one is certain that the events will fit inside a batch, it is likely best to produce them via a batch envelope to preserve the semantic meaning. Conversely, if batching is employed purely as a performance optimization, multi should be considered.

The producer flows do not automatically coalesce envelopes into batches. Such functionality should be implemented by the application.
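
Since coalescing is left to the application, one possible approach is to group event payloads greedily under a byte budget. The sketch below is self-contained and does not use the connector API: BatchCoalescingSketch, coalesce, and maxBatchBytes are hypothetical names, and the budget ignores the Event Hubs metadata overhead mentioned above, so a real budget would need some headroom below the hub's limit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: greedily groups payloads so that no group exceeds a byte budget.
final class BatchCoalescingSketch {
  private BatchCoalescingSketch() {}

  static List<List<byte[]>> coalesce(List<byte[]> payloads, int maxBatchBytes) {
    List<List<byte[]>> batches = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    int currentBytes = 0;
    for (byte[] payload : payloads) {
      if (payload.length > maxBatchBytes) {
        throw new IllegalArgumentException(
            "payload of " + payload.length + " bytes exceeds the batch budget");
      }
      // Close the current group if adding this payload would exceed the budget.
      if (currentBytes + payload.length > maxBatchBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(payload);
      currentBytes += payload.length;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<byte[]> payloads =
        Arrays.asList(new byte[400], new byte[400], new byte[400], new byte[1000]);
    for (List<byte[]> batch : coalesce(payloads, 1000)) {
      System.out.println("batch with " + batch.size() + " event(s)");
    }
  }
}
```

The grouping preserves the input order, so each resulting group is a candidate for a single batch envelope.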

Producer settings

The behavior of a producer flow may be customized via ProducerSettings, which may be obtained from config or programmatically built.

The defaults used when constructing from config are:

reference.conf (HOCON)
alpakka.azure.eventhubs.producer {
  # Maximum number of produce attempts in-flight at any time, without regard to
  #   partitioning
  max-parallelism = 100

  # Maximum number of produce attempts in-flight for a given (potentially logical) partition.
  # 
  # Setting to greater than 1 may result in produced events being out-of-order.
  #
  # Event Hubs supports three partitioning strategies, which in this connector may
  #  be set per-event or per-batch, though it is probably a good idea to pick one
  #  strategy and stick to it for a given producer stage
  #
  # * round-robin: events are sent to any available partition.  Since this
  #     strategy intrinsically does not guarantee ordering of events, it is
  #     recommended to set produces-per-partition to the same value as max-parallelism if
  #     no other partitioning strategy is being used with this producer
  #
  # * by key: the partitioning key is hashed and assigned to a consistent
  #     physical partition (assuming no physical partitions are added or removed).
  #     If produces-per-partition is 1, then ordering of events for a given key is
  #     guaranteed in the absence of a change in the number of physical partitions.
  #     Events with different partitioning keys observe no ordering guarantee,
  #     even if the respective partitioning keys end up assigned to the same
  #     physical partition.  It is highly recommended if using this strategy
  #     to leave produces-per-partition at the default 1.
  #
  # * explicit: the event is assigned to a specific physical partition.  Increasing
  #     this setting from the default 1 may increase throughput but may reorder
  #     events.
  produces-per-partition = 1

  # Whether a producer stage should close the underlying client when stopping.
  #   If sharing the underlying client, this should be set to false
  close-producer-on-stop = true

  # Produce attempts may be automatically retried with an exponential backoff
  retry {
    max-attempts = 1
    min-backoff = 100ms
    max-backoff = 10s
  }
}

When constructing ProducerSettings from an ActorSystem, the config section alpakka.azure.eventhubs.producer is used. ProducerSettings may also be constructed from another config object with the same layout as above.
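
To give a feel for how min-backoff and max-backoff bound the delay between retries, here is a sketch of a capped, doubling backoff. The doubling factor and the absence of random jitter are assumptions: this document does not specify the exact formula the connector uses. BackoffSketch and delayBeforeRetry are hypothetical names.

```java
import java.time.Duration;

// Hypothetical illustration of exponential backoff bounded by a minimum and a maximum.
final class BackoffSketch {
  private BackoffSketch() {}

  // retry = 0 is the first retry; the delay doubles per retry until it reaches maxBackoff.
  static Duration delayBeforeRetry(int retry, Duration minBackoff, Duration maxBackoff) {
    if (retry < 0) {
      throw new IllegalArgumentException("retry must not be negative");
    }
    long max = maxBackoff.toMillis();
    long delay = Math.min(minBackoff.toMillis(), max);
    for (int i = 0; i < retry && delay < max; i++) {
      delay = Math.min(max, delay * 2);
    }
    return Duration.ofMillis(delay);
  }

  public static void main(String[] args) {
    Duration min = Duration.ofMillis(100); // min-backoff from the defaults above
    Duration max = Duration.ofSeconds(10); // max-backoff from the defaults above
    for (int retry = 0; retry < 9; retry++) {
      System.out.println("retry " + retry + ": " + delayBeforeRetry(retry, min, max).toMillis() + " ms");
    }
  }
}
```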

Producer.flow

In this example, we convert an application type into a ProducerMessage. Since it is desirable to preserve ordering per-subject, we partition based on the subject of the events and ensure that produces-per-partition is 1.

Scala
// A minimal class demonstrating an "application level" view of an event
case class Event(subject: String, payload: Array[Byte])

val producerClient: EventHubProducerAsyncClient =
  ClientFromConfig.producer(producerConfig)

val producerSettings = ProducerSettings(actorSystem).withProducesPerPartition(1)

val flow: Flow[Event, String, NotUsed] =
  Flow[Event]
    .map { event =>
      ProducerMessage.single(
        new EventData(event.payload),
        event.subject,
        ProducerMessage.partitionByKey(event.subject))
    }
    .via(Producer.flow(producerSettings, producerClient))
    .wireTap(subject => actorSystem.log.debug("Published event for subject [{}]", subject))
Java
public class Event {
  public final String subject;
  public final byte[] payload;

  public Event(String subject, byte[] payload) {
    this.subject = subject;
    this.payload = payload;
  }
}

EventHubProducerAsyncClient producerClient = ClientFromConfig.producer(producerConfig);

ProducerSettings producerSettings = ProducerSettings.create(actorSystem).withProducesPerPartition(1);

Flow<Event, String, NotUsed> flow =
  Flow.of(Event.class)
    .map(event ->
      ProducerMessage.single(
        new EventData(event.payload),
        event.subject,
        ProducerMessage.partitionByKey(event.subject))
    )
    .via(Producer.flow(producerSettings, producerClient))
    .wireTap(subject -> actorSystem.log().debug("Published event for subject [{}]", subject));

Producer.flowWithContext

This example is similar, but instead of a pass-through, an Offset (e.g. using Akka Projection to obtain events from event-sourced actors) is passed as context so that our progress can be committed (thus ensuring at-least-once production).

Scala
// see Producer.flow documentation for Event class

val producerClient: EventHubProducerAsyncClient =
  ClientFromConfig.producer(producerConfig)

val producerSettings = ProducerSettings(actorSystem).withProducesPerPartition(1)

val flow: FlowWithContext[Event, Offset, NotUsed, Offset, NotUsed] =
  FlowWithContext[Event, Offset]
    .map { event =>
      ProducerMessage.singleWithPartitioning(
        new EventData(event.payload),
        ProducerMessage.partitionByKey(event.subject))
    }
    .via(Producer.flowWithContext(producerSettings, producerClient))
Java
// see Producer.flow documentation for Event class

EventHubProducerAsyncClient producerClient = ClientFromConfig.producer(producerConfig);

ProducerSettings producerSettings = ProducerSettings.create(actorSystem).withProducesPerPartition(1);

FlowWithContext<Event, Offset, NotUsed, Offset, NotUsed> flow =
  FlowWithContext.<Event, Offset>create()
    .map(event ->
      ProducerMessage.singleWithPartitioning(
        new EventData(event.payload),
        ProducerMessage.partitionByKey(event.subject))
    )
    .via(Producer.flowWithContext(producerSettings, producerClient));

Note that it is possible to carry a per-event pass-through in addition to propagating context.

Consuming from Event Hubs

The consumer sources subscribe to Event Hubs topics and feed events to an Akka Stream.

Each of the sources, when materialized, uses the provided EventProcessorClientBuilder (see above) to build a client to consume from a hub. The underlying client responds to partition rebalances and manages checkpointing according to the provided CheckpointSettings, via the provided CheckpointStore.

Choosing a source

These factory methods are part of the Consumer API.

Factory method    Element type    Notes
sourceWithCheckpointableContext    EventData    Checkpointable is passed as context
source    user-specified    Uses a provided function to wrap EventData and Checkpointable into an application-specific type
pairSource    (EventData, Checkpointable) in Scala, Pair<EventData, Checkpointable> in Java

The sources materialize to a Consumer.Control instance. The pattern for a “controlled shutdown” of a consumer is:

  • Call stop() on the control instance. This will cause the source to stop emitting events downstream and the source will complete. The consumption infrastructure will remain available in order to handle future checkpoint attempts.
  • Wait for the entire stream to complete processing, to ensure that a checkpoint attempt is made for all processed events.
  • Call shutdown() on the control instance. This signals the consumption infrastructure that no more checkpoint attempts will be made. The infrastructure will shut down when those attempts have completed.

Typically, the stream completion will be signaled via a Future[Done] (Scala) or CompletionStage<Done> (Java): the checkpointing sinks and Sink.ignore are examples of stages which materialize as such a future. If stream completion is signaled by such a future, the drainAndShutdown method on the control encapsulates this pattern.
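
The ordering of the three steps can be sketched with plain CompletionStage composition. This is not the connector's implementation: the Control interface below is a hypothetical stand-in exposing only stop() and shutdown(), its return types are assumptions, and failure handling is omitted for brevity.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

final class DrainAndShutdownSketch {
  private DrainAndShutdownSketch() {}

  // Hypothetical stand-in for the two operations used by the shutdown pattern.
  interface Control {
    CompletionStage<Void> stop();
    CompletionStage<Void> shutdown();
  }

  // stop, then wait for the stream to finish, then shutdown (success path only).
  static <T> CompletionStage<T> drainAndShutdown(Control control, CompletionStage<T> streamDone) {
    return control.stop()
        .thenCompose(ignored -> streamDone)
        .thenCompose(result -> control.shutdown().thenApply(ignored -> result));
  }

  // Test double that records the order in which the operations are invoked.
  static final class RecordingControl implements Control {
    final List<String> calls = new CopyOnWriteArrayList<>();

    @Override
    public CompletionStage<Void> stop() {
      calls.add("stop");
      return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletionStage<Void> shutdown() {
      calls.add("shutdown");
      return CompletableFuture.completedFuture(null);
    }
  }

  public static void main(String[] args) {
    RecordingControl control = new RecordingControl();
    CompletableFuture<String> streamDone = new CompletableFuture<>();
    CompletionStage<String> result = drainAndShutdown(control, streamDone);
    streamDone.complete("done"); // simulates the stream finishing its in-flight work
    System.out.println(control.calls + " -> " + result.toCompletableFuture().join());
  }
}
```

The point of the ordering is that shutdown() is only invoked once the stream has completed, so checkpoint attempts for already-processed events are not cut off.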

Consumer settings

The behavior of the consumer sources may be customized via ConsumerSettings, which may be obtained from config or programmatically built.

The defaults used when constructing from config are:

reference.conf (HOCON)
alpakka.azure.eventhubs.consumer {
  # The consumer group to use.  Note that Event Hubs requires consumer groups to
  # be pre-allocated on the hub.  This must be set.
  consumer-group = ""

  # When consuming events, the client will try to accumulate a batch of at most this
  # size.  Batching is per-partition.
  batch-size = 1000

  # A batch from a partition will become available after this time period, whether
  # or not it has 'batch-size' events.  If a batch becomes available before this interval
  # has passed (and the stage is not backpressured), a new interval will start: accordingly,
  # if partitions have a sufficient backlog of events, consumption rates per-partition may
  # substantially exceed 'batch-size' events per 'batch-interval'
  batch-interval = 1s

  # Advisory capacity for buffers in the stage.  When there are more than this many
  # events in the main buffer, all partitions consumed from will backpressure.  Must be
  # at least 'batch-size'.  The less this is relative to 'batch-size' times the number of
  # partitions assigned at a given time to a consumer, the more time partitions will spend
  # backpressured.
  buffer-size = 1000

  # If non-empty, this identifier is used to identify the consumer stage in logs; it
  # is useful if starting multiple consumers in the same process.
  consumer-id = ""

  # If no checkpoint is found and no initial position was programmatically specified,
  # whether to consume from the 'earliest' position, 'latest' position, or the underlying
  # consumer's 'default' (which as of Azure SDK 5.15.6 is latest).
  #
  # Valid values: 'earliest', 'latest', 'default', or empty (equivalent to default)
  fallback-position = default

  # Which strategy to use for load-balancing partitions between consumers of the same
  # Event Hub in the same group using the same checkpoint store.
  #
  # Valid values:
  #   * 'balanced': each consumer acquires one partition at a time until all partitions
  #       are acquired and each consumer owns an approximately equal number of partitions.
  #       This reduces rebalances in response to membership changes, but at startup will
  #       delay the point where all partitions are being consumed.
  #   * 'greedy': each consumer attempts to acquire any unclaimed partitions.  Compared
  #       to the 'balanced' strategy, this will have a faster time to all partitions
  #       being consumed and a faster response to membership changes, but it will result
  #       in more rebalances.
  #   * 'default' or empty: Use the underlying consumer's default strategy (which as of
  #       Azure SDK 5.15.6 is the balanced strategy)
  load-balancing-strategy = default

  # How frequently this consumer will check that partition ownership is balanced and apply the
  # load balancing strategy (see 'load-balancing-strategy').  If empty, use the
  # underlying consumer's default
  load-balancing-interval = ""

  # This consumer will consider partition claims which have not been renewed within this interval
  # to have expired and the partition to be claimable.  It will take at least this amount of
  # time for any partitions owned by a failed or stopped consumer to be reclaimed.  If empty,
  # the underlying client's default applies.
  partition-ownership-interval = ""
}

The sources apply these settings to the EventProcessorClientBuilder. The builder is (as of this writing) mutable and each setting is write-once. Accordingly, the following settings should not be set on the EventProcessorClientBuilder provided to a consumer source:

EventProcessorClientBuilder setting    Alternative to setting on the EventProcessorClientBuilder
consumerGroup    Set via ConsumerSettings
checkpointStore    Provide separately to source
initialPartitionEventPosition    Set via ConsumerSettings
loadBalancingStrategy    Set via ConsumerSettings
loadBalancingUpdateInterval    Set via ConsumerSettings
partitionOwnershipExpirationInterval    Set via ConsumerSettings
processError    Must not be set
processEvent    Must not be set
processEventBatch    Must not be set
processPartitionClose    Must not be set
processPartitionInitialization    Must not be set

The EventProcessorClientBuilder should not be reused: each source should be provided a new instance for every run or restart.

Consumer.sourceWithCheckpointableContext

In this example, we consume from our hub and transform the events into an application representation (following the same convention of using the subject of the event as the partitioning key as in the producer examples above) before handing them to a businessLogicFlow and checkpointing when the business logic is complete.

Scala
// A minimal class demonstrating an "application level" view of an event
case class Event(subject: String, payload: Array[Byte])

def parseEvent(bytes: Array[Byte]): Event = ???

val checkpointSettings =
  CheckpointSettings(
    checkpointTimeout = 30.seconds,
    maxBatch = 1000,
    maxInterval = 1.second,
    partitions = 1,
    maxInflight = 1)

// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)

val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)
val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

val source: SourceWithContext[Event, Checkpointable, Consumer.Control] =
  Consumer
    .sourceWithCheckpointableContext(
      consumerSettings,
      eventProcessorBuilder,
      checkpointSettings,
      checkpointStore)
    .map { eventData =>
      Event(eventData.getPartitionKey, eventData.getBody)
    }

val (control, streamDone) =
  source
    .via(businessLogicFlow) // Important: businessLogicFlow has Checkpointable as output context
    .toMat(Checkpointer.sinkWithCheckpointableContext(checkpointSettings))(Keep.both)
    .run()

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
control.drainAndShutdown(streamDone)
Java
public class Event {
  public final String subject;
  public final byte[] payload;

  public Event(String subject, byte[] payload) {
    this.subject = subject;
    this.payload = payload;
  }
}

private static Event parseEvent(byte[] bytes) { throw new scala.NotImplementedError(); }
// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);

EventProcessorClientBuilder eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig);
ConsumerSettings consumerSettings = ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

SourceWithContext<Event, Checkpointable, Consumer.Control> source =
  Consumer.sourceWithCheckpointableContext(
      consumerSettings,
      eventProcessorBuilder,
      checkpointSettings,
      checkpointStore)
    .map(eventData -> new Event(eventData.getPartitionKey(), eventData.getBody()));

Pair<Consumer.Control, CompletionStage<Done>> controlAndStreamCompletion =
  source.via(businessLogicFlow)
    .toMat(Checkpointer.sinkWithCheckpointableContext(checkpointSettings), Keep.both())
    .run(actorSystem);

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
controlAndStreamCompletion.first().
  drainAndShutdown(
    controlAndStreamCompletion.second(),
    actorSystem.dispatcher());

Consumer.source

This example is essentially the same as above: the difference here is that the application representation includes the Checkpointable rather than relying on the stream to pass it alongside as context. The business logic flow emits the Checkpointable when done processing. It is advised to prefer sourceWithCheckpointableContext, though this variant imposes fewer constraints on the stream than FlowWithContext does.

Scala
// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)

val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)
val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

val source: Source[DomainType, Consumer.Control] =
  Consumer.source(consumerSettings, eventProcessorBuilder, checkpointSettings, checkpointStore) {
    (eventData, checkpointable) => DomainType.fromEventHubs(eventData, checkpointable)
  }

val (control, streamDone) =
  source
    .via(businessLogicFlow) // Important: businessLogicFlow outputs Checkpointable
    .toMat(Checkpointer.sink(checkpointSettings))(Keep.both)
    .run()

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
control.drainAndShutdown(streamDone)
Java
static DomainType fromEventHubs(EventData eventData, Checkpointable checkpointable) {
  return new DomainType(eventData.getPartitionKey(), eventData.getBody(), checkpointable);
}
// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);

EventProcessorClientBuilder eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig);
ConsumerSettings consumerSettings = ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

Source<DomainType, Consumer.Control> source =
  Consumer.source(
    consumerSettings,
    eventProcessorBuilder,
    checkpointSettings,
    checkpointStore,
    DomainType::fromEventHubs);

Pair<Consumer.Control, CompletionStage<Done>> controlAndStreamCompletion =
  source.via(businessLogicFlow)
    .toMat(Checkpointer.sink(checkpointSettings), Keep.both())
    .run(actorSystem);

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
controlAndStreamCompletion.first()
  .drainAndShutdown(
    controlAndStreamCompletion.second(),
    actorSystem.dispatcher());

Consumer.pairSource

This example is essentially the same as above: the difference here is that there’s no application representation as we operate on pairs of EventData and Checkpointable.

Scala
// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)

val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)
val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

val source: Source[(EventData, Checkpointable), Consumer.Control] =
  Consumer.pairSource(consumerSettings, eventProcessorBuilder, checkpointSettings, checkpointStore)

val (control, streamDone) =
  source
    .via(businessLogicFlow) // Important: businessLogicFlow outputs Checkpointable
    .toMat(Checkpointer.sink(checkpointSettings))(Keep.both)
    .run()

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
control.drainAndShutdown(streamDone)
Java
// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);

EventProcessorClientBuilder eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig);
ConsumerSettings consumerSettings = ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

Source<Pair<EventData, Checkpointable>, Consumer.Control> source =
  Consumer.pairSource(
    consumerSettings,
    eventProcessorBuilder,
    checkpointSettings,
    checkpointStore);

Pair<Consumer.Control, CompletionStage<Done>> controlAndStreamCompletion =
  source.via(businessLogicFlow)
    .toMat(Checkpointer.sink(checkpointSettings), Keep.both())
    .run(actorSystem);

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
controlAndStreamCompletion.first()
  .drainAndShutdown(
    controlAndStreamCompletion.second(),
    actorSystem.dispatcher());

Checkpointing

The checkpointing flows and sinks checkpoint event positions to the CheckpointStore used by the consumer stage which received the event. Each of the stages consumes a Checkpointable describing the event position.

Any implementation of CheckpointStore may be used: this connector does not depend on any specific implementation. Microsoft provides two implementations:

Construction and configuration of the CheckpointStore implementation is done programmatically according to the documentation of the particular implementation.

These factory methods are part of the Checkpointer API.

Factory method | Emits | Notes
batchFlow | Checkpointable | The emitted Checkpointable contains the event position actually checkpointed. This is suitable for logging or otherwise tracking checkpoint progress.
flow | Done |
flowWithCheckpointableContext | Done | Checkpointable is emitted as context.
sink | Not applicable | Materialized future completes when stream completes (all pending checkpoints have succeeded or failed).
sinkWithCheckpointableContext | Not applicable | Materialized future completes when stream completes (all pending checkpoints have succeeded or failed).

All the flows and sinks support batching according to the passed settings and will fail if a checkpoint attempt fails.

Checkpointing settings

The behavior of the checkpointing flows and sinks may be customized via CheckpointSettings, which may be obtained from config or built programmatically.

The defaults used when constructing from config are:

reference.conf (HOCON)
alpakka.azure.eventhubs.checkpoint {
  # If a checkpoint attempt takes longer than this, consider it failed
  timeout = 30s

  # In a stream, accumulate up to this many checkpoints before checkpointing
  # the latest.  Setting to 1 will checkpoint every checkpointable in the stream.
  # Increasing will generally improve stream throughput, at the cost of
  # reprocessing more events after partition rebalances or consumer restarts
  max-batch = 100

  # In a stream, if this interval passes since the last checkpoint attempt and there have
  # been received checkpoints, checkpoint even if max-batch checkpoints haven't
  # yet been accumulated
  max-interval = 1s

  # Affects how the batching logic deals with multiple partitions, which may in
  # some cases improve throughput (e.g. with a checkpoint store implementation
  # which has different latencies for different partitions)
  partitions = 1

  # Limit how many checkpoint attempts are in-flight.  If multiple checkpoints
  # are in-flight for the same partition in the same consumer group to the same
  # checkpoint store, deduplication will occur, favoring the latest checkpoint
  # in the partition.  If `partitions` is greater than one, this is per-partition.
  max-inflight = 1
}

These defaults are compatible with “at-least-once” processing of events: each event will typically be processed only once, but in failure cases events may be processed multiple times.
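
To make the interaction of max-batch and max-interval concrete, the following is a simplified, single-partition model of the batching rule described in the configuration comments above. It is a sketch under stated assumptions, not the connector's implementation: the class CheckpointBatchingSketch, its methods, and the use of plain long offsets and millisecond timestamps are hypothetical. The model also only evaluates max-interval when an offset arrives, whereas a real stage would presumably use a timer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-partition model of checkpoint batching; not the connector's code. */
final class CheckpointBatchingSketch {
  private final int maxBatch;
  private final long maxIntervalMillis;
  private final List<Long> checkpointed = new ArrayList<>();
  private long pendingCount = 0;
  private long latestPending = -1;
  private long lastAttemptMillis;

  CheckpointBatchingSketch(int maxBatch, long maxIntervalMillis, long startMillis) {
    this.maxBatch = maxBatch;
    this.maxIntervalMillis = maxIntervalMillis;
    this.lastAttemptMillis = startMillis;
  }

  /** Accepts one processed offset at the given time; flushes when either limit is reached. */
  void offer(long offset, long nowMillis) {
    pendingCount++;
    latestPending = offset;
    boolean batchFull = pendingCount >= maxBatch;
    boolean intervalElapsed = nowMillis - lastAttemptMillis >= maxIntervalMillis;
    if (batchFull || intervalElapsed) {
      checkpointed.add(latestPending); // only the latest accumulated offset is written
      pendingCount = 0;
      lastAttemptMillis = nowMillis;
    }
  }

  /** Offsets that were actually written to the (simulated) checkpoint store. */
  List<Long> checkpointed() {
    return List.copyOf(checkpointed);
  }

  public static void main(String[] args) {
    CheckpointBatchingSketch sketch = new CheckpointBatchingSketch(3, 1000, 0);
    sketch.offer(10, 100);
    sketch.offer(11, 200);
    sketch.offer(12, 300);  // third pending offset: the batch is full
    sketch.offer(13, 400);
    sketch.offer(14, 1500); // more than 1000 ms since the last attempt at 300
    System.out.println(sketch.checkpointed()); // prints [12, 14]
  }
}
```

In this model, a restart after offset 13 was processed but before it was checkpointed would resume after offset 12 and reprocess 13, which is the at-least-once trade-off that a larger max-batch amplifies.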

Error handling and “at-least-once” vs. “at-most-once”

Consumer errors

Errors from the underlying EventProcessorClient will be forwarded to the consumer sources and fail the connected streams.

Producer errors

Produce attempts are retried by both the underlying client and this connector. Exceeding the provisioned throughput limits for a given hub will not (by itself) result in a failure. If the retries are exhausted, the producer stage will fail the stream.

Restarting the stream with a backoff

Akka Streams provides graph stages which will gracefully restart on failure, with a configurable backoff. This can be taken advantage of to restart a failing stream and its associated consumer.

Scala
import akka.stream.RestartSettings
import akka.stream.scaladsl.RestartSource
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._

// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)

val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

// For storing the control from the most recent materialization
val control = new AtomicReference[Option[Consumer.Control]](None)

val restartSettings = RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)
val restartingSource =
  RestartSource.onFailuresWithBackoff(restartSettings) { () =>
    Consumer
      .source(
        consumerSettings,
        // Be sure to get a new processor client builder every restart
        ClientFromConfig.processorClientBuilder(consumerConfig),
        checkpointSettings,
        checkpointStore) { (eventData, checkpointable) =>
        DomainType.fromEventHubs(eventData, checkpointable)
      }
      .mapMaterializedValue { c =>
        // Store the control from this start
        control.set(Some(c))
      }
      .via(businessLogicFlow)
  }

val streamCompletion = restartingSource.runWith(Checkpointer.sink(checkpointSettings))

// To drain and shutdown the stream
control.get().foreach(_.drainAndShutdown(streamCompletion))
Java
// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);

ConsumerSettings consumerSettings = ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

// For storing the control from the most recent materialization
AtomicReference<Optional<Consumer.Control>> wrappedControl =
  new AtomicReference<Optional<Consumer.Control>>(Optional.empty());

RestartSettings restartSettings =
  RestartSettings.create(
    Duration.of(3, SECONDS),    // minBackoff
    Duration.of(30, SECONDS),   // maxBackoff
    0.2);                       // randomFactor

Source<Checkpointable, NotUsed> restartingSource =
  RestartSource.onFailuresWithBackoff(
    restartSettings,
    () ->
      Consumer.source(
          consumerSettings,
          // Be sure to get a new client builder every restart
          ClientFromConfig.processorClientBuilder(consumerConfig),
          checkpointSettings,
          checkpointStore,
          DomainType::fromEventHubs)
        .mapMaterializedValue(control -> {
          // Store the control from this start
          wrappedControl.set(Optional.of(control));
          return NotUsed.notUsed();
        })
        .via(businessLogicFlow)
  );

CompletionStage<Done> streamCompletion =
  restartingSource.runWith(Checkpointer.sink(checkpointSettings), actorSystem);

// To drain and shutdown the stream
wrappedControl.get().ifPresent(control ->
  control.drainAndShutdown(
    streamCompletion,
    actorSystem.dispatcher()));

If using a RestartSource, note that failures in the sink will not trigger the source’s restart logic. With a Checkpointer sink, this means that failures to checkpoint will not cause stream restarts, whereas Checkpointer flows placed within the restart source will trigger stream restarts on checkpoint failures. That said, if the failure to checkpoint results from the unavailability or failure of the checkpoint store, consumption is less likely to succeed on restart than after, e.g., a failure in the “business logic” of the stream; this may argue for a different approach to checkpoint failures (e.g. a longer backoff, or escalating the failure by terminating the application so that some external coordination starts consuming elsewhere).
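
For a sense of how long restarts are delayed with the settings used in the restart examples above (a minBackoff of 3 seconds, a maxBackoff of 30 seconds, and a randomFactor of 0.2), the sketch below computes an exponential backoff with jitter. It assumes the common scheme of doubling the delay for each consecutive restart, capping it at maxBackoff, and then stretching it by a random fraction of up to randomFactor; the exact calculation is not specified in this documentation, and the class BackoffSketch and its delay method are hypothetical.

```java
import java.time.Duration;

/** Hypothetical illustration of exponential backoff with jitter. */
final class BackoffSketch {
  /**
   * @param restartCount number of consecutive restarts so far (0 for the first restart)
   * @param jitter a value in [0.0, 1.0), normally drawn at random for each restart
   */
  static Duration delay(
      Duration minBackoff,
      Duration maxBackoff,
      double randomFactor,
      int restartCount,
      double jitter) {
    // Double the minimum backoff for every consecutive restart...
    double exponential = minBackoff.toMillis() * Math.pow(2, restartCount);
    // ...but never exceed the maximum backoff before jitter is applied.
    double capped = Math.min(exponential, maxBackoff.toMillis());
    // Stretch by up to randomFactor so that many consumers do not restart in lockstep.
    double stretched = capped * (1.0 + jitter * randomFactor);
    return Duration.ofMillis((long) stretched);
  }

  public static void main(String[] args) {
    Duration min = Duration.ofSeconds(3);
    Duration max = Duration.ofSeconds(30);
    for (int restart = 0; restart < 6; restart++) {
      // Jitter is fixed at 0.0 here to show only the deterministic part of the delay.
      System.out.println(restart + " -> " + delay(min, max, 0.2, restart, 0.0));
    }
  }
}
```

Under these assumptions the deterministic part of the delay grows as 3, 6, 12, and 24 seconds and then stays at 30 seconds, which gives an unavailable checkpoint store some time to recover before consumption is retried.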

Processing guarantees

In many applications for which Event Hubs is well suited, it is a requirement that every event in a hub being consumed gets processed at least once. The default settings for consumption and checkpointing work well with this requirement (assuming no external manipulation of the checkpoint store: if that happens, “all bets are off”). However, care is needed to ensure that the stream logic does not inadvertently weaken these semantics: surprising and difficult-to-resolve bugs can ensue from having weaker semantics than expected.

One pattern which often weakens at-least-once semantics is multiple side effects per checkpointable. If multiple side effects are required, it is critical to delay the checkpoint until all side effects have succeeded. For example, if the side effects involve producing multiple events to an Event Hubs hub, produce them with ProducerMessage.multi or ProducerMessage.batch. If consumed events are batched for more efficient processing, it is imperative that the checkpointable of the last event in the batch be the “surviving” checkpointable and that it not be emitted until every event in the batch is deemed processed. In other situations, it is often reasonable to perform the side effects serially.
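
The batching rule above can be sketched without any streaming machinery. The following is a minimal model, assuming a hypothetical Event type that pairs a payload with the position that would be checkpointed; none of these names come from the connector. It returns the position of the last event only once every side effect in the batch has succeeded.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical model of "the last checkpointable in a batch survives". */
final class BatchCheckpointSketch {
  /** Stand-in for an event paired with its checkpointable position. */
  static final class Event {
    final long position;
    final String body;

    Event(long position, String body) {
      this.position = position;
      this.body = body;
    }
  }

  /**
   * Applies the side effect to every event in order. Returns the position of the last
   * event only if every side effect succeeded; otherwise returns empty so that nothing
   * is checkpointed and the whole batch is seen again after a restart.
   */
  static Optional<Long> processBatch(List<Event> batch, Predicate<Event> sideEffect) {
    for (Event event : batch) {
      if (!sideEffect.test(event)) {
        return Optional.empty();
      }
    }
    return batch.isEmpty()
        ? Optional.empty()
        : Optional.of(batch.get(batch.size() - 1).position);
  }

  public static void main(String[] args) {
    List<Event> batch = List.of(new Event(40, "a"), new Event(41, "b"), new Event(42, "c"));
    System.out.println(processBatch(batch, e -> true));                // prints Optional[42]
    System.out.println(processBatch(batch, e -> !e.body.equals("b"))); // prints Optional.empty
  }
}
```

In a real stream a failed side effect would more likely fail the stream than yield an empty result; the point of the sketch is only that no position from the batch escapes before the whole batch is done.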

Processing events out of order often weakens at-least-once semantics, as reordering can result in a later event position being checkpointed before an earlier position (this situation may also weaken at-most-once semantics!). To avoid processing events out of order, use only stages which preserve ordering. Most Akka Streams built-in operators preserve ordering: a notable exception is mapAsyncUnordered, which should never be used if at-least-once processing is desired. Another general exception is the pattern of groupBy followed by mergeSubstreams: use mapAsyncPartitioned (or perhaps techniques outside of streams, such as an ask to an actor) instead.
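
To illustrate why reordering is hazardous, the sketch below simulates checkpointing each position as soon as its processing completes and then crashing part-way through. It is a toy model with hypothetical names, assuming positions are consecutive integers starting at 0 and that a restart resumes just after the stored checkpoint.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation of checkpointing in completion order. */
final class ReorderingSketch {
  /**
   * Checkpoints each position in the order in which processing completed, then crashes
   * after the given number of completions. Returns the positions at or before the stored
   * checkpoint which were never processed: a restart would skip them entirely.
   */
  static List<Long> skippedAfterCrash(List<Long> completionOrder, int completedBeforeCrash) {
    Set<Long> processed = new HashSet<>();
    long checkpoint = -1;
    int completions = Math.min(completedBeforeCrash, completionOrder.size());
    for (int i = 0; i < completions; i++) {
      long position = completionOrder.get(i);
      processed.add(position);
      checkpoint = position; // the most recently completed position overwrites the checkpoint
    }
    List<Long> skipped = new ArrayList<>();
    for (long position = 0; position <= checkpoint; position++) {
      if (!processed.contains(position)) {
        skipped.add(position);
      }
    }
    return skipped;
  }

  public static void main(String[] args) {
    // Order preserved: the checkpoint never runs ahead of processing.
    System.out.println(skippedAfterCrash(List.of(0L, 1L, 2L), 1)); // prints []
    // Position 2 completes first (as could happen with mapAsyncUnordered).
    System.out.println(skippedAfterCrash(List.of(2L, 0L, 1L), 1)); // prints [0, 1]
  }
}
```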

If at-most-once processing semantics are desired, it is possible to obtain these semantics with this connector. “Hard” at-most-once requires checkpointing before processing each event. This precludes batching of checkpoints: while at-most-once messaging in Akka generally results in greater throughput than at-least-once messaging, the inability to batch checkpoints likely makes at-most-once processing of events from Event Hubs exhibit reduced throughput compared to at-least-once processing. Hard at-most-once should thus only be used in situations where there is a correctness requirement that no event ever be processed multiple times.

Scala
// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)

val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)
val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

// Batching checkpoints is incompatible with hard at-most-once processing
val checkpointSettings =
  CheckpointSettings(
    checkpointTimeout = 30.seconds,
    maxBatch = 1,
    maxInterval = 1.second,
    partitions = 1,
    maxInflight = 1)

val (control, streamDone) =
  Consumer
    .pairSource(consumerSettings, eventProcessorBuilder, checkpointSettings, checkpointStore)
    .via(
      // Must checkpoint (with no batching!) before processing
      Flow
        .fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
          import akka.stream.FlowShape
          import GraphDSL.Implicits._

          val breaker = builder.add(Unzip[EventData, Checkpointable]())
          val maker = builder.add(Zip[EventData, Done]())

          breaker.out0 ~> maker.in0
          breaker.out1 ~> Checkpointer.flow(checkpointSettings) ~> maker.in1

          FlowShape(breaker.in, maker.out)
        })
        .map(_._1))
    .via(businessLogicFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
control.drainAndShutdown(streamDone)
Java
// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);

EventProcessorClientBuilder eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig);
ConsumerSettings consumerSettings = ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

// Batching checkpoints is incompatible with hard at-most-once processing
CheckpointSettings checkpointSettings =
  CheckpointSettings.create(
    Duration.of(30, SECONDS), // checkpointTimeout
    1,                        // maxBatch
    Duration.of(1, SECONDS),  // maxInterval
    1,                        // partitions
    1);                       // maxInflight

Pair<Consumer.Control, CompletionStage<Done>> controlWithCompletion =
  Consumer.pairSource(
      consumerSettings,
      eventProcessorBuilder,
      checkpointSettings,
      checkpointStore)
    .via(
      // Must checkpoint (with no batching!) before processing
      Flow.<Pair<EventData, Checkpointable>, Pair<EventData, Done>, NotUsed>fromGraph(
        GraphDSL.create(
          builder -> {
            FanOutShape2<Pair<EventData, Checkpointable>, EventData, Checkpointable> breaker =
              builder.add(Unzip.create(EventData.class, Checkpointable.class));

            FanInShape2<EventData, Done, Pair<EventData, Done>> maker =
              builder.add(Zip.<EventData, Done>create());

            builder.from(breaker.out0()).toInlet(maker.in0());
            builder.from(breaker.out1())
              .<Done>via(Checkpointer.flow(checkpointSettings).shape())
              .toInlet(maker.in1());

            return FlowShape.of(
                breaker.in(), maker.out());
          })
      )
      .map(Pair::first)
    )
    .via(businessLogicFlow)
    .toMat(Sink.ignore(), Keep.both())
    .run(actorSystem);

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
controlWithCompletion.first()
  .drainAndShutdown(
    controlWithCompletion.second(),
    actorSystem.dispatcher());
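
The difference between checkpointing before processing (hard at-most-once) and checkpointing after processing (at-least-once) shows up only when a failure lands between the two steps. The following toy simulation, with hypothetical names and integer positions standing in for event positions, counts how often each event is processed when a single crash occurs between the two steps for one event.

```java
import java.util.Arrays;

/** Hypothetical simulation of a crash between checkpointing and processing. */
final class DeliverySemanticsSketch {
  /**
   * Handles positions 0 until eventCount, crashing once between the two steps performed
   * for crashAt, then restarting just after the stored checkpoint.
   * Returns how many times each position was processed.
   */
  static int[] processedCounts(int eventCount, int crashAt, boolean checkpointFirst) {
    int[] counts = new int[eventCount];
    int checkpoint = -1;     // last position written to the simulated checkpoint store
    boolean crashed = false; // the simulated crash happens at most once
    int position = 0;
    while (position < eventCount) {
      boolean crashNow = !crashed && position == crashAt;
      if (checkpointFirst) {
        checkpoint = position;                // step 1: checkpoint
        if (!crashNow) counts[position]++;    // step 2: process (lost if the crash intervenes)
      } else {
        counts[position]++;                   // step 1: process
        if (!crashNow) checkpoint = position; // step 2: checkpoint (lost if the crash intervenes)
      }
      if (crashNow) {
        crashed = true;
        position = checkpoint + 1; // restart from just after the stored checkpoint
      } else {
        position++;
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(processedCounts(3, 1, true)));  // prints [1, 0, 1]
    System.out.println(Arrays.toString(processedCounts(3, 1, false))); // prints [1, 2, 1]
  }
}
```

Checkpointing first never processes an event twice but may drop one; processing first never drops an event but may repeat one.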

Consuming, producing, and checkpointing in one stream

The general approach for consuming, producing, and checkpointing in one stream is to:

  • have a Consumer.sourceWithCheckpointableContext
  • transform the EventData into envelopes (using ProducerMessage.empty to filter out events while propagating the Checkpointable)
  • use a Producer.flowWithContext to produce events to the target hub
  • checkpoint the events with Checkpointer.sinkWithCheckpointableContext
Scala
val citiesToRegions = Map(
  "Stockholm" -> "Nordic",
  "Lausanne" -> "Central",
  "Nice" -> "Mediterranean",
  "Amsterdam" -> "Northwest",
  "Dublin" -> "Northwest",
  "Helsinki" -> "Nordic",
  "Prague" -> "Central",
  "Barcelona" -> "Mediterranean")

val producerClient = ClientFromConfig.producer(producerConfig)
val producerSettings = ProducerSettings(actorSystem)

val checkpointStore = buildCheckpointStore(checkpointConfig)
val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)
val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

val endToEndStream =
  Consumer
    .sourceWithCheckpointableContext(
      consumerSettings,
      eventProcessorBuilder,
      checkpointSettings,
      checkpointStore)
    .map { eventData =>
      try {
        val count = eventData.getBodyAsString.toInt
        val region = citiesToRegions.getOrElse(eventData.getPartitionKey, "Unknown")
        ProducerMessage.singleWithPartitioning(
          new EventData(count.toString),
          ProducerMessage.partitionByKey(region))
      } catch {
        case _: NumberFormatException => ProducerMessage.empty()
      }
    }
    .via(Producer.flowWithContext(producerSettings, producerClient))
    .via(Checkpointer.flowWithCheckpointableContext(checkpointSettings))
    .toMat(Sink.ignore)(Consumer.DrainingControl.apply)

val control = endToEndStream.run()
Java
ConcurrentHashMap<String, String> citiesToRegions = new ConcurrentHashMap<String, String>();
citiesToRegions.put("Stockholm", "Nordic");
citiesToRegions.put("Lausanne", "Central");
citiesToRegions.put("Nice", "Mediterranean");
citiesToRegions.put("Amsterdam", "Northwest");
citiesToRegions.put("Dublin", "Northwest");
citiesToRegions.put("Helsinki", "Nordic");
citiesToRegions.put("Prague", "Central");
citiesToRegions.put("Barcelona", "Mediterranean");

EventHubProducerAsyncClient producerClient = ClientFromConfig.producer(producerConfig);
ProducerSettings producerSettings = ProducerSettings.create(actorSystem);

CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);
EventProcessorClientBuilder eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig);
ConsumerSettings consumerSettings = ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

RunnableGraph<Consumer.DrainingControl<Done>> endToEndStream =
  Consumer.sourceWithCheckpointableContext(
      consumerSettings,
      eventProcessorBuilder,
      checkpointSettings,
      checkpointStore)
    .map(eventData -> {
      try {
        // Parsing to validate
        int count = Integer.parseInt(eventData.getBodyAsString());
        String region = citiesToRegions.get(eventData.getPartitionKey());
        region = (region == null) ? "Unknown" : region;

        return ProducerMessage.singleWithPartitioning(
          new EventData(eventData.getBody()),
          ProducerMessage.partitionByKey(region));
      } catch (NumberFormatException nfe) {
        return ProducerMessage.empty();
      }
    })
    .via(Producer.flowWithContext(producerSettings, producerClient))
    .via(Checkpointer.flowWithCheckpointableContext(checkpointSettings))
    .toMat(Sink.ignore(), Consumer::createDrainingControl);

Consumer.DrainingControl<Done> control = endToEndStream.run(actorSystem);

Serialization

As with Alpakka Kafka, our firm recommendation is that serialization to and deserialization from byte arrays be performed in the stream (e.g. as map stages), as this eases implementation of desired error handling strategies.
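
As a minimal sketch of what such a deserializing stage might compute, assuming event bodies are UTF-8 encoded integers as in the end-to-end example above, the helpers below return an empty Optional for malformed payloads instead of throwing. The class and method names are hypothetical; in a stream, decodeCount would be the function passed to a map stage, leaving a later stage free to skip, divert, or fail on an empty result.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical helpers for decoding and encoding event bodies inside a stream. */
final class InStreamDecodingSketch {
  /** Decodes a UTF-8 encoded integer; an empty result signals a malformed payload. */
  static Optional<Integer> decodeCount(byte[] body) {
    try {
      String text = new String(body, StandardCharsets.UTF_8).trim();
      return Optional.of(Integer.parseInt(text));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }

  /** Encodes a count as the UTF-8 bytes of its decimal representation. */
  static byte[] encodeCount(int count) {
    return Integer.toString(count).getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(decodeCount(encodeCount(42)));                             // prints Optional[42]
    System.out.println(decodeCount("not a number".getBytes(StandardCharsets.UTF_8))); // prints Optional.empty
  }
}
```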
