Transactions
Kafka Transactions provide guarantees that messages processed in a consume-transform-produce workflow (consumed from a source topic, transformed, and produced to a destination topic) are processed exactly once or not at all. This is achieved through coordination between the Kafka consumer group coordinator, transaction coordinator, and the consumer and producer clients used in the user application. The Kafka producer marks messages that are consumed from the source topic as “committed” only once the transformed messages are successfully produced to the sink.
For full details on how transactions are achieved in Kafka you may wish to review the Kafka Improvement Proposal KIP-98: Exactly Once Delivery and Transactional Messaging and its associated design document.
Transactional Source
The Transactional.source emits a ConsumerMessage.TransactionalMessage, which contains the topic, partition, and offset information required by the producer during the commit process. Unlike with ConsumerMessage.CommittableMessage, the user is not responsible for committing transactions; this is handled by a Transactional.flow or Transactional.sink.
This source overrides the Kafka consumer property isolation.level to read_committed, so that only committed messages can be consumed.
A consumer group ID must be provided.
Only use this source if you intend to connect it to a Transactional.flow or Transactional.sink.
Transactional Sink and Flow
The Transactional.sink is similar to the Producer.committableSink in that messages will be automatically committed as part of a transaction. A Transactional.flow or Transactional.sink is required when connecting a consumer to a producer to achieve a transactional workflow.
They override the producer properties enable.idempotence to true and max.in.flight.requests.per.connection to 1, as required by the Kafka producer to enable transactions. The transaction.timeout.ms is set to 10s, as recommended in KIP-447. In addition, you can optionally set akka.kafka.producer.transaction-id-prefix to add a prefix to the generated transaction ID, should your specifications require this level of control.
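For orientation, the overrides described above correspond to the following standard Kafka producer property keys. This is only an illustrative sketch using plain java.util.Properties: the library applies these overrides itself, so you would not normally set them by hand. The class and method names are hypothetical, and the timeout is the 10s mentioned above expressed in milliseconds.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only: the transactional sink and flow
// apply these overrides internally.
final class TransactionalProducerOverrides {

  static Properties overrides() {
    Properties props = new Properties();
    // Idempotent delivery is a prerequisite for transactions.
    props.setProperty("enable.idempotence", "true");
    // A single in-flight request per connection, as described above.
    props.setProperty("max.in.flight.requests.per.connection", "1");
    // 10 seconds, expressed in milliseconds.
    props.setProperty("transaction.timeout.ms", "10000");
    return props;
  }

  public static void main(String[] args) {
    // Print each overridden key with its value.
    overrides().forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```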
Consume-Transform-Produce Workflow
Kafka transactions are handled transparently to the user. The Transactional.source will enforce that a consumer group ID is specified. All other Kafka consumer and producer properties required to enable transactions are overridden.
Transactions are committed on an interval which can be controlled with the producer config akka.kafka.producer.eos-commit-interval, similar to how exactly once works with Kafka Streams. The default value is 100ms. The larger the commit interval, the more records will need to be reprocessed if a failure occurs and the transaction is aborted.
When the stream is materialized and the producer sees the first message, it will initialize a transaction. At every commit interval (eos-commit-interval) we check whether there are any offsets available to commit. If offsets exist, we suspend backpressured demand while we drain all outstanding messages that have not yet been successfully acknowledged (if any), and then commit the transaction. After the commit succeeds, a new transaction is begun and we re-initialize demand for upstream messages.
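The commit cycle described above can be pictured with a small toy model. This is a sketch under simplifying assumptions, not the library's implementation: there is no Kafka client involved, everything runs synchronously, "draining" is modeled by marking all outstanding messages as acknowledged, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the commit cycle; hypothetical names, no Kafka involved.
final class CommitCycleModel {
  private final List<Long> offsetsInTransaction = new ArrayList<>();
  private int unacknowledged = 0;
  private boolean demandOpen = true;

  // Each committed transaction is recorded as the list of offsets it covered.
  final List<List<Long>> commits = new ArrayList<>();

  // A message was produced as part of the current transaction.
  void onMessage(long offset) {
    if (!demandOpen) {
      throw new IllegalStateException("demand is suspended while committing");
    }
    offsetsInTransaction.add(offset);
    unacknowledged++;
  }

  // The broker acknowledged one outstanding message.
  void onAcknowledged() {
    if (unacknowledged > 0) {
      unacknowledged--;
    }
  }

  // Called once per commit interval; returns true if a transaction was committed.
  boolean onCommitTick() {
    if (offsetsInTransaction.isEmpty()) {
      return false; // nothing to commit on this tick
    }
    demandOpen = false; // suspend demand from upstream
    while (unacknowledged > 0) {
      onAcknowledged(); // drain: a real stream would wait for these acknowledgements
    }
    commits.add(new ArrayList<>(offsetsInTransaction)); // commit the transaction
    offsetsInTransaction.clear(); // begin a new, empty transaction
    demandOpen = true; // resume demand
    return true;
  }

  public static void main(String[] args) {
    CommitCycleModel model = new CommitCycleModel();
    model.onMessage(0);
    model.onMessage(1);
    model.onCommitTick();
    System.out.println("committed batches: " + model.commits);
  }
}
```

The point of the model is the ordering: no new messages are accepted between suspending demand and committing, so every offset in a committed batch belongs to a message that was acknowledged first.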
Messages are also drained from the stream when the consumer's partitions are rebalanced. In that case, the consumer will wait in the onPartitionsRevoked callback until all of the messages have been drained from the stream and the transaction is committed before allowing the rebalance to continue. The total amount of time the consumer will wait for draining is controlled by the akka.kafka.consumer.commit-timeout setting, and the interval between checks is controlled by the akka.kafka.consumer.eos-draining-check-interval setting.
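The relationship between the two settings above (a total wait budget and a polling interval) can be sketched as a simple polling loop. This is a hedged illustration only: the class, the method, and the BooleanSupplier standing in for "the stream has drained" are hypothetical, and what the library does when the timeout expires is not modeled here.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of "wait until drained, checking at a fixed interval".
final class DrainingWaitSketch {

  // Returns true if `drained` became true before `timeout` elapsed, false otherwise.
  static boolean awaitDrained(BooleanSupplier drained, Duration timeout, Duration checkInterval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!drained.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // the wait budget (commit-timeout in the text above) is used up
      }
      try {
        // Pause between checks (eos-draining-check-interval in the text above).
        Thread.sleep(checkInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // Pretend the stream finishes draining after roughly 100 ms.
    boolean ok = awaitDrained(
        () -> System.currentTimeMillis() - start > 100,
        Duration.ofSeconds(1),
        Duration.ofMillis(10));
    System.out.println("drained in time: " + ok);
  }
}
```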
To gracefully shut down the stream and commit the current transaction, you must call shutdown() on the Consumer.Control materialized value. This awaits all produced message acknowledgements and commits the final transaction.
Simple Example
- Scala

    val control =
      Transactional
        .source(consumerSettings, Subscriptions.topics(sourceTopic))
        .via(businessFlow)
        .map { msg =>
          ProducerMessage.single(
            new ProducerRecord(sinkTopic, msg.record.key, msg.record.value),
            msg.partitionOffset)
        }
        .toMat(Transactional.sink(producerSettings))(DrainingControl.apply)
        .run()

    // ...

    control.drainAndShutdown()
- Java

    Consumer.DrainingControl<Done> control =
        Transactional.source(consumerSettings, Subscriptions.topics(sourceTopic))
            .via(business())
            .map(
                msg ->
                    ProducerMessage.single(
                        new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()),
                        msg.partitionOffset()))
            .toMat(Transactional.sink(producerSettings), Consumer::createDrainingControl)
            .run(system);

    // ...

    control.drainAndShutdown(ec);
Recovery From Failure
When any stage in the stream fails, the whole stream will be torn down. In the general case it’s desirable to allow transient errors to fail the whole stream because they cannot be recovered from within the application. Transient errors can be caused by network partitions, Kafka broker failures, ProducerFencedExceptions from other application instances, and so on. When the stream encounters a transient error, the current transaction will be aborted before the stream is torn down. Any produced messages that were not committed will not be available to downstream consumers as long as those consumers are configured with isolation.level = read_committed.
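As a minimal sketch of the downstream side, the following builds consumer properties with isolation.level set to read_committed using plain java.util.Properties. The class name, method name, and the example broker address and group ID are hypothetical; the property keys are standard Kafka consumer configuration keys.

```java
import java.util.Properties;

// Hypothetical helper showing the consumer setting a downstream reader needs.
final class ReadCommittedConsumerConfig {

  static Properties consumerProperties(String bootstrapServers, String groupId) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    props.setProperty("group.id", groupId);
    // Only return messages from committed transactions.
    // Kafka's default for this setting is read_uncommitted.
    props.setProperty("isolation.level", "read_committed");
    return props;
  }

  public static void main(String[] args) {
    // Example values only.
    Properties props = consumerProperties("localhost:9092", "downstream-group");
    System.out.println("isolation.level = " + props.getProperty("isolation.level"));
  }
}
```

A consumer left at the default isolation level may also see messages from transactions that are still open or were later aborted.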
For transient errors we can choose to rely on the Kafka producer’s configuration to retry, or we can handle them ourselves at the Akka Streams or application layer. Using the RestartSource we can back off connection attempts so that we don’t hammer the Kafka cluster in a tight loop.
- Scala

    val innerControl = new AtomicReference[Control](Consumer.NoopControl)

    val stream = RestartSource.onFailuresWithBackoff(
      RestartSettings(
        minBackoff = 1.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      )
    ) { () =>
      Transactional
        .source(consumerSettings, Subscriptions.topics(sourceTopic))
        .via(businessFlow)
        .map { msg =>
          ProducerMessage.single(
            new ProducerRecord(sinkTopic, msg.record.key, msg.record.value),
            msg.partitionOffset)
        }
        // side effect out the `Control` materialized value because it can't be
        // propagated through the `RestartSource`
        .mapMaterializedValue(c => innerControl.set(c))
        .via(Transactional.flow(producerSettings))
    }

    stream.runWith(Sink.ignore)

    // Add shutdown hook to respond to SIGTERM and gracefully shut down the stream
    sys.ShutdownHookThread {
      Await.result(innerControl.get.shutdown(), 10.seconds)
    }
- Java

    AtomicReference<Consumer.Control> innerControl =
        new AtomicReference<>(Consumer.createNoopControl());

    Source<ProducerMessage.Results<String, String, ConsumerMessage.PartitionOffset>, NotUsed>
        stream =
            RestartSource.onFailuresWithBackoff(
                RestartSettings.create(
                    java.time.Duration.ofSeconds(3), // min backoff
                    java.time.Duration.ofSeconds(30), // max backoff
                    0.2), // adds 20% "noise" to vary the intervals slightly
                () ->
                    Transactional.source(consumerSettings, Subscriptions.topics(sourceTopic))
                        .via(business())
                        .map(
                            msg ->
                                ProducerMessage.single(
                                    new ProducerRecord<>(
                                        targetTopic, msg.record().key(), msg.record().value()),
                                    msg.partitionOffset()))
                        // side effect out the `Control` materialized value because it can't be
                        // propagated through the `RestartSource`
                        .mapMaterializedValue(
                            control -> {
                              innerControl.set(control);
                              return control;
                            })
                        .via(Transactional.flow(producerSettings)));

    CompletionStage<Done> streamCompletion = stream.runWith(Sink.ignore(), system);

    // Add shutdown hook to respond to SIGTERM and gracefully shut down the stream
    Runtime.getRuntime().addShutdownHook(new Thread(() -> innerControl.get().shutdown()));
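To get a feel for what the minimum backoff, maximum backoff, and random factor mean in practice, here is a small stand-alone sketch of an exponential backoff schedule. It assumes the common scheme in which the delay doubles with each restart, is capped at the maximum backoff, and is then stretched by a random amount of up to the random factor. This is an assumption about the shape of the schedule, written for illustration; it is not the library's code, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.Random;

// Hypothetical sketch of an exponential backoff schedule with jitter.
final class BackoffSketch {

  static Duration delay(
      int restartCount, Duration minBackoff, Duration maxBackoff, double randomFactor, Random rnd) {
    // Jitter multiplier in the range [1.0, 1.0 + randomFactor).
    double jitter = 1.0 + rnd.nextDouble() * randomFactor;
    // Double the minimum backoff for every restart that has already happened.
    double exponential = minBackoff.toMillis() * Math.pow(2, restartCount);
    // Cap at the maximum backoff before applying the jitter.
    double capped = Math.min(maxBackoff.toMillis(), exponential);
    return Duration.ofMillis((long) (capped * jitter));
  }

  public static void main(String[] args) {
    Random rnd = new Random();
    for (int restart = 0; restart < 6; restart++) {
      Duration d = delay(restart, Duration.ofSeconds(3), Duration.ofSeconds(30), 0.2, rnd);
      System.out.println("restart " + restart + ": wait about " + d.toMillis() + " ms");
    }
  }
}
```

Under these assumptions, and ignoring jitter, a 3 second minimum and 30 second maximum give waits of 3, 6, 12, and 24 seconds, after which every further wait is 30 seconds.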
Caveats
There are several scenarios that this library’s implementation of Kafka transactions does not automatically account for.
All of the scenarios covered in the At-Least-Once Delivery documentation (Multiple Effects per Commit, Non-Sequential Processing, and Conditional Message Processing) are applicable to transactional scenarios as well.
Any state in the transformation logic is not part of a transaction. It’s left to the user to rebuild state when applying stateful operations with transactions. It’s possible to encode state into messages produced to topics during a transaction. For example, you could produce messages to a topic that represents an event log as part of a transaction. This event log can be replayed to reconstitute the correct state before the stateful stream resumes consuming at startup.
Any side effects that occur in the transformation logic are not part of a transaction (e.g. writes to a database).
Exactly-once semantics are guaranteed only when your flow consumes from and produces to the same Kafka cluster. Producing to partitions from a third-party source, or consuming partitions from one Kafka cluster and producing to another Kafka cluster, is not supported.
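The event-log idea mentioned in the caveat on stateful transformations can be illustrated with a toy replay. This is a sketch under stated assumptions: the event type, its fields, and the counter-style state are all hypothetical, and reading the events back from a Kafka topic is left out entirely. It only shows that folding over the log in order reconstructs the state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: rebuild in-memory state by replaying an event log in order.
final class EventLogReplay {

  // Hypothetical event: a delta applied to a named counter.
  static final class CounterEvent {
    final String key;
    final long delta;

    CounterEvent(String key, long delta) {
      this.key = key;
      this.delta = delta;
    }
  }

  // Fold the log into a map of counter name to current value.
  static Map<String, Long> replay(List<CounterEvent> log) {
    Map<String, Long> state = new HashMap<>();
    for (CounterEvent event : log) {
      state.merge(event.key, event.delta, Long::sum);
    }
    return state;
  }

  public static void main(String[] args) {
    List<CounterEvent> log =
        List.of(new CounterEvent("a", 1), new CounterEvent("a", 2), new CounterEvent("b", 5));
    System.out.println(replay(log));
  }
}
```

Because the events would be produced within the same transactions as the output messages, the log would only ever contain events from committed transactions, provided it is read with isolation.level = read_committed.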
Further Reading
For more information on exactly once and transactions in Kafka please consult the following resources.
- KIP-98: Exactly Once Delivery and Transactional Messaging (Design Document)
- KIP-129: Streams Exactly-Once Semantics (Design Document)
- KIP-447: EOS Scalability Design (Design Document)
- You Cannot Have Exactly-Once Delivery Redux by Tyler Treat
- Exactly-once Semantics are Possible: Here’s How Kafka Does it