Use Kafka between two services
This guide describes how to use Kafka for communication between two services. It assumes that you created the project using the Implementing Microservices with Akka tutorial, and it describes the changes required to replace the Projection over gRPC that the tutorial uses for Service-to-service Eventing to make the Shopping Cart events available to the Analytics Service. It focuses on the Kafka producer in the PublishEventsProjection and the Kafka consumer in the ShoppingAnalyticsService.

On this page you will learn how to:

- send messages to a Kafka topic from a Projection
- consume messages from a Kafka topic
Producer side
We will need to change our shopping-cart-service to send the events to a Kafka topic instead of making them available over gRPC. We will use the same public, external representation of the events, specified in shopping_cart_events.proto, and write them to a Kafka topic.
Send to Kafka from a Projection
We will use the Alpakka Kafka connector to send the events to Kafka. The Alpakka project implements stream-aware and reactive integration pipelines for Java and Scala. It is built on top of Akka Streams, and has been designed from the ground up to understand streaming natively and provide a DSL for reactive and stream-oriented programming, with built-in support for backpressure.
- Start by adding the required dependency on Alpakka Kafka to pom.xml:

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-kafka_${scala.binary.version}</artifactId>
</dependency>
- Add a PublishEventsProjectionFlow class that creates a FlowWithContext for processing and converting the events:

src/main/java/shopping/cart/PublishEventsProjectionFlow.java:

package shopping.cart;

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.kafka.ProducerMessage;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.persistence.query.typed.EventEnvelope;
import akka.projection.ProjectionContext;
import akka.stream.javadsl.FlowWithContext;
import com.google.protobuf.Any;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public final class PublishEventsProjectionFlow {

  public static FlowWithContext<
          EventEnvelope<ShoppingCart.Event>, ProjectionContext, Done, ProjectionContext, NotUsed>
      createProducerFlow(String topic, ActorSystem<?> system) {
    var producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new ByteArraySerializer()); // (1)

    return FlowWithContext.<EventEnvelope<ShoppingCart.Event>, ProjectionContext>create()
        .map(
            envelope -> {
              ShoppingCart.Event event = envelope.event();
              // use the cartId as the key; the `DefaultPartitioner` selects the partition based
              // on the key, so that events for the same cart always end up in the same partition
              String key = event.cartId();
              var producerRecord = new ProducerRecord<>(topic, key, serialize(event)); // (2)
              system.log().info("Publishing event [{}] to topic {}", envelope.event(), topic);
              return ProducerMessage.single(producerRecord);
            })
        .via(Producer.flowWithContext(producerSettings))
        .map(__ -> Done.getInstance());
  }

  private static byte[] serialize(ShoppingCart.Event event) {
    var pbEvent =
        switch (event) {
          case ShoppingCart.ItemAdded(String cartId, String itemId, int qtd) ->
              shopping.cart.proto.ItemAdded.newBuilder()
                  .setCartId(cartId)
                  .setItemId(itemId)
                  .setQuantity(qtd)
                  .build();
          case ShoppingCart.CheckedOut(String cartId, var __) ->
              shopping.cart.proto.CheckedOut.newBuilder().setCartId(cartId).build();
          default ->
              throw new IllegalArgumentException("Unknown event type: " + event.getClass());
        };
    // pack in Any, with a type URL prefix, so that type information is included for
    // deserialization on the consumer side
    return Any.pack(pbEvent, "shopping-cart-service").toByteArray(); // (3)
  }
}
1. Using a Producer.flowWithContext from Alpakka Kafka.
2. The events are serialized to Protobuf and sent to the given topic.
3. Wrap in Protobuf Any to include type information.
The serialization converts the ShoppingCart.Event classes to the Protobuf representation. Since several types of messages are sent to the same topic we must include some type information that the consumers of the topic can use when deserializing the messages. Protobuf provides a built-in type called Any for this purpose.
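To make this concrete, here is a minimal sketch of a pack/unpack round trip (illustration only, not part of the service code; it uses the generated ItemAdded class from the tutorial's proto definition and the same type URL prefix as the producer flow above):

static void anyRoundTrip() throws com.google.protobuf.InvalidProtocolBufferException {
  shopping.cart.proto.ItemAdded original =
      shopping.cart.proto.ItemAdded.newBuilder()
          .setCartId("cart1")
          .setItemId("pencil")
          .setQuantity(1)
          .build();

  // Packing with a type URL prefix records which message type is inside.
  com.google.protobuf.Any packed =
      com.google.protobuf.Any.pack(original, "shopping-cart-service");
  // packed.getTypeUrl() is now "shopping-cart-service/shoppingcart.ItemAdded"

  // A consumer can dispatch on the type URL and then unpack the concrete message.
  shopping.cart.proto.ItemAdded unpacked = packed.unpack(shopping.cart.proto.ItemAdded.class);
}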
Initialize the Projection
Now we will use a similar Projection as before, but this time it will send the events to Kafka through the producer flow we just created.
- Place the initialization code of the Projection in a PublishEventsProjection class:

src/main/java/shopping/cart/PublishEventsProjection.java:

package shopping.cart;

import static shopping.cart.PublishEventsProjectionFlow.createProducerFlow;

import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.japi.Pair;
import akka.persistence.query.Offset;
import akka.persistence.query.typed.EventEnvelope;
import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.AtLeastOnceFlowProjection;
import akka.projection.javadsl.SourceProvider;
import akka.projection.r2dbc.R2dbcProjectionSettings;
import akka.projection.r2dbc.javadsl.R2dbcProjection;
import java.util.List;
import java.util.Optional;

public class PublishEventsProjection {

  private PublishEventsProjection() {}

  public static void init(ActorSystem<?> system) {
    String topic = system.settings().config().getString("shopping-cart-service.kafka.topic");

    ShardedDaemonProcess.get(system)
        .initWithContext( // (1)
            ProjectionBehavior.Command.class,
            "PublishEventsProjection",
            4,
            daemonContext -> {
              List<Pair<Integer, Integer>> sliceRanges = // (2)
                  EventSourcedProvider.sliceRanges(
                      system, R2dbcReadJournal.Identifier(), daemonContext.totalProcesses());
              Pair<Integer, Integer> sliceRange = sliceRanges.get(daemonContext.processNumber());
              return ProjectionBehavior.create(createProjection(system, sliceRange, topic));
            },
            ShardedDaemonProcessSettings.create(system),
            Optional.of(ProjectionBehavior.stopMessage()));
  }

  private static AtLeastOnceFlowProjection<Offset, EventEnvelope<ShoppingCart.Event>>
      createProjection(ActorSystem<?> system, Pair<Integer, Integer> sliceRange, String topic) {
    int minSlice = sliceRange.first();
    int maxSlice = sliceRange.second();

    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = // (3)
        EventSourcedProvider.eventsBySlices(
            system,
            R2dbcReadJournal.Identifier(), // (4)
            "ShoppingCart",
            minSlice,
            maxSlice);

    String slice = "carts-" + minSlice + "-" + maxSlice;
    Optional<R2dbcProjectionSettings> settings = Optional.empty();

    return R2dbcProjection.atLeastOnceFlow( // (5)
        ProjectionId.of("PublishEventsProjection", slice),
        settings,
        sourceProvider,
        createProducerFlow(topic, system), // (6)
        system);
  }
}
1. ShardedDaemonProcess manages the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
2. The slice ranges are calculated based on the number of active Projection instances, given by daemonContext.totalProcesses(). The specific slice range for this instance is identified by its processNumber.
3. The source of the Projection is EventSourcedProvider.eventsBySlices with the selected slice range, defined by a minSlice and a maxSlice.
4. Using the R2DBC event journal.
5. Using R2DBC for offset storage of the Projection with an at-least-once strategy.
6. Using the producer flow we created earlier as the handler of the Projection.
The Kafka producer is initialized with some configuration that we need to add. It defines how to connect to the Kafka broker.
- Add the following to a new src/main/resources/kafka.conf file:

shopping-cart-service {
  kafka.topic = "shopping-cart-events"
}

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  # This and other connection settings may have to be changed depending on environment.
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
}
- Include kafka.conf in application.conf.
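The exact contents of application.conf come from the tutorial project, but with the HOCON include syntax the added line would typically look like:

include "kafka.conf"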
- For local development add the following to src/main/resources/local-shared.conf, which is loaded when running locally:

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
}
- Call PublishEventsProjection.init from Main:
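The Main class itself comes from the tutorial and may differ slightly; as a minimal sketch, the call is added alongside the other initialization in its init method:

// In Main.init of the shopping-cart-service (sketch), next to the other projections:
PublishEventsProjection.init(system);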
In this scenario, the Projection over gRPC is not used anymore. You can remove it by deleting PublishEventsGrpc, removing the initialization code from Main, and removing the associated parameter in ShoppingCartServer.
Consume the events
We need to update our shopping-analytics-service to consume the events from the Kafka topic instead. The service will still receive the events in the Protobuf format defined in shopping_cart_events.proto from the shopping-cart-service, so we can keep that definition.

Different services should not share code, but we can copy the Protobuf specification since that is the published interface of the service.
To update the service, follow these steps:
- Open the shopping-analytics-service in IntelliJ, just as you did with the shopping-cart-service.
- Copy shopping_cart_events.proto from the shopping-cart-service to shopping-analytics-service/src/main/protobuf and generate code by compiling the project:

mvn compile
- Modify the ShoppingCartEventConsumer class in shopping-analytics-service. It should run an Akka Stream with a Kafka Consumer.committableSource from Alpakka Kafka:

src/main/java/shopping/analytics/ShoppingCartEventConsumer.java:
package shopping.analytics;

import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.RestartSettings;
import akka.stream.javadsl.RestartSource;
import com.google.protobuf.Any;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.proto.CheckedOut;
import shopping.cart.proto.ItemAdded;
import shopping.cart.proto.ItemQuantityAdjusted;
import shopping.cart.proto.ItemRemoved;

class ShoppingCartEventConsumer {
  private static final Logger log = LoggerFactory.getLogger(ShoppingCartEventConsumer.class);

  static void init(ActorSystem<?> system) {
    String topic =
        system
            .settings()
            .config()
            .getString("shopping-analytics-service.shopping-cart-kafka-topic");
    ConsumerSettings<String, byte[]> consumerSettings =
        ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer())
            .withGroupId("shopping-cart-analytics");
    CommitterSettings committerSettings = CommitterSettings.create(system);

    Duration minBackoff = Duration.ofSeconds(1);
    Duration maxBackoff = Duration.ofSeconds(30);
    double randomFactor = 0.1;

    RestartSource // (1)
        .onFailuresWithBackoff(
            RestartSettings.create(minBackoff, maxBackoff, randomFactor),
            () -> {
              return Consumer.committableSource(
                      consumerSettings, Subscriptions.topics(topic)) // (2)
                  .mapAsync(
                      1,
                      msg -> handleRecord(msg.record()).thenApply(done -> msg.committableOffset()))
                  .via(Committer.flow(committerSettings)); // (3)
            })
        .run(system);
  }

  private static CompletionStage<Done> handleRecord(ConsumerRecord<String, byte[]> record)
      throws InvalidProtocolBufferException {
    byte[] bytes = record.value();
    Any x = Any.parseFrom(bytes); // (4)
    String typeUrl = x.getTypeUrl();
    CodedInputStream inputBytes = x.getValue().newCodedInput();
    try {
      switch (typeUrl) {
        case "shopping-cart-service/shoppingcart.ItemAdded":
          {
            ItemAdded event = ItemAdded.parseFrom(inputBytes);
            log.info(
                "ItemAdded: {} {} to cart {}",
                event.getQuantity(),
                event.getItemId(),
                event.getCartId());
            break;
          }
        case "shopping-cart-service/shoppingcart.CheckedOut":
          {
            CheckedOut event = CheckedOut.parseFrom(inputBytes);
            log.info("CheckedOut: cart {} checked out", event.getCartId());
            break;
          }
        default:
          throw new IllegalArgumentException("unknown record type " + typeUrl);
      }
    } catch (Exception e) {
      log.error("Could not process event of type [{}]", typeUrl, e);
      // continue with next
    }
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
1. RestartSource will restart the stream in case of failures.
2. Kafka Consumer stream.
3. Offsets are committed to Kafka when records have been processed.
4. Protobuf Any is used for type information.
The deserialization uses the type information from the Protobuf Any to decide which type of event to deserialize.
Configuration
We need to add configuration to initialize the Consumer and define how to connect to the Kafka broker.

Add the following to a new src/main/resources/kafka.conf file in shopping-analytics-service:
shopping-analytics-service {
shopping-cart-kafka-topic = "shopping-cart-events"
}
# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
# This and other connection settings may have to be changed depending on environment.
bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
kafka-clients = ${kafka-connection-settings}
kafka-clients {
auto.offset.reset = "earliest"
}
}
Include kafka.conf in application.conf.
For local development add the following to src/main/resources/local-shared.conf, which is loaded when running locally:
# kafka connection settings for local development
kafka-connection-settings {
  bootstrap.servers = "localhost:9092"
}
Main
Edit the Main class that is included from the template project. It should initialize the ActorSystem and the ShoppingCartEventConsumer like this:
package shopping.analytics;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

  private static final Logger logger = LoggerFactory.getLogger(Main.class);

  public static void main(String[] args) {
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingAnalyticsService");
    try {
      init(system);
    } catch (Exception e) {
      logger.error("Terminating due to initialization failure.", e);
      system.terminate();
    }
  }

  public static void init(ActorSystem<Void> system) {
    AkkaManagement.get(system).start();
    ClusterBootstrap.get(system).start();

    ShoppingCartEventConsumer.init(system);
  }
}
Run locally
In addition to PostgreSQL we now also need Kafka. The docker-compose script starts PostgreSQL and Kafka:
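If the docker-compose.yml in your project does not yet define a Kafka service, a minimal single-node definition could look like the following sketch (the image and version are assumptions, not taken from the tutorial):

services:
  # ... existing postgres service from the tutorial ...
  kafka:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"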
- Start PostgreSQL and Kafka, unless they are already running, from the shopping-cart-service directory:

docker compose up -d
- Run the shopping-cart-service with:

# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
- In another terminal, run the new shopping-analytics-service with:

# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
Exercise the service
Use grpcurl to exercise the service:
- Start another terminal, and use grpcurl to add 1 pencil to a cart:

grpcurl -d '{"cartId":"cart4", "itemId":"pencil", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
- Look at the log output in the terminal of the shopping-analytics-service. You should see the logging from the AddItem, showing that the new service consumed the event from Kafka:

ItemAdded: 1 pencil to cart cart4
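To also see a checkout event reach the analytics service, you could check out the same cart (assuming the Checkout operation from the tutorial's ShoppingCartService):

grpcurl -d '{"cartId":"cart4"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout

The shopping-analytics-service log should then contain a line like:

CheckedOut: cart cart4 checked out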