Use Kafka between two services
This guide describes how to use Kafka for communication between two services. It assumes that you created the project using the Implementing Microservices with Akka tutorial, and it describes the changes required to replace the Projection over gRPC used for Service-to-service Eventing in the tutorial, which makes the Shopping Cart events available to the Analytics Service. It focuses on the Kafka producer in the PublishEventsProjection and the Kafka consumer in ShoppingAnalyticsService.
On this page you will learn how to:
- send messages to a Kafka topic from a Projection
- consume messages from a Kafka topic
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
Producer side
We will need to change our shopping-cart-service
to send the events to a Kafka topic instead of making them available over gRPC. We will use the same public external representation for the events specified in shopping_cart_events.proto
and write them to a Kafka topic.
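For reference, the relevant part of shopping_cart_events.proto from the tutorial looks roughly like the following sketch. The field numbers and options shown here are assumptions, and the full file from the tutorial also defines ItemQuantityAdjusted and ItemRemoved:

syntax = "proto3";

package shoppingcart;

option java_multiple_files = true;
option java_package = "shopping.cart.proto";

// Published representation of the shopping cart events,
// consumed by other services such as the analytics service.
message ItemAdded {
  string cart_id = 1;
  string item_id = 2;
  int32 quantity = 3;
}

message CheckedOut {
  string cart_id = 1;
}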
Send to Kafka from a Projection
We will use the Alpakka Kafka connector to send the events to Kafka. The Alpakka project implements stream-aware and reactive integration pipelines for Java and Scala. It is built on top of Akka Streams and has been designed from the ground up to understand streaming natively, providing a DSL for reactive and stream-oriented programming with built-in support for backpressure.
- Start by adding the required dependency on Alpakka Kafka:

pom.xml:

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-kafka_${scala.binary.version}</artifactId>
</dependency>

build.sbt:

val AlpakkaKafkaVersion = "7.0.0"

libraryDependencies ++= Seq(
  // ...
  "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion
)
- Add a PublishEventsProjectionFlow class that creates a FlowWithContext for processing and converting the events:
- Java
src/main/java/shopping/cart/PublishEventsProjectionFlow.java:

package shopping.cart;

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.kafka.ProducerMessage;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.persistence.query.typed.EventEnvelope;
import akka.projection.ProjectionContext;
import akka.stream.javadsl.FlowWithContext;
import com.google.protobuf.Any;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public final class PublishEventsProjectionFlow {

  public static FlowWithContext<
          EventEnvelope<ShoppingCart.Event>, ProjectionContext, Done, ProjectionContext, NotUsed>
      createProducerFlow(String topic, ActorSystem<?> system) {
    var producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new ByteArraySerializer()); // (1)

    return FlowWithContext.<EventEnvelope<ShoppingCart.Event>, ProjectionContext>create()
        .map(
            envelope -> {
              ShoppingCart.Event event = envelope.event();
              // using the cartId as the key and `DefaultPartitioner` will select partition based on
              // the key so that events for same cart always ends up in same partition
              String key = event.cartId();
              var producerRecord = new ProducerRecord<>(topic, key, serialize(event)); // (2)
              system.log().info("Publishing event [{}] to topic {}", envelope.event(), topic);
              return ProducerMessage.single(producerRecord);
            })
        .via(Producer.flowWithContext(producerSettings))
        .map(__ -> Done.getInstance());
  }

  private static byte[] serialize(ShoppingCart.Event event) {
    var pbEvent =
        switch (event) {
          case ShoppingCart.ItemAdded(String cartId, String itemId, int qtd) ->
              shopping.cart.proto.ItemAdded.newBuilder()
                  .setCartId(cartId)
                  .setItemId(itemId)
                  .setQuantity(qtd)
                  .build();
          case ShoppingCart.CheckedOut(String cartId, var __) ->
              shopping.cart.proto.CheckedOut.newBuilder().setCartId(cartId).build();
          case null ->
              throw new IllegalArgumentException("Unknown event type: " + event.getClass());
        };
    // pack in Any so that type information is included for deserialization;
    // the "shopping-cart-service" prefix must match the type URLs the consumer checks for
    return Any.pack(pbEvent, "shopping-cart-service").toByteArray(); // (3)
  }
}
1. Using a Producer.flowWithContext from Alpakka Kafka.
2. The events are serialized to Protobuf and sent to the given topic.
3. Wrap in Protobuf Any to include type information.
- Scala
src/main/scala/shopping/cart/PublishEventsProjectionFlow.scala:

package shopping.cart

import akka.Done
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.persistence.query.typed.EventEnvelope
import akka.projection.ProjectionContext
import akka.stream.scaladsl.FlowWithContext
import com.google.protobuf.any.{ Any => ScalaPBAny }
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.LoggerFactory

object PublishEventsProjectionFlow {
  private val log = LoggerFactory.getLogger(getClass)

  def createProducerFlow(
      topic: String,
      system: ActorSystem[_]): FlowWithContext[
    EventEnvelope[ShoppingCart.Event],
    ProjectionContext,
    Done,
    ProjectionContext,
    NotUsed] = {
    val producerSettings =
      ProducerSettings(system, new StringSerializer, new ByteArraySerializer) // (1)

    FlowWithContext[EventEnvelope[ShoppingCart.Event], ProjectionContext]
      .map(envelope => {
        // using the cartId as the key and `DefaultPartitioner` will select partition based on the key
        // so that events for same cart always ends up in same partition
        val key = envelope.event.cartId
        val record = new ProducerRecord(topic, key, serialize(envelope.event)) // (2)
        log.info("Publishing event [{}] to topic {}", envelope.event, topic)
        ProducerMessage.single(record)
      })
      .via(Producer.flowWithContext(producerSettings))
      .map(_ => Done)
  }

  private def serialize(event: ShoppingCart.Event): Array[Byte] = {
    val protoMessage = event match {
      case ShoppingCart.ItemAdded(cartId, itemId, quantity) =>
        proto.ItemAdded(cartId, itemId, quantity)
      case ShoppingCart.CheckedOut(cartId, _) =>
        proto.CheckedOut(cartId)
    }
    // pack in Any so that type information is included for deserialization;
    // the "shopping-cart-service" prefix must match the type URLs the consumer checks for
    ScalaPBAny.pack(protoMessage, "shopping-cart-service").toByteArray // (3)
  }
}
1. Using a Producer.flowWithContext from Alpakka Kafka.
2. The events are serialized to Protobuf and sent to the given topic.
3. Wrap in Protobuf Any to include type information.
The serialization converts the ShoppingCart.Event classes to the Protobuf representation. Since several types of messages are sent to the same topic, we must include type information that the consumers of the topic can use when deserializing the messages. Protobuf provides a built-in type called Any for this purpose. That is why the event is wrapped with Any.pack (ScalaPBAny.pack in Scala), using "shopping-cart-service" as the type URL prefix so that the resulting type URLs match what the consumer expects.
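To make the contract with the consumer concrete, here is a small, hypothetical Scala sketch (not part of the guide's sources) showing the type URL produced when packing an event with the "shopping-cart-service" prefix. It assumes the ScalaPB classes generated from shopping_cart_events.proto are on the classpath:

import com.google.protobuf.any.{ Any => ScalaPBAny }
import shopping.cart.proto

object TypeUrlCheck extends App {
  val event = proto.ItemAdded(cartId = "cart4", itemId = "pencil", quantity = 1)
  // Packing with the "shopping-cart-service" prefix yields the type URL that the
  // consumer in shopping-analytics-service matches on.
  val packed = ScalaPBAny.pack(event, "shopping-cart-service")
  println(packed.typeUrl) // prints: shopping-cart-service/shoppingcart.ItemAdded
}

Without the explicit prefix, pack defaults to the type.googleapis.com prefix and the consumer's match on the type URL would fail.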
Initialize the Projection
Now, we will use a similar Projection as before, but this time it will send the events through the Kafka producer flow we just created.
- Place the initialization code of the Projection in a PublishEventsProjection object / class:
- Java
src/main/java/shopping/cart/PublishEventsProjection.java:

package shopping.cart;

import static shopping.cart.PublishEventsProjectionFlow.createProducerFlow;

import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.japi.Pair;
import akka.persistence.query.Offset;
import akka.persistence.query.typed.EventEnvelope;
import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.AtLeastOnceFlowProjection;
import akka.projection.javadsl.SourceProvider;
import akka.projection.r2dbc.R2dbcProjectionSettings;
import akka.projection.r2dbc.javadsl.R2dbcProjection;
import java.util.List;
import java.util.Optional;

public class PublishEventsProjection {

  private PublishEventsProjection() {}

  public static void init(ActorSystem<?> system) {
    String topic = system.settings().config().getString("shopping-cart-service.kafka.topic");

    ShardedDaemonProcess.get(system)
        .initWithContext( // (1)
            ProjectionBehavior.Command.class,
            "PublishEventsProjection",
            4,
            daemonContext -> {
              List<Pair<Integer, Integer>> sliceRanges = // (2)
                  EventSourcedProvider.sliceRanges(
                      system, R2dbcReadJournal.Identifier(), daemonContext.totalProcesses());
              Pair<Integer, Integer> sliceRange = sliceRanges.get(daemonContext.processNumber());
              return ProjectionBehavior.create(createProjection(system, sliceRange, topic));
            },
            ShardedDaemonProcessSettings.create(system),
            Optional.of(ProjectionBehavior.stopMessage()));
  }

  private static AtLeastOnceFlowProjection<Offset, EventEnvelope<ShoppingCart.Event>>
      createProjection(ActorSystem<?> system, Pair<Integer, Integer> sliceRange, String topic) {
    int minSlice = sliceRange.first();
    int maxSlice = sliceRange.second();

    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = // (3)
        EventSourcedProvider.eventsBySlices(
            system,
            R2dbcReadJournal.Identifier(), // (4)
            "ShoppingCart",
            minSlice,
            maxSlice);

    String slice = "carts-" + minSlice + "-" + maxSlice;
    Optional<R2dbcProjectionSettings> settings = Optional.empty();

    return R2dbcProjection.atLeastOnceFlow( // (5)
        ProjectionId.of("PublishEventsProjection", slice),
        settings,
        sourceProvider,
        createProducerFlow(topic, system), // (6)
        system);
  }
}
1. ShardedDaemonProcess manages the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
2. The slice ranges are calculated based on the number of active Projection instances, given by daemonContext.totalProcesses(). Then, the specific slice range for this instance is identified by its processNumber.
3. The source of the Projection is EventSourcedProvider.eventsBySlices with the selected slice range, defined by a minSlice and a maxSlice.
4. Using the R2DBC event journal.
5. Using R2DBC for offset storage of the Projection with an at-least-once strategy. The offset is stored after the events have been published to Kafka.
6. The producer flow we created earlier in PublishEventsProjectionFlow is used as the handler of the Projection.
- Scala
src/main/scala/shopping/cart/PublishEventsProjection.scala:

package shopping.cart

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.scaladsl.SourceProvider
import akka.projection.{ Projection, ProjectionBehavior, ProjectionId }

object PublishEventsProjection {

  def init(system: ActorSystem[_]): Unit = {
    val topic =
      system.settings.config.getString("shopping-cart-service.kafka.topic")

    def sourceProvider(sliceRange: Range)
        : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
      EventSourcedProvider // (1)
        .eventsBySlices[ShoppingCart.Event](
          system,
          readJournalPluginId = R2dbcReadJournal.Identifier, // (2)
          "ShoppingCart",
          sliceRange.min,
          sliceRange.max)

    def projection(
        sliceRange: Range): Projection[EventEnvelope[ShoppingCart.Event]] = {
      val minSlice = sliceRange.min
      val maxSlice = sliceRange.max
      val projectionId =
        ProjectionId("PublishEventsProjection", s"carts-$minSlice-$maxSlice")

      R2dbcProjection.atLeastOnceFlow( // (3)
        projectionId,
        settings = None,
        sourceProvider(sliceRange),
        handler = PublishEventsProjectionFlow.createProducerFlow(topic, system) // (4)
      )(system)
    }

    ShardedDaemonProcess(system).initWithContext( // (5)
      name = "PublishEventsProjection",
      initialNumberOfInstances = 4,
      behaviorFactory = { daemonContext =>
        val sliceRanges = // (6)
          EventSourcedProvider.sliceRanges(
            system,
            R2dbcReadJournal.Identifier,
            daemonContext.totalProcesses)
        val sliceRange = sliceRanges(daemonContext.processNumber)
        ProjectionBehavior(projection(sliceRange))
      },
      ShardedDaemonProcessSettings(system),
      stopMessage = ProjectionBehavior.Stop)
  }
}
1. The source of the Projection is EventSourcedProvider.eventsBySlices with the selected slice range, defined by a minSlice and a maxSlice.
2. Using the R2DBC event journal.
3. Using R2DBC for offset storage of the Projection with an at-least-once strategy. The offset is stored after the events have been published to Kafka.
4. The producer flow we created earlier in PublishEventsProjectionFlow is used as the handler of the Projection.
5. ShardedDaemonProcess manages the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
6. The slice ranges are calculated based on the number of active Projection instances, given by daemonContext.totalProcesses. Then, the specific slice range for this instance is identified by its processNumber.
The Kafka producer settings are initialized from configuration that we need to add. It defines how to connect to the Kafka broker.
- Add the following to a new src/main/resources/kafka.conf file:

shopping-cart-service {
  kafka.topic = "shopping-cart-events"
}

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  # This and other connection settings may have to be changed depending on environment.
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
}
- Include kafka.conf in application.conf.
- For local development add the following to src/main/resources/local-shared.conf, which is loaded when running locally:

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
}
- Call PublishEventsProjection.init from Main:
- Java
PublishEventsProjection.init(system);
- Scala
PublishEventsProjection.init(system)
In this scenario, the Projection over gRPC is not used anymore. You can remove it by deleting PublishEventsGrpc, removing its initialization code from Main, and removing the associated parameter in ShoppingCartServer.
Consume the events
We need to update our shopping-analytics-service
to consume the events from the Kafka topic instead.
The service will still receive the events in the Protobuf format defined in shopping_cart_events.proto from the shopping-cart-service, so we can keep it.
Different services should not share code, but we can copy the Protobuf specification since that is the published interface of the service.
To update the service, follow these steps:
- Open the shopping-analytics-service in IntelliJ just as you did with the shopping-cart-service.
- Copy the shopping_cart_events.proto from the shopping-cart-service to shopping-analytics-service/src/main/protobuf and generate code by compiling the project:

sbt compile

mvn compile
- Modify the ShoppingCartEventConsumer object / class in shopping-analytics-service. It should run an Akka Stream with a Kafka Consumer.committableSource from Alpakka Kafka.
- Java
src/main/java/shopping/analytics/ShoppingCartEventConsumer.java:

package shopping.analytics;

import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.RestartSettings;
import akka.stream.javadsl.RestartSource;
import com.google.protobuf.Any;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.proto.CheckedOut;
import shopping.cart.proto.ItemAdded;
import shopping.cart.proto.ItemQuantityAdjusted;
import shopping.cart.proto.ItemRemoved;

class ShoppingCartEventConsumer {
  private static final Logger log = LoggerFactory.getLogger(ShoppingCartEventConsumer.class);

  static void init(ActorSystem<?> system) {
    String topic =
        system
            .settings()
            .config()
            .getString("shopping-analytics-service.shopping-cart-kafka-topic");
    ConsumerSettings<String, byte[]> consumerSettings =
        ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer())
            .withGroupId("shopping-cart-analytics");
    CommitterSettings committerSettings = CommitterSettings.create(system);

    Duration minBackoff = Duration.ofSeconds(1);
    Duration maxBackoff = Duration.ofSeconds(30);
    double randomFactor = 0.1;

    RestartSource // (1)
        .onFailuresWithBackoff(
            RestartSettings.create(minBackoff, maxBackoff, randomFactor),
            () -> {
              return Consumer.committableSource(
                      consumerSettings, Subscriptions.topics(topic)) // (2)
                  .mapAsync(
                      1,
                      msg -> handleRecord(msg.record()).thenApply(done -> msg.committableOffset()))
                  .via(Committer.flow(committerSettings)); // (3)
            })
        .run(system);
  }

  private static CompletionStage<Done> handleRecord(ConsumerRecord<String, byte[]> record)
      throws InvalidProtocolBufferException {
    byte[] bytes = record.value();
    Any x = Any.parseFrom(bytes); // (4)
    String typeUrl = x.getTypeUrl();
    CodedInputStream inputBytes = x.getValue().newCodedInput();
    try {
      switch (typeUrl) {
        case "shopping-cart-service/shoppingcart.ItemAdded":
          {
            ItemAdded event = ItemAdded.parseFrom(inputBytes);
            log.info(
                "ItemAdded: {} {} to cart {}",
                event.getQuantity(),
                event.getItemId(),
                event.getCartId());
            break;
          }
        case "shopping-cart-service/shoppingcart.CheckedOut":
          {
            CheckedOut event = CheckedOut.parseFrom(inputBytes);
            log.info("CheckedOut: cart {} checked out", event.getCartId());
            break;
          }
        default:
          throw new IllegalArgumentException("unknown record type " + typeUrl);
      }
    } catch (Exception e) {
      log.error("Could not process event of type [{}]", typeUrl, e);
      // continue with next
    }
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
1. RestartSource will restart the stream in case of failures.
2. Kafka Consumer stream.
3. Offset is committed to Kafka when records have been processed.
4. Protobuf Any for the type information.
- Scala
src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala:

package shopping.analytics

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerSettings
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{ Committer, Consumer }
import akka.stream.RestartSettings
import akka.stream.scaladsl.RestartSource
import com.google.protobuf.any.{ Any => ScalaPBAny }
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import shopping.cart.proto

object ShoppingCartEventConsumer {

  private val log =
    LoggerFactory.getLogger("shopping.analytics.ShoppingCartEventConsumer")

  def init(system: ActorSystem[_]): Unit = {
    implicit val sys: ActorSystem[_] = system
    implicit val ec: ExecutionContext = system.executionContext

    val topic = system.settings.config
      .getString("shopping-analytics-service.shopping-cart-kafka-topic")
    val consumerSettings =
      ConsumerSettings(
        system,
        new StringDeserializer,
        new ByteArrayDeserializer).withGroupId("shopping-cart-analytics")
    val committerSettings = CommitterSettings(system)

    RestartSource // (1)
      .onFailuresWithBackoff(
        RestartSettings(
          minBackoff = 1.second,
          maxBackoff = 30.seconds,
          randomFactor = 0.1)) { () =>
        Consumer
          .committableSource(
            consumerSettings,
            Subscriptions.topics(topic)) // (2)
          .mapAsync(1) { msg =>
            handleRecord(msg.record).map(_ => msg.committableOffset)
          }
          .via(Committer.flow(committerSettings)) // (3)
      }
      .run()
  }

  private def handleRecord(
      record: ConsumerRecord[String, Array[Byte]]): Future[Done] = {
    val bytes = record.value()
    val x = ScalaPBAny.parseFrom(bytes) // (4)
    val typeUrl = x.typeUrl
    try {
      val inputBytes = x.value.newCodedInput()
      val event =
        typeUrl match {
          case "shopping-cart-service/shoppingcart.ItemAdded" =>
            proto.ItemAdded.parseFrom(inputBytes)
          case "shopping-cart-service/shoppingcart.CheckedOut" =>
            proto.CheckedOut.parseFrom(inputBytes)
          case _ =>
            throw new IllegalArgumentException(
              s"unknown record type [$typeUrl]")
        }

      event match {
        case proto.ItemAdded(cartId, itemId, quantity, _) =>
          log.info("ItemAdded: {} {} to cart {}", quantity, itemId, cartId)
        case proto.CheckedOut(cartId, _) =>
          log.info("CheckedOut: cart {} checked out", cartId)
      }

      Future.successful(Done)
    } catch {
      case NonFatal(e) =>
        log.error("Could not process event of type [{}]", typeUrl, e)
        // continue with next
        Future.successful(Done)
    }
  }
}
1. RestartSource will restart the stream in case of failures.
2. Kafka Consumer stream.
3. Offset is committed to Kafka when records have been processed.
4. Protobuf Any for the type information.
The deserialization uses the type information from the Protobuf Any to decide which type of event to deserialize.
Configuration
We need to add configuration to initialize the Consumer
and define how to connect to the Kafka broker.
Add the following to a new src/main/resources/kafka.conf file in shopping-analytics-service:
shopping-analytics-service {
shopping-cart-kafka-topic = "shopping-cart-events"
}
# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
# This and other connection settings may have to be changed depending on environment.
bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
kafka-clients = ${kafka-connection-settings}
kafka-clients {
auto.offset.reset = "earliest"
}
}
Include kafka.conf from application.conf.
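Assuming the standard HOCON include mechanism used by the template projects, the include can look like this (the exact set of existing includes depends on your project):

# src/main/resources/application.conf
include "kafka.conf"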
And for local development add the following to src/main/resources/local-shared.conf, which is loaded when running locally:
shopping-analytics-service.kafka.bootstrap-servers = "localhost:9092"
Main
Edit the Main
class that is included from the template project. It should initialize the ActorSystem
and the ShoppingCartEventConsumer
like this:
- Java
package shopping.analytics;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

  private static final Logger logger = LoggerFactory.getLogger(Main.class);

  public static void main(String[] args) {
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingAnalyticsService");
    try {
      init(system);
    } catch (Exception e) {
      logger.error("Terminating due to initialization failure.", e);
      system.terminate();
    }
  }

  public static void init(ActorSystem<Void> system) {
    AkkaManagement.get(system).start();
    ClusterBootstrap.get(system).start();

    ShoppingCartEventConsumer.init(system);
  }
}
- Scala
package shopping.analytics

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorSystem
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import org.slf4j.LoggerFactory

import scala.util.control.NonFatal

object Main {

  val logger = LoggerFactory.getLogger("shopping.analytics.Main")

  def main(args: Array[String]): Unit = {
    val system =
      ActorSystem[Nothing](Behaviors.empty, "ShoppingAnalyticsService")
    try {
      init(system)
    } catch {
      case NonFatal(e) =>
        logger.error("Terminating due to initialization failure.", e)
        system.terminate()
    }
  }

  def init(system: ActorSystem[_]): Unit = {
    AkkaManagement(system).start()
    ClusterBootstrap(system).start()

    ShoppingCartEventConsumer.init(system)
  }
}
Run locally
In addition to PostgreSQL we now also need Kafka. The docker-compose
script starts PostgreSQL and Kafka:
- Start PostgreSQL and Kafka, unless they are already running, from the shopping-cart-service (a quick way to check that the containers are up is shown after these steps):

docker compose up -d
- Run the shopping-cart-service with:

# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf

sbt -Dconfig.resource=local1.conf run
- In another terminal, run the new shopping-analytics-service with:

# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf

sbt -Dconfig.resource=local1.conf run
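If you want to verify that the broker from step 1 is up before starting the services, you can inspect the containers started by the docker-compose script; the kafka service name below is an assumption and may differ in your docker-compose.yml:

docker compose ps
docker compose logs kafka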
Exercise the service
Use grpcurl
to exercise the service:
- Start another terminal, and use grpcurl to add 1 pencil to a cart:

grpcurl -d '{"cartId":"cart4", "itemId":"pencil", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
- Look at the log output in the terminal of the shopping-analytics-service. You should see the logging from the AddItem, showing that the new service consumed the event from Kafka:

ItemAdded: 1 pencil to cart cart4
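To also see a checkout event flow through Kafka, check out the cart; this assumes the Checkout operation from the tutorial's ShoppingCartService is still in place:

grpcurl -d '{"cartId":"cart4"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout

The shopping-analytics-service log should then contain a line like: CheckedOut: cart cart4 checked out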