Azure Event Hubs
Azure Event Hubs is a real-time data streaming platform which is proprietary to the Microsoft Azure cloud.
While Event Hubs supports some Apache Kafka APIs, we do not recommend using Alpakka Kafka with it. In our experience, it is easy to end up relying on Kafka functionality in Alpakka Kafka that Event Hubs does not fully support.
For information about setting up and managing Event Hubs infrastructure, we refer you to the official documentation.
This connector uses the native/AMQP protocol to interact with Event Hubs.
A note about the nomenclature: “Event Hubs” (plural) is the name of the product. The analogue to a Kafka topic is the singular hub. This documentation uses “Event Hubs hub” to refer to a singular hub and “Event Hubs topics” to refer to the plural (in the interest of not having “Event Hubs hubs”).
The Azure Event Hubs connector provides an Akka Streams source to consume from, and flows and sinks to produce and checkpoint to, Azure Event Hubs topics. It is based on the official Microsoft Azure Java SDK.
Project Info: Azure Event Hubs | |
---|---|
Artifact | com.lightbend.akka akka-stream-azure-eventhubs 2.0.0 |
JDK versions | Adopt OpenJDK 11 |
Scala versions | 2.13.11 |
JPMS module name | akka.stream.alpakka.azure.eventhubs |
License | |
Readiness level | Since 2.0, 2023-09-04 |
Home page | https://doc.akka.io/libraries/akka-enhancements/current/index.html |
API documentation | |
Forums | |
Issues | Issue tracker |
Sources | https://github.com/lightbend/akka-enhancements |
Artifacts
These artifacts are available on request from Lightbend; please contact your sales representative.
Additionally, add the dependencies as below.
- sbt
val AkkaVersion = "2.8.4"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-azure-eventhubs" % "2.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
- Maven
<properties>
  <akka.version>2.8.4</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-azure-eventhubs_${scala.binary.version}</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  AkkaVersion: "2.8.4",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-azure-eventhubs_${versions.ScalaBinary}:2.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}
The table below shows the direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.azure | azure-messaging-eventhubs | 5.16.1 |
com.typesafe.akka | akka-actor-typed_2.13 | 2.8.4 |
com.typesafe.akka | akka-stream_2.13 | 2.8.4 |
org.scala-lang | scala-library | 2.13.11 |
- Dependency tree
com.azure azure-messaging-eventhubs 5.16.1 com.azure azure-core-amqp 2.8.11 The MIT License (MIT) com.azure azure-core 1.44.1 The MIT License (MIT) com.azure azure-json 1.1.0 The MIT License (MIT) com.fasterxml.jackson.core jackson-annotations 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-databind 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.datatype jackson-datatype-jsr310 2.13.5 com.fasterxml.jackson.core jackson-annotations 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-databind 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.13.5 The Apache Software License, Version 2.0 io.projectreactor reactor-core 3.4.33 Apache License, Version 2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.slf4j slf4j-api 1.7.36 com.microsoft.azure qpid-proton-j-extensions 1.2.4 The MIT License (MIT) org.apache.qpid proton-j 0.33.8 org.slf4j slf4j-api 1.7.36 org.apache.qpid proton-j 0.33.8 com.azure azure-core 1.44.1 The MIT License (MIT) com.azure azure-json 1.1.0 The MIT License (MIT) com.fasterxml.jackson.core jackson-annotations 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-databind 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.13.5 
The Apache Software License, Version 2.0 com.fasterxml.jackson.datatype jackson-datatype-jsr310 2.13.5 com.fasterxml.jackson.core jackson-annotations 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-databind 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.13.5 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.13.5 The Apache Software License, Version 2.0 io.projectreactor reactor-core 3.4.33 Apache License, Version 2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.slf4j slf4j-api 1.7.36 com.typesafe.akka akka-actor-typed_2.13 2.8.4 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.4 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.11 Apache-2.0 org.scala-lang scala-library 2.13.11 Apache-2.0 com.typesafe.akka akka-slf4j_2.13 2.8.4 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.4 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.11 Apache-2.0 org.scala-lang scala-library 2.13.11 Apache-2.0 org.scala-lang scala-library 2.13.11 Apache-2.0 org.slf4j slf4j-api 1.7.36 org.scala-lang scala-library 2.13.11 Apache-2.0 org.slf4j slf4j-api 1.7.36 com.typesafe.akka akka-stream_2.13 2.8.4 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.8.4 BUSL-1.1 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0 org.scala-lang scala-library 2.13.11 Apache-2.0 org.scala-lang scala-library 2.13.11 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.8.4 BUSL-1.1 com.typesafe ssl-config-core_2.13 0.6.1 Apache-2.0 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang scala-library 2.13.11 Apache-2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang 
scala-library 2.13.11 Apache-2.0 org.scala-lang scala-library 2.13.11 Apache-2.0
The code snippets in this document assume the following imports:
- Scala
import akka.NotUsed
import akka.stream.alpakka.azure.eventhubs._
import akka.stream.alpakka.azure.eventhubs.scaladsl._
import akka.stream.scaladsl._
import com.azure.messaging.eventhubs._
import scala.concurrent.duration._
- Java
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.alpakka.azure.eventhubs.*;
import akka.stream.alpakka.azure.eventhubs.javadsl.*;
import akka.stream.javadsl.*;
import com.azure.messaging.eventhubs.*;
import java.util.concurrent.CompletionStage;
Connecting to Event Hubs
The stages in this connector defer connecting to, and authentication/authorization for, Event Hubs to the underlying client’s builders (e.g. EventHubClientBuilder and EventProcessorClientBuilder). As a developer convenience, if connection strings (embedding the appropriate auth tokens) obtained from the Azure Event Hubs UI are being used, ClientFromConfig can construct client instances from a Config object formatted as:
- reference.conf (HOCON)
alpakka.azure.eventhubs.eventhub {
  # The connection string for the event hub
  # (see https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send?tabs=connection-string%2Croles-azure-portal#get-the-connection-string)
  connection-string = ""

  # The name of the particular event hub
  hub-name = ""
}
If not using connection strings, programmatic construction must be used.
Producing to Event Hubs
The producer flows produce to exactly one hub, which is the one to which the provided EventHubProducerAsyncClient has been bound. An EventHubProducerAsyncClient may be constructed programmatically (using an EventHubClientBuilder) or from Config (see “Connecting to Event Hubs”).
The flows consume events wrapped in envelopes constructed using the methods in ProducerMessage. The envelopes may contain a pass-through value which the producer flow will emit downstream after successfully publishing the event to the hub. This can be used to ensure at-least-once publishing to Event Hubs (e.g. by passing through a Promise (Scala) or CompletableFuture (Java) to be completed, an offset/sequence number to be committed, or an ActorRef to send an acknowledgement to).
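The pass-through idea can be illustrated without the connector. The following JDK-only sketch models a produce step that hands the pass-through downstream only after the publish succeeded. The names PassThroughSketch, Envelope, produce and the publish function are hypothetical and exist only for this illustration; they are not connector API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of "emit the pass-through only after a successful publish".
final class PassThroughSketch {

    // Stand-in for an envelope: one event payload plus a pass-through value.
    static final class Envelope<P> {
        final byte[] event;
        final P passThrough;

        Envelope(byte[] event, P passThrough) {
            this.event = event;
            this.passThrough = passThrough;
        }
    }

    // Publishes the event with the supplied (hypothetical) publish function.
    // The returned future yields the pass-through when publishing succeeded
    // and fails, without yielding it, when publishing failed.
    static <P> CompletableFuture<P> produce(
            Envelope<P> envelope,
            Function<byte[], CompletableFuture<Void>> publish) {
        return publish.apply(envelope.event).thenApply(published -> envelope.passThrough);
    }
}
```

If the pass-through is itself a CompletableFuture acting as an acknowledgement, the caller can complete it once produce yields it; when the publish fails it is never completed, which leaves the original sender free to retry.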
Partitioning
Event Hubs topics have 1 or more partitions. The envelopes describe how the event(s) within the envelope should be routed to partitions. The choice of partitioning has implications for ordering/consistency and availability.
- ProducerMessage.roundRobinPartitioning (default): randomly choose a partition to which the underlying client is able to connect. Use of this partitioning will not guarantee the ordering of produced messages (it will be up to consumers who care about ordering to reconstruct order from the published events). This partitioning is the default in order to match the defaults of the underlying client, but one of the other partitionings may be easier to reason about in many cases.
- ProducerMessage.explicitPartitioning: explicitly routes events to a specific partition. It is up to the application code to interrogate Event Hubs to obtain valid partition IDs. The advantage of this partitioning over partitionByKey is that it allows the partition used to be stable as partitions are added to the hub, at the cost of making the choice of partition an application concern.
- ProducerMessage.partitionByKey: the underlying client will hash the provided string and select a partition based on that hash. As long as partitions are not added, the selected partition for a given key will be stable; however, if partitions are added, the selected partition will likely change.
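Why key-based partition selection is only stable while the partition count is fixed can be seen in a small JDK-only sketch. It assumes a simple "hash modulo partition count" scheme with String.hashCode as a stand-in; Event Hubs uses its own hash function, so the concrete partition numbers here are illustrative only. KeyPartitioningSketch and partitionFor are hypothetical names.

```java
// Illustrative model of hash-based partition selection (not the Event Hubs hash).
final class KeyPartitioningSketch {

    // Maps a key to a partition index in [0, partitionCount).
    static int partitionFor(String key, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), partitionCount);
    }
}
```

With this stand-in, the key "ab" maps to partition 1 when there are 4 partitions but to partition 0 when there are 5: the mapping is deterministic for a given partition count and shifts once a partition is added.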
Producing multiple events
Zero or more events may be placed into an envelope.
- ProducerMessage.empty: An envelope with no events. This is perhaps most useful for carrying along the pass-through; it may also be useful as a starting point to progressively fill an envelope through multiple processing stages.
- ProducerMessage.single: An envelope with a single event.
- ProducerMessage.multi: An envelope with multiple events (in order). The producer flow may perform multiple produce operations to produce the events and these operations may be retried independently. If an event A is successfully produced, all events preceding A were successfully produced at least once, but this does not preclude such a preceding event from also being produced again (due to retry).
- ProducerMessage.batch: An envelope with events which will be produced atomically, in one produce operation (either all events were produced or none were). The total size of the events plus Event Hubs metadata must not exceed the maximum size allowed by the topic. Note that the stage does not perform any deduplication: in the presence of retries and restarts, a batch could be produced multiple times.
It is possible to add events to an existing envelope, though care must be taken to not exceed batch-size limits for the Event Hubs topic being produced to. For batch envelopes, the batch-size limit applies to the total size of the events plus Event Hubs metadata, while for multi (and single) envelopes, the limit applies to individual events (plus the metadata associated with producing a single event). The batch-size limit is obtained by the underlying client based on the provisioned tier for the hub. It is generally advised in event-driven systems to keep one’s events “fairly small”: if events are no larger than 1 KiB, batches of hundreds of events are allowed.
All events in a multi envelope will share the partitioning. If the partitioning is roundRobinPartitioning, each individual produce operation will be independently routed (that is to say, the combination of multi and roundRobinPartitioning will not guarantee that all events in the envelope go to the same partition; for envelopes with many events, this does mean that the success of the envelope will not depend on the availability of any particular partition).
As events in a batch envelope are produced in a single operation, they will be routed to the same partition. However, if an attempted operation is incorrectly reported by the underlying client to have failed (e.g. a network partition resulted in the acknowledgement from the target partition not being received) and roundRobinPartitioning is being used, the retry may successfully produce to a different partition.
If producing multiple events as a semantic unit (e.g. if projecting events from Akka Persistence where such events have a correlation ID to the command which gave rise to them) and one is certain that the events will fit inside a batch, it is likely best to produce them via a batch envelope to preserve the semantic meaning. Conversely, if batching is employed purely as a performance optimization, multi should be considered.
The producer flows do not automatically coalesce envelopes into batches. Such functionality should be implemented by the application.
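As a rough idea of what application-level coalescing could look like, the JDK-only sketch below greedily groups payloads, in order, so that each group stays within a size budget. The names BatchCoalescingSketch and coalesce are hypothetical, and both maxBatchBytes and perEventOverheadBytes are assumptions supplied by the caller: the real limit and metadata overhead are determined by the underlying client and the provisioned tier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that groups payloads into size-bounded, order-preserving batches.
final class BatchCoalescingSketch {

    // maxBatchBytes: assumed upper bound for one batch.
    // perEventOverheadBytes: assumed estimate of per-event metadata.
    static List<List<byte[]>> coalesce(
            List<byte[]> payloads, int maxBatchBytes, int perEventOverheadBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;

        for (byte[] payload : payloads) {
            long eventBytes = (long) payload.length + perEventOverheadBytes;
            if (eventBytes > maxBatchBytes) {
                throw new IllegalArgumentException(
                    "event of " + eventBytes + " bytes cannot fit in any batch");
            }
            // Close the current batch when adding this event would exceed the budget.
            if (currentBytes + eventBytes > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(payload);
            currentBytes += eventBytes;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each resulting group could then be placed into one batch envelope. Leaving some headroom below the real limit is a reasonable precaution, since the overhead estimate is only a guess.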
Producer settings
The behavior of a producer flow may be customized via ProducerSettings, which may be obtained from config or programmatically built.
The defaults used when constructing from config are:
- reference.conf (HOCON)
alpakka.azure.eventhubs.producer {
  # Maximum number of produce attempts in-flight at any time, without regard to
  # partitioning
  max-parallelism = 100

  # Maximum number of produce attempts in-flight for a given (potentially logical) partition.
  #
  # Setting to greater than 1 may result in produced events being out-of-order.
  #
  # Event Hubs supports three partitioning strategies, which in this connector may
  # be set per-event or per-batch, though it is probably a good idea to pick one
  # strategy and stick to it for a given producer stage
  #
  # * round-robin: events are sent to any available partition. Since this
  #   strategy intrinsically does not guarantee ordering of events, it is
  #   recommended to set produces-per-partition to the same value as max-parallelism if
  #   no other partitioning strategy is being used with this producer
  #
  # * by key: the partitioning key is hashed and assigned to a consistent
  #   physical partition (assuming no physical partitions are added or removed).
  #   If produces-per-partition is 1, then ordering of events for a given key is
  #   guaranteed in the absence of a change in the number of physical partitions.
  #   Events with different partitioning keys observe no ordering guarantee,
  #   even if the respective partitioning keys end up assigned to the same
  #   physical partition. It is highly recommended if using this strategy
  #   to leave produces-per-partition at the default 1.
  #
  # * explicit: the event is assigned to a specific physical partition. Increasing
  #   this setting from the default 1 may increase throughput but may reorder
  #   events.
  produces-per-partition = 1

  # Whether a producer stage should close the underlying client when stopping.
  # If sharing the underlying client, this should be set to false
  close-producer-on-stop = true

  # Produce attempts may be automatically retried with an exponential backoff
  retry {
    max-attempts = 1
    min-backoff = 100ms
    max-backoff = 10s
  }
}
When constructing ProducerSettings from an ActorSystem, the config section alpakka.azure.eventhubs.producer is used. ProducerSettings may also be constructed from another config object with the same layout as above.
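To get a feel for how min-backoff and max-backoff in the retry section interact, the JDK-only sketch below computes a delay schedule under one common assumption: the delay starts at min-backoff, doubles with each retry, and is capped at max-backoff. The connector's actual computation is not documented here and may differ (for example by adding random jitter); BackoffSketch and delayBeforeRetry are hypothetical names.

```java
import java.time.Duration;

// Assumed exponential backoff schedule: doubling from minBackoff, capped at maxBackoff.
final class BackoffSketch {

    // retry is 1-based: 1 is the first retry after the initial attempt failed.
    static Duration delayBeforeRetry(int retry, Duration minBackoff, Duration maxBackoff) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry is 1-based");
        }
        long maxMillis = maxBackoff.toMillis();
        long delay = Math.min(minBackoff.toMillis(), maxMillis);
        // Double once per additional retry, stopping as soon as the cap is reached.
        for (int i = 1; i < retry && delay < maxMillis; i++) {
            delay = Math.min(delay * 2, maxMillis);
        }
        return Duration.ofMillis(delay);
    }
}
```

Under this assumption and the default 100ms and 10s bounds, the delays would run 100 ms, 200 ms, 400 ms and so on, reaching the 10 s cap from the eighth retry onwards.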
Producer.flow
In this example, we convert an application type into a ProducerMessage. Since it is desirable to preserve ordering per subject, we partition based on the subject of the events and ensure that produces-per-partition is 1.
- Scala
// A minimal class demonstrating an "application level" view of an event
case class Event(subject: String, payload: Array[Byte])

val producerClient: EventHubProducerAsyncClient =
  ClientFromConfig.producer(producerConfig)

val producerSettings = ProducerSettings(actorSystem).withProducesPerPartition(1)

val flow: Flow[Event, String, NotUsed] =
  Flow[Event]
    .map { event =>
      ProducerMessage.single(
        new EventData(event.payload),
        event.subject,
        ProducerMessage.partitionByKey(event.subject))
    }
    .via(Producer.flow(producerSettings, producerClient))
    .wireTap(subject => actorSystem.log.debug("Published event for subject [{}]", subject))
- Java
public class Event {
  public final String subject;
  public final byte[] payload;

  public Event(String subject, byte[] payload) {
    this.subject = subject;
    this.payload = payload;
  }
}

EventHubProducerAsyncClient producerClient = ClientFromConfig.producer(producerConfig);

ProducerSettings producerSettings =
    ProducerSettings.create(actorSystem).withProducesPerPartition(1);

Flow<Event, String, NotUsed> flow =
    Flow.of(Event.class)
        .map(event ->
            ProducerMessage.single(
                new EventData(event.payload),
                event.subject,
                ProducerMessage.partitionByKey(event.subject)))
        .via(Producer.flow(producerSettings, producerClient))
        .wireTap(subject ->
            actorSystem.log().debug("Published event for subject [{}]", subject));
Producer.flowWithContext
This example is similar, but instead of a pass-through, an Offset (e.g. using Akka Projection to obtain events from event-sourced actors) is passed as context so that our progress can be committed (thus ensuring at-least-once production).
- Scala
// see Producer.flow documentation for Event class
val producerClient: EventHubProducerAsyncClient =
  ClientFromConfig.producer(producerConfig)

val producerSettings = ProducerSettings(actorSystem).withProducesPerPartition(1)

val flow: FlowWithContext[Event, Offset, NotUsed, Offset, NotUsed] =
  FlowWithContext[Event, Offset]
    .map { event =>
      ProducerMessage.singleWithPartitioning(
        new EventData(event.payload),
        ProducerMessage.partitionByKey(event.subject))
    }
    .via(Producer.flowWithContext(producerSettings, producerClient))
- Java
// see Producer.flow documentation for Event class
EventHubProducerAsyncClient producerClient = ClientFromConfig.producer(producerConfig);

ProducerSettings producerSettings =
    ProducerSettings.create(actorSystem).withProducesPerPartition(1);

FlowWithContext<Event, Offset, NotUsed, Offset, NotUsed> flow =
    FlowWithContext.<Event, Offset>create()
        .map(event ->
            ProducerMessage.singleWithPartitioning(
                new EventData(event.payload),
                ProducerMessage.partitionByKey(event.subject)))
        .via(Producer.flowWithContext(producerSettings, producerClient));
Note that it is possible to carry a per-event pass-through in addition to propagating context.
Consuming from Event Hubs
The consumer sources subscribe to Event Hubs topics and feed events to an Akka Stream.
Each of the sources, when materialized, uses the provided EventProcessorClientBuilder (see above) to build a client to consume from a hub. The underlying client responds to partition rebalances and manages checkpointing, according to the provided CheckpointSettings, via the provided CheckpointStore.
Choosing a source
These factory methods are part of the Consumer API.
Factory method | Element type | Notes |
---|---|---|
sourceWithCheckpointableContext | EventData | Checkpointable is passed as context |
source | user-specified | Uses a provided function to wrap EventData and Checkpointable into an application-specific type |
pairSource | (EventData, Checkpointable) (Scala), Pair<EventData, Checkpointable> (Java) | |
The sources materialize to a Consumer.Control instance. The pattern for a “controlled shutdown” of a consumer is:
- Call stop() on the control instance. This will cause the source to stop emitting events downstream and the source will complete. The consumption infrastructure will remain available in order to handle future checkpoint attempts.
- Wait for the entire stream to complete processing, to ensure that a checkpoint attempt is made for all processed events.
- Call shutdown() on the control instance. This signals the consumption infrastructure that no more checkpoint attempts will be made. The infrastructure will shut down when those attempts have completed.
Typically, the stream completion will be signaled via a Future[Done] (Scala) or CompletionStage<Done> (Java): the checkpointing sinks and Sink.ignore are examples of stages which materialize as such a future. If stream completion is signaled by such a future, the drainAndShutdown method on the control encapsulates this pattern.
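The three-step pattern above can be modelled with plain CompletionStage composition. The sketch below is a simplified, JDK-only illustration and not the connector's implementation: the Control interface is a hypothetical stand-in whose stop() and shutdown() are assumed to return a CompletionStage, which may not match the signatures of the real Consumer.Control.

```java
import java.util.concurrent.CompletionStage;

// Hypothetical model of "stop, wait for the stream, then shut down".
final class DrainAndShutdownSketch {

    // Simplified stand-in for a consumer control (assumed signatures).
    interface Control {
        CompletionStage<Void> stop();
        CompletionStage<Void> shutdown();
    }

    static <T> CompletionStage<T> drainAndShutdown(
            Control control, CompletionStage<T> streamDone) {
        // Steps 1 and 2: stop emitting, then wait for the rest of the stream to finish.
        CompletionStage<T> drained = control.stop().thenCompose(stopped -> streamDone);

        // Step 3: shut down whether draining succeeded or failed, then surface
        // the outcome of the drained stream.
        CompletionStage<CompletionStage<T>> afterShutdown =
            drained.handle((result, failure) ->
                control.shutdown().thenCompose(done -> drained));

        return afterShutdown.thenCompose(stage -> stage);
    }
}
```

The point of the ordering is that shutdown() is only invoked after the stream has finished, so checkpoint attempts for already-processed events still have infrastructure to run against.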
Consumer settings
The behavior of the consumer sources may be customized via ConsumerSettings, which may be obtained from config or programmatically built.
The defaults used when constructing from config are:
- reference.conf (HOCON)
alpakka.azure.eventhubs.consumer {
  # The consumer group to use. Note that Event Hubs requires consumer groups to
  # be pre-allocated on the hub. This must be set.
  consumer-group = ""

  # When consuming events, the client will try to accumulate a batch of at most this
  # size. Batching is per-partition.
  batch-size = 1000

  # A batch from a partition will become available after this time period, whether
  # or not it has 'batch-size' events. If a batch becomes available before this interval
  # has passed (and when not backpressured), a new interval will start: accordingly,
  # if partitions have a sufficient backlog of events, consumption rates per-partition may
  # substantially exceed 'batch-size' events per 'batch-interval'
  batch-interval = 1s

  # Advisory capacity for buffers in the stage. When there are more than this many
  # events in the main buffer, all partitions consumed from will backpressure. Must be
  # at least 'batch-size'. The less this is relative to 'batch-size' times the number of
  # partitions assigned at a given time to a consumer, the more time partitions will spend
  # backpressured.
  buffer-size = 1000

  # If non-empty, this identifier is used to identify the consumer stage in logs; it
  # is useful if starting multiple consumers in the same process.
  consumer-id = ""

  # If no checkpoint is found and no initial position was programmatically specified,
  # whether to consume from the 'earliest' position, 'latest' position, or the underlying
  # consumer's 'default' (which as of Azure SDK 5.15.6 is latest).
  #
  # Valid values: 'earliest', 'latest', 'default', or empty (equivalent to default)
  fallback-position = default

  # Which strategy to use for load-balancing partitions between consumers of the same
  # Event Hub in the same group using the same checkpoint store.
  #
  # Valid values:
  # * 'balanced': each consumer acquires one partition at a time until all partitions
  #   are acquired and each consumer owns an approximately equal number of partitions.
  #   This reduces rebalances in response to membership changes, but at startup will
  #   delay the point where all partitions are being consumed.
  # * 'greedy': each consumer attempts to acquire any unclaimed partitions. Compared
  #   to the 'balanced' strategy, this will have a faster time to all partitions
  #   being consumed and a faster response to membership changes, but it will result
  #   in more rebalances.
  # * 'default' or empty: Use the underlying consumer's default strategy (which as of
  #   Azure SDK 5.15.6 is the balanced strategy)
  load-balancing-strategy = default

  # How frequently this consumer will check that partition ownership is balanced and apply the
  # load balancing strategy (see 'load-balancing-strategy'). If empty, use the
  # underlying consumer's default
  load-balancing-interval = ""

  # This consumer will consider partition claims which have not been renewed within this interval
  # to have expired and the partition to be claimable. It will take at least this amount of
  # time for any partitions owned by a failed or stopped consumer to be reclaimed. If empty,
  # the underlying client's default applies.
  partition-ownership-interval = ""
}
The sources apply these settings to the EventProcessorClientBuilder. The builder is (as of this writing) mutable and each setting is write-once. Accordingly, the following settings should not be set on the EventProcessorClientBuilder provided to a consumer source:
EventProcessorClientBuilder setting | Alternative to setting on the EventProcessorClientBuilder |
---|---|
consumerGroup | Set via ConsumerSettings |
checkpointStore | Provide separately to source |
initialPartitionEventPosition | Set via ConsumerSettings |
loadBalancingStrategy | Set via ConsumerSettings |
loadBalancingUpdateInterval | Set via ConsumerSettings |
partitionOwnershipExpirationInterval | Set via ConsumerSettings |
processError | Must not be set |
processEvent | Must not be set |
processEventBatch | Must not be set |
processPartitionClose | Must not be set |
processPartitionInitialization | Must not be set |
The EventProcessorClientBuilder should not be reused: each source should be provided a new instance for every run or restart.
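One way to honor the "new instance for every run" rule is to hand the code that starts a run a factory rather than a builder instance. The JDK-only sketch below illustrates the idea with a hypothetical WriteOnceBuilder that rejects a second write; this is a model for illustration only, and the real EventProcessorClientBuilder may react differently when a setting is written twice.

```java
import java.util.function.Supplier;

// Hypothetical illustration of why a write-once builder needs a fresh instance per run.
final class FreshBuilderSketch {

    // Stand-in for a mutable builder whose settings may be written only once.
    static final class WriteOnceBuilder {
        private String consumerGroup;

        WriteOnceBuilder consumerGroup(String group) {
            if (consumerGroup != null) {
                throw new IllegalStateException("consumerGroup was already set");
            }
            consumerGroup = group;
            return this;
        }

        String build() {
            return "client[" + consumerGroup + "]";
        }
    }

    // Each run obtains its own builder from the factory before applying settings.
    static String startRun(Supplier<WriteOnceBuilder> builderFactory, String consumerGroup) {
        return builderFactory.get().consumerGroup(consumerGroup).build();
    }
}
```

Calling startRun(WriteOnceBuilder::new, ...) works any number of times, while a supplier that always returns the same instance fails on the second run in this model. The same shape applies when a source is wrapped in a restarting stage: the builder should be created inside the factory that is invoked on each restart.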
Consumer.sourceWithCheckpointableContext
In this example, we consume from our hub and transform the events into an application representation (following the same convention of using the subject of the event as the partitioning key as in the producer examples above) before handing them to a businessLogicFlow and checkpointing when the business logic is complete.
- Scala
// A minimal class demonstrating an "application level" view of an event
case class Event(subject: String, payload: Array[Byte])

def parseEvent(bytes: Array[Byte]): Event = ???

val checkpointSettings = CheckpointSettings(
  checkpointTimeout = 30.seconds,
  maxBatch = 1000,
  maxInterval = 1.second,
  partitions = 1,
  maxInflight = 1)

// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)

val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)

val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

val source: SourceWithContext[Event, Checkpointable, Consumer.Control] =
  Consumer
    .sourceWithCheckpointableContext(
      consumerSettings,
      eventProcessorBuilder,
      checkpointSettings,
      checkpointStore)
    .map { eventData =>
      Event(eventData.getPartitionKey, eventData.getBody)
    }

val (control, streamDone) =
  source
    .via(businessLogicFlow) // Important: businessLogicFlow has Checkpointable as output context
    .toMat(Checkpointer.sinkWithCheckpointableContext(checkpointSettings))(Keep.both)
    .run()

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
control.drainAndShutdown(streamDone)
- Java
public class Event {
  public final String subject;
  public final byte[] payload;

  public Event(String subject, byte[] payload) {
    this.subject = subject;
    this.payload = payload;
  }
}

private static Event parseEvent(byte[] bytes) {
  throw new scala.NotImplementedError();
}

// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);

EventProcessorClientBuilder eventProcessorBuilder =
    ClientFromConfig.processorClientBuilder(consumerConfig);

ConsumerSettings consumerSettings =
    ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

SourceWithContext<Event, Checkpointable, Consumer.Control> source =
    Consumer.sourceWithCheckpointableContext(
            consumerSettings,
            eventProcessorBuilder,
            checkpointSettings,
            checkpointStore)
        .map(eventData -> new Event(eventData.getPartitionKey(), eventData.getBody()));

Pair<Consumer.Control, CompletionStage<Done>> controlAndStreamCompletion =
    source.via(businessLogicFlow)
        .toMat(Checkpointer.sinkWithCheckpointableContext(checkpointSettings), Keep.both())
        .run(actorSystem);

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
controlAndStreamCompletion.first()
    .drainAndShutdown(
        controlAndStreamCompletion.second(),
        actorSystem.dispatcher());
Consumer.source
This example is essentially the same as above: the difference here is that the application representation includes the Checkpointable rather than relying on the stream to pass it alongside as context. The business logic flow emits the Checkpointable when done processing. sourceWithCheckpointableContext is generally preferable, though this variant is less constraining than working with FlowWithContext.
- Scala
// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)

val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)

val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

val source: Source[DomainType, Consumer.Control] =
  Consumer.source(consumerSettings, eventProcessorBuilder, checkpointSettings, checkpointStore) {
    (eventData, checkpointable) =>
      DomainType.fromEventHubs(eventData, checkpointable)
  }

val (control, streamDone) =
  source
    .via(businessLogicFlow) // Important: businessLogicFlow outputs Checkpointable
    .toMat(Checkpointer.sink(checkpointSettings))(Keep.both)
    .run()

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
control.drainAndShutdown(streamDone)
- Java
static DomainType fromEventHubs(EventData eventData, Checkpointable checkpointable) {
  return new DomainType(eventData.getPartitionKey(), eventData.getBody(), checkpointable);
}

// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);

EventProcessorClientBuilder eventProcessorBuilder =
    ClientFromConfig.processorClientBuilder(consumerConfig);

ConsumerSettings consumerSettings =
    ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

Source<DomainType, Consumer.Control> source =
    Consumer.source(
        consumerSettings,
        eventProcessorBuilder,
        checkpointSettings,
        checkpointStore,
        DomainType::fromEventHubs);

Pair<Consumer.Control, CompletionStage<Done>> controlAndStreamCompletion =
    source.via(businessLogicFlow)
        .toMat(Checkpointer.sink(checkpointSettings), Keep.both())
        .run(actorSystem);

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
controlAndStreamCompletion.first()
    .drainAndShutdown(
        controlAndStreamCompletion.second(),
        actorSystem.dispatcher());
Consumer.pairSource
This example is essentially the same as above: the difference here is that there’s no application representation as we operate on pairs of EventData and Checkpointable.
- Scala
// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)

val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)

val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

val source: Source[(EventData, Checkpointable), Consumer.Control] =
  Consumer.pairSource(consumerSettings, eventProcessorBuilder, checkpointSettings, checkpointStore)

val (control, streamDone) =
  source
    .via(businessLogicFlow) // Important: businessLogicFlow outputs Checkpointable
    .toMat(Checkpointer.sink(checkpointSettings))(Keep.both)
    .run()

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
control.drainAndShutdown(streamDone)
- Java
// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);
EventProcessorClientBuilder eventProcessorBuilder =
    ClientFromConfig.processorClientBuilder(consumerConfig);
ConsumerSettings consumerSettings =
    ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

Source<Pair<EventData, Checkpointable>, Consumer.Control> source =
    Consumer.pairSource(
        consumerSettings,
        eventProcessorBuilder,
        checkpointSettings,
        checkpointStore);

Pair<Consumer.Control, CompletionStage<Done>> controlAndStreamCompletion =
    source.via(businessLogicFlow)
        .toMat(Checkpointer.sink(checkpointSettings), Keep.both())
        .run(actorSystem);

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
controlAndStreamCompletion.first()
    .drainAndShutdown(
        controlAndStreamCompletion.second(),
        actorSystem.dispatcher());
Checkpointing
The checkpointing flows and sinks checkpoint event positions to the CheckpointStore used by the consumer stage which received the event. Each of the stages consumes a Checkpointable describing the event position.
Any implementation of CheckpointStore may be used: this connector does not depend on any specific implementation. Microsoft provides two implementations:
- Azure Blob Storage Checkpoint Store uses Azure Storage Blobs as the backing storage
- Azure Redis Checkpoint Store (currently in preview) uses Redis as the backing storage
Construction and configuration of the CheckpointStore implementation is done programmatically according to the documentation of the particular implementation.
These factory methods are part of the Checkpointer API.
Factory method | Emits | Notes |
---|---|---|
batchFlow | Checkpointable | The emitted Checkpointable contains the event position actually checkpointed. This is suitable for logging or otherwise tracking checkpoint progress. |
flow | Done | |
flowWithCheckpointableContext | Done | Checkpointable is emitted as context. |
sink | Not Applicable | Materialized future completes when stream completes (all pending checkpoints have succeeded or failed). |
sinkWithCheckpointableContext | Not Applicable | Materialized future completes when stream completes (all pending checkpoints have succeeded or failed). |
All the flows and sinks support batching according to the passed settings and will fail if a checkpoint attempt fails.
Checkpointing settings
The behavior of the checkpointing flows and sinks may be customized via CheckpointSettings, which may be obtained from config or programmatically built.
The defaults used when constructing from config are:
- reference.conf (HOCON)
alpakka.azure.eventhubs.checkpoint {
  # If a checkpoint attempt takes longer than this, consider it failed
  timeout = 30s

  # In a stream, accumulate up to this many checkpoints before checkpointing
  # the latest. Setting to 1 will checkpoint every checkpointable in the stream.
  # Increasing will generally improve stream throughput, at the cost of
  # reprocessing more events after partition rebalances or consumer restarts
  max-batch = 100

  # In a stream, if this interval passes since the last checkpoint attempt and there have
  # been received checkpoints, checkpoint even if max-batch checkpoints haven't
  # yet been accumulated
  max-interval = 1s

  # Affects how the batching logic deals with multiple partitions, which may in
  # some cases improve throughput (e.g. with a checkpoint store implementation
  # which has different latencies for different partitions)
  partitions = 1

  # Limit how many checkpoint attempts are in-flight. If multiple checkpoints
  # are in-flight for the same partition in the same consumer group to the same
  # checkpoint store, deduplication will occur, favoring the latest checkpoint
  # in the partition. If `partitions` is greater than one, this is per-partition.
  max-inflight = 1
}
These defaults are compatible with “at-least-once” processing of events: each event will likely only be processed once, but in failure cases, the events may be processed multiple times.
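To make the interplay of max-batch and max-interval concrete, the following is a minimal plain-Java sketch of the batching decision described in the configuration comments. It is a model under stated assumptions, not the connector's implementation: `CheckpointBatchSketch`, `offer`, and `tick` are hypothetical names, event positions are modeled as plain `long` offsets, and time is passed in explicitly.

```java
import java.util.OptionalLong;

/** Hypothetical model of checkpoint batching; not part of the connector API. */
final class CheckpointBatchSketch {
  private final int maxBatch;
  private final long maxIntervalMillis;
  private int pending = 0;          // positions received since the last attempt
  private long latestOffset = -1L;  // most recent position received
  private long lastAttemptMillis;   // time of the last checkpoint attempt

  CheckpointBatchSketch(int maxBatch, long maxIntervalMillis, long nowMillis) {
    this.maxBatch = maxBatch;
    this.maxIntervalMillis = maxIntervalMillis;
    this.lastAttemptMillis = nowMillis;
  }

  /** Records a position; returns the offset to checkpoint once maxBatch have accumulated. */
  OptionalLong offer(long offset, long nowMillis) {
    pending++;
    latestOffset = offset;
    return pending >= maxBatch ? flush(nowMillis) : OptionalLong.empty();
  }

  /** Called periodically; checkpoints a partial batch once maxInterval has elapsed. */
  OptionalLong tick(long nowMillis) {
    boolean due = nowMillis - lastAttemptMillis >= maxIntervalMillis;
    return (pending > 0 && due) ? flush(nowMillis) : OptionalLong.empty();
  }

  private OptionalLong flush(long nowMillis) {
    pending = 0;
    lastAttemptMillis = nowMillis;
    return OptionalLong.of(latestOffset); // only the latest position is checkpointed
  }
}
```

In this model, a larger maxBatch means fewer checkpoint attempts but more events between the last checkpointed position and the point of a restart, which is the throughput versus reprocessing trade-off noted in the configuration comments.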
Error handling and “at-least-once” vs. “at-most-once”
Consumer errors
Errors from the underlying EventProcessorClient will be forwarded to the consumer sources and fail the connected streams.
Producer errors
Attempts to produce are retried by both the underlying client and this connector. Exceeding the provisioned throughput limits for a given hub will not (by itself) result in a failure. If the retries are exhausted, the producer stage will fail the stream.
Restarting the stream with a backoff
Akka Streams provides graph stages which will gracefully restart on failure, with a configurable backoff. These can be used to restart a failing stream and its associated consumer.
- Scala
import akka.stream.RestartSettings
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._ // for 3.seconds and 30.seconds

// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)
val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

// For storing the control from the most recent materialization
val control = new AtomicReference[Option[Consumer.Control]](None)

val restartSettings =
  RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)

val restartingSource =
  RestartSource.onFailuresWithBackoff(restartSettings) { () =>
    Consumer
      .source(
        consumerSettings,
        // Be sure to get a new processor client builder every restart
        ClientFromConfig.processorClientBuilder(consumerConfig),
        checkpointSettings,
        checkpointStore) { (eventData, checkpointable) =>
        DomainType.fromEventHubs(eventData, checkpointable)
      }
      .mapMaterializedValue { c =>
        // Store the control from this start
        control.set(Some(c))
      }
      .via(businessLogicFlow)
  }

val streamCompletion = restartingSource.runWith(Checkpointer.sink(checkpointSettings))

// To drain and shutdown the stream
control.get().foreach(_.drainAndShutdown(streamCompletion))
- Java
// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);
ConsumerSettings consumerSettings =
    ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

// For storing the control from the most recent materialization
AtomicReference<Optional<Consumer.Control>> wrappedControl =
    new AtomicReference<Optional<Consumer.Control>>(Optional.empty());

RestartSettings restartSettings =
    RestartSettings.create(
        Duration.of(3, SECONDS), // minBackoff
        Duration.of(30, SECONDS), // maxBackoff
        0.2); // randomFactor

Source<Checkpointable, NotUsed> restartingSource =
    RestartSource.onFailuresWithBackoff(
        restartSettings,
        () ->
            Consumer.source(
                    consumerSettings,
                    // Be sure to get a new client builder every restart
                    ClientFromConfig.processorClientBuilder(consumerConfig),
                    checkpointSettings,
                    checkpointStore,
                    DomainType::fromEventHubs)
                .mapMaterializedValue(control -> {
                  // Store the control from this start
                  wrappedControl.set(Optional.of(control));
                  return NotUsed.notUsed();
                })
                .via(businessLogicFlow)
    );

CompletionStage<Done> streamCompletion =
    restartingSource.runWith(Checkpointer.sink(checkpointSettings), actorSystem);

// To drain and shutdown the stream
wrappedControl.get().ifPresent(control ->
    control.drainAndShutdown(
        streamCompletion,
        actorSystem.dispatcher()));
If using a RestartSource, note that failures in the sink will not trigger the source’s restart logic. With a Checkpointer sink, this means that failures to checkpoint will not cause stream restarts, whereas Checkpointer flows placed within the restart source will trigger stream restarts on checkpoint failures. On the other hand, if the failure to checkpoint results from the unavailability or failure of the checkpoint store, consumption is less likely to succeed on restart than after, e.g., a failure in the “business logic” of the stream; this may argue for a different approach to checkpoint failures (e.g. a longer backoff, or escalating the failure by terminating the application so that some external coordination starts consuming elsewhere).
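For a sense of how the minBackoff, maxBackoff, and randomFactor values shape restart delays, the sketch below computes an exponential backoff with jitter in plain Java. The formula is an assumption about the general shape of such a backoff (doubling from the minimum, capped at the maximum, then stretched by up to the random factor); it is not copied from Akka, and `BackoffSketch.delayMillis` is a hypothetical helper. The random component is passed in as a parameter so that results are reproducible.

```java
/** Hypothetical helper illustrating exponential backoff with jitter. */
final class BackoffSketch {
  /**
   * @param restartCount number of restarts so far (0 for the first restart)
   * @param jitter a value in [0.0, 1.0), normally drawn from a random source
   */
  static long delayMillis(
      long minBackoffMillis,
      long maxBackoffMillis,
      double randomFactor,
      int restartCount,
      double jitter) {
    // Double the delay on each restart, but never exceed maxBackoff
    double exponential = minBackoffMillis * Math.pow(2.0, restartCount);
    double capped = Math.min((double) maxBackoffMillis, exponential);
    // Stretch by up to randomFactor so that many restarting streams do not synchronize
    return Math.round(capped * (1.0 + jitter * randomFactor));
  }

  public static void main(String[] args) {
    // The settings from the examples above: 3 s minimum, 30 s maximum, factor 0.2
    for (int restart = 0; restart < 6; restart++) {
      System.out.println(restart + ": " + delayMillis(3_000L, 30_000L, 0.2, restart, 0.0) + " ms");
    }
  }
}
```

If checkpoint-store outages are the expected cause of failure, a larger maximum backoff is one way to apply the longer backoff suggested above.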
Processing guarantees
In many applications for which Event Hubs is well suited, it is a requirement that every event in a hub being consumed gets processed at least once. The default settings for consumption and checkpointing work well with this requirement (assuming no external manipulation of the checkpoint store: if that happens, “all bets are off”). However, care is needed to ensure that the stream logic does not inadvertently weaken these semantics: surprising and difficult-to-resolve bugs can ensue from having weaker semantics than expected.
One pattern which often weakens at-least-once semantics is multiple side effects per checkpointable. If multiple side effects are required, it is critical to delay the checkpoint until all side effects have succeeded. For example, if the side effects involve producing multiple events to an Event Hubs hub, using ProducerMessage.multi or ProducerMessage.batch to produce events is called for. If consumed events are batched for more efficient processing, it is imperative that the checkpointable of the last event in the batch be the “surviving” checkpointable and that it not be emitted until every event in the batch is deemed processed. In other situations, it is often reasonable to perform the side effects serially.
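The batching rule can be sketched without any connector types. In this plain-Java model, `BatchCheckpointSketch`, `Envelope`, `SideEffect`, and `processBatch` are hypothetical names, and a `long` position stands in for a Checkpointable. Only the last position of the batch is returned, and only after every side effect has completed; if any side effect throws, no position is emitted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical model: one surviving checkpoint position per processed batch. */
final class BatchCheckpointSketch {
  static final class Envelope {
    final String payload;
    final long position; // stands in for the event's Checkpointable

    Envelope(String payload, long position) {
      this.payload = payload;
      this.position = position;
    }
  }

  /** A side effect that may fail by throwing an unchecked exception. */
  interface SideEffect {
    void apply(String payload);
  }

  /**
   * Applies the side effect to every event in order. The position of the last
   * event is returned only if all side effects completed; an exception from
   * any side effect propagates and no position is emitted.
   */
  static OptionalLong processBatch(List<Envelope> batch, SideEffect sideEffect) {
    if (batch.isEmpty()) {
      return OptionalLong.empty();
    }
    for (Envelope envelope : batch) {
      sideEffect.apply(envelope.payload); // may throw, aborting the batch
    }
    return OptionalLong.of(batch.get(batch.size() - 1).position);
  }

  public static void main(String[] args) {
    List<Envelope> batch =
        Arrays.asList(new Envelope("a", 41L), new Envelope("b", 42L), new Envelope("c", 43L));
    System.out.println(processBatch(batch, payload -> System.out.println("stored " + payload)));
  }
}
```

Because a failed batch emits nothing, a restart resumes from the previously checkpointed position and the whole batch is processed again, which is consistent with at-least-once semantics.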
Processing events out of order often weakens at-least-once semantics, as reordering can result in a later event position being checkpointed before an earlier position (this situation may also weaken at-most-once semantics!). To avoid processing events out of order, use only stages which preserve ordering. Most Akka Streams built-in operators preserve ordering: a notable exception is mapAsyncUnordered, which should never be used if at-least-once processing is desired. Another general exception is the pattern of groupBy followed by mergeSubstreams: use mapAsyncPartitioned (or perhaps techniques outside of streams, such as an ask to an actor) instead.
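A small plain-Java simulation shows why reordering is dangerous. It models a consumer in which each completed event checkpoints its own position, and in which a restart resumes after the last checkpointed position. `ReorderingSketch` and `skippedAfterCrash` are hypothetical names, and offsets are plain `long` values rather than connector types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simulation of checkpointing with out-of-order completion. */
final class ReorderingSketch {
  /**
   * @param completionOrder offsets in the order in which their processing completes
   * @param completedBeforeCrash how many completions happen before the crash
   * @return offsets that are never processed: they had not completed before the
   *     crash, yet lie at or before the checkpoint and so are not redelivered
   */
  static List<Long> skippedAfterCrash(List<Long> completionOrder, int completedBeforeCrash) {
    Set<Long> processed = new HashSet<>();
    long checkpoint = Long.MIN_VALUE;
    for (int i = 0; i < completedBeforeCrash; i++) {
      long offset = completionOrder.get(i);
      processed.add(offset);
      checkpoint = offset; // each completion checkpoints its own position
    }
    List<Long> skipped = new ArrayList<>();
    for (long offset : completionOrder) {
      if (offset <= checkpoint && !processed.contains(offset)) {
        skipped.add(offset);
      }
    }
    Collections.sort(skipped);
    return skipped;
  }

  public static void main(String[] args) {
    // Completions in offset order, crash after two of them
    System.out.println(skippedAfterCrash(Arrays.asList(1L, 2L, 3L), 2));
    // Offset 2 completes and is checkpointed before offset 1 is done, then the crash
    System.out.println(skippedAfterCrash(Arrays.asList(2L, 1L, 3L), 1));
  }
}
```

In the second call, offset 1 is never processed even though the stream was intended to be at-least-once, which is the failure mode that order-preserving stages avoid.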
If at-most-once processing semantics are desired, it is possible to obtain these semantics with this connector. “Hard” at-most-once requires checkpointing before processing each event. This precludes batching of checkpoints: while at-most-once messaging in Akka generally results in greater throughput than at-least-once messaging, the inability to batch checkpoints likely makes at-most-once processing of events from Event Hubs exhibit reduced throughput compared to at-least-once processing. Hard at-most-once should thus only be used in situations where there is a correctness requirement that no event ever be processed multiple times.
- Scala
// bring your own implementation of buildCheckpointStore
val checkpointStore = buildCheckpointStore(checkpointConfig)
val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)
val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

// Batching checkpoints is incompatible with hard at-most-once processing
val checkpointSettings = CheckpointSettings(
  checkpointTimeout = 30.seconds,
  maxBatch = 1,
  maxInterval = 1.second,
  partitions = 1,
  maxInflight = 1)

val (control, streamDone) =
  Consumer
    .pairSource(consumerSettings, eventProcessorBuilder, checkpointSettings, checkpointStore)
    .via(
      // Must checkpoint (with no batching!) before processing
      Flow
        .fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
          import akka.stream.FlowShape
          import GraphDSL.Implicits._

          val breaker = builder.add(Unzip[EventData, Checkpointable]())
          val maker = builder.add(Zip[EventData, Done]())

          breaker.out0 ~> maker.in0
          breaker.out1 ~> Checkpointer.flow(checkpointSettings) ~> maker.in1

          FlowShape(breaker.in, maker.out)
        })
        .map(_._1))
    .via(businessLogicFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

// The stream will consume until there is a failure or the business logic completes the stream.
// To stop it cleanly:
control.drainAndShutdown(streamDone)
- Java
// bring your own implementation of buildCheckpointStore
CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);
EventProcessorClientBuilder eventProcessorBuilder =
    ClientFromConfig.processorClientBuilder(consumerConfig);
ConsumerSettings consumerSettings =
    ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

// Batching checkpoints is incompatible with hard at-most-once processing
CheckpointSettings checkpointSettings =
    CheckpointSettings.create(
        Duration.of(30, SECONDS), // checkpointTimeout
        1, // maxBatch
        Duration.of(1, SECONDS), // maxInterval
        1, // partitions
        1); // maxInflight

Pair<Consumer.Control, CompletionStage<Done>> controlWithCompletion =
    Consumer.pairSource(
            consumerSettings,
            eventProcessorBuilder,
            checkpointSettings,
            checkpointStore)
        .via(
            // Must checkpoint (with no batching!) before processing
            Flow.<Pair<EventData, Checkpointable>, Pair<EventData, Done>, NotUsed>fromGraph(
                    GraphDSL.create(
                        builder -> {
                          FanOutShape2<Pair<EventData, Checkpointable>, EventData, Checkpointable> breaker =
                              builder.add(Unzip.create(EventData.class, Checkpointable.class));
                          FanInShape2<EventData, Done, Pair<EventData, Done>> maker =
                              builder.add(Zip.<EventData, Done>create());

                          builder.from(breaker.out0()).toInlet(maker.in0());
                          builder.from(breaker.out1())
                              .<Done>via(Checkpointer.flow(checkpointSettings).shape())
                              .toInlet(maker.in1());

                          return FlowShape.of(
                              breaker.in(),
                              maker.out());
                        })
                )
                .map(Pair::first)
        )
        .via(businessLogicFlow)
        .toMat(Sink.ignore(), Keep.both())
        .run(actorSystem);

// The stream will consume until there is a failure or the business logic completes the stream
// To stop it cleanly:
controlWithCompletion.first()
    .drainAndShutdown(
        controlWithCompletion.second(),
        actorSystem.dispatcher());
Consuming, producing, and checkpointing in one stream
The general approach for consuming, producing, and checkpointing in one stream is to:
- have a Consumer.sourceWithCheckpointableContext
- transform the EventData into envelopes (using ProducerMessage.empty to filter out events while propagating the Checkpointable)
- use a Producer.flowWithContext to produce events to the target hub
- checkpoint the events with Checkpointer.sinkWithCheckpointableContext
- Scala
val citiesToRegions = Map(
  "Stockholm" -> "Nordic",
  "Lausanne" -> "Central",
  "Nice" -> "Mediterranean",
  "Amsterdam" -> "Northwest",
  "Dublin" -> "Northwest",
  "Helsinki" -> "Nordic",
  "Prague" -> "Central",
  "Barcelona" -> "Mediterranean")

val producerClient = ClientFromConfig.producer(producerConfig)
val producerSettings = ProducerSettings(actorSystem)

val checkpointStore = buildCheckpointStore(checkpointConfig)
val eventProcessorBuilder = ClientFromConfig.processorClientBuilder(consumerConfig)
val consumerSettings = ConsumerSettings(actorSystem).withConsumerGroup("eventhubs-example")

val endToEndStream =
  Consumer
    .sourceWithCheckpointableContext(
      consumerSettings,
      eventProcessorBuilder,
      checkpointSettings,
      checkpointStore)
    .map { eventData =>
      try {
        val count = eventData.getBodyAsString.toInt
        val region = citiesToRegions.getOrElse(eventData.getPartitionKey, "Unknown")

        ProducerMessage.singleWithPartitioning(
          new EventData(count.toString),
          ProducerMessage.partitionByKey(region))
      } catch {
        case _: NumberFormatException => ProducerMessage.empty()
      }
    }
    .via(Producer.flowWithContext(producerSettings, producerClient))
    .via(Checkpointer.flowWithCheckpointableContext(checkpointSettings))
    .toMat(Sink.ignore)(Consumer.DrainingControl.apply)

val control = endToEndStream.run()
- Java
ConcurrentHashMap<String, String> citiesToRegions = new ConcurrentHashMap<String, String>();
citiesToRegions.put("Stockholm", "Nordic");
citiesToRegions.put("Lausanne", "Central");
citiesToRegions.put("Nice", "Mediterranean");
citiesToRegions.put("Amsterdam", "Northwest");
citiesToRegions.put("Dublin", "Northwest");
citiesToRegions.put("Helsinki", "Nordic");
citiesToRegions.put("Prague", "Central");
citiesToRegions.put("Barcelona", "Mediterranean");

EventHubProducerAsyncClient producerClient = ClientFromConfig.producer(producerConfig);
ProducerSettings producerSettings = ProducerSettings.create(actorSystem);

CheckpointStore checkpointStore = buildCheckpointStore(checkpointConfig);
EventProcessorClientBuilder eventProcessorBuilder =
    ClientFromConfig.processorClientBuilder(consumerConfig);
ConsumerSettings consumerSettings =
    ConsumerSettings.create(actorSystem).withConsumerGroup("eventhubs-example");

RunnableGraph<Consumer.DrainingControl<Done>> endToEndStream =
    Consumer.sourceWithCheckpointableContext(
            consumerSettings,
            eventProcessorBuilder,
            checkpointSettings,
            checkpointStore)
        .map(eventData -> {
          try {
            // Parsing to validate
            int count = Integer.parseInt(eventData.getBodyAsString());
            String region = citiesToRegions.get(eventData.getPartitionKey());
            region = (region == null) ? "Unknown" : region;

            return ProducerMessage.singleWithPartitioning(
                new EventData(eventData.getBody()),
                ProducerMessage.partitionByKey(region));
          } catch (NumberFormatException nfe) {
            return ProducerMessage.empty();
          }
        })
        .via(Producer.flowWithContext(producerSettings, producerClient))
        .via(Checkpointer.flowWithCheckpointableContext(checkpointSettings))
        .toMat(Sink.ignore(), Consumer::createDrainingControl);

Consumer.DrainingControl<Done> control = endToEndStream.run(actorSystem);
Serialization
As with Alpakka Kafka, our firm recommendation is that serialization to and deserialization from byte arrays be performed in the stream (e.g. as map stages), as this eases implementation of desired error handling strategies.
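As an illustration, a decoding function used in such a map stage can make the failure case explicit in its return type, leaving the stream free to drop, log, or divert undecodable events. The following is a plain-Java sketch: `CountCodec` and its methods are hypothetical names, not part of the connector, and the wire format (a decimal integer encoded as UTF-8) is an assumption chosen to match the end-to-end example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical codec for events whose body is a decimal integer in UTF-8. */
final class CountCodec {
  static byte[] serialize(int count) {
    return Integer.toString(count).getBytes(StandardCharsets.UTF_8);
  }

  /** Returns empty rather than throwing, so the caller chooses the error strategy. */
  static Optional<Integer> deserialize(byte[] body) {
    try {
      String text = new String(body, StandardCharsets.UTF_8).trim();
      return Optional.of(Integer.parseInt(text));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    System.out.println(deserialize(serialize(42)));
    System.out.println(deserialize("not a number".getBytes(StandardCharsets.UTF_8)));
  }
}
```

In a stream, such a function would typically be applied to the bytes returned by `eventData.getBody()` in a map stage, with a later stage deciding what to do with empty results.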