Class ShardingProducerController
- java.lang.Object
  - akka.cluster.sharding.typed.delivery.ShardingProducerController
public class ShardingProducerController extends java.lang.Object
Reliable delivery between a producer actor sending messages to sharded consumer actors receiving the messages.

The ShardingProducerController should be used together with ShardingConsumerController.

A producer can send messages via a ShardingProducerController to any ShardingConsumerController identified by an entityId. A single ShardingProducerController per ActorSystem (node) can be shared for sending to all entities of a certain entity type. No explicit registration is needed between the ShardingConsumerController and ShardingProducerController.

The producer actor will start the flow by sending a ShardingProducerController.Start message to the ShardingProducerController. The ActorRef in the Start message is typically constructed as a message adapter to map the ShardingProducerController.RequestNext to the protocol of the producer actor.

The ShardingProducerController sends RequestNext to the producer, which is then allowed to send one message to the ShardingProducerController via the sendNextTo in the RequestNext. Thereafter the producer will receive a new RequestNext when it's allowed to send one more message.

In the RequestNext message there is information about which entities have demand. It is allowed to send to a new entityId that is not included in the RequestNext.entitiesWithDemand. If sending to an entity that doesn't have demand the message will be buffered. This support for buffering means that it is even allowed to send several messages in response to one RequestNext, but it's recommended to only send one message and wait for the next RequestNext before sending more messages.

The producer and ShardingProducerController actors are supposed to be local so that these messages are fast and not lost. This is enforced by a runtime check.

There will be one ShardingConsumerController for each entity. Many unconfirmed messages can be in flight between the ShardingProducerController and each ShardingConsumerController. The flow control is driven by the consumer side, which means that the ShardingProducerController will not send faster than the demand requested by the consumers.

Lost messages are detected, resent and deduplicated if needed. This is also driven by the consumer side, which means that the ShardingProducerController will not push resends unless requested by the ShardingConsumerController.

Until sent messages have been confirmed the ShardingProducerController keeps them in memory to be able to resend them. If the JVM of the ShardingProducerController crashes those unconfirmed messages are lost. To make sure the messages can be delivered also in that scenario the ShardingProducerController can be used with a DurableProducerQueue. Then the unconfirmed messages are stored in a durable way so that they can be redelivered when the producer is started again. An implementation of the DurableProducerQueue is provided by EventSourcedProducerQueue in akka-persistence-typed.

Instead of using tell with the sendNextTo in the RequestNext the producer can use context.ask with the askNextTo in the RequestNext. The difference is that a reply is sent back when the message has been handled. If a DurableProducerQueue is used then the reply is sent when the message has been stored successfully, but it might not have been processed by the consumer yet. Otherwise the reply is sent after the consumer has processed and confirmed the message.

It's also possible to use the ShardingProducerController and ShardingConsumerController without resending lost messages, but the flow control is still used. This can be more efficient since messages don't have to be kept in memory in the ProducerController until they have been confirmed, but the drawback is that lost messages will not be delivered. See configuration only-flow-control of the ShardingConsumerController.

The producerId is used in logging and included as MDC entry with key "producerId". It's propagated to the ConsumerController and is useful for correlating log messages. It can be any String but it's recommended to use a unique identifier representing the producer.

If the DurableProducerQueue is defined it is created as a child actor of the ShardingProducerController actor. ProducerController actors are created for each destination entity. Those child actors use the same dispatcher as the parent ShardingProducerController.
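The demand and buffering rules described above can be illustrated with a toy model in plain Java. This is only a sketch under simplifying assumptions: it is not Akka code and does not show how ShardingProducerController is implemented. The class DemandFlowSketch and its methods request, send, entitiesWithDemand and bufferedCount are hypothetical names invented for this illustration, and demand is modelled as a simple per-entity counter.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Toy model (hypothetical, not Akka): consumer-driven demand with per-entity buffering. */
final class DemandFlowSketch {
    // Messages waiting for demand, per entity id.
    private final Map<String, Deque<String>> buffered = new HashMap<>();
    // Number of messages each entity is currently ready to receive.
    private final Map<String, Integer> demand = new HashMap<>();
    // Messages handed to consumers, in delivery order, as "entityId:message".
    final List<String> delivered = new ArrayList<>();

    /** Consumer side: the entity signals that it can accept n more messages. */
    void request(String entityId, int n) {
        demand.merge(entityId, n, Integer::sum);
        drain(entityId);
    }

    /** Producer side: the message is buffered if the entity has no demand. */
    void send(String entityId, String message) {
        buffered.computeIfAbsent(entityId, id -> new ArrayDeque<>()).addLast(message);
        drain(entityId);
    }

    /** Entity ids with outstanding demand, sorted for stable output. */
    List<String> entitiesWithDemand() {
        TreeSet<String> ids = new TreeSet<>();
        for (Map.Entry<String, Integer> e : demand.entrySet()) {
            if (e.getValue() > 0) ids.add(e.getKey());
        }
        return new ArrayList<>(ids);
    }

    /** Number of messages still waiting for demand from the given entity. */
    int bufferedCount(String entityId) {
        Deque<String> queue = buffered.get(entityId);
        return queue == null ? 0 : queue.size();
    }

    // Deliver buffered messages while the entity still has demand.
    private void drain(String entityId) {
        Deque<String> queue = buffered.get(entityId);
        int available = demand.getOrDefault(entityId, 0);
        while (queue != null && !queue.isEmpty() && available > 0) {
            delivered.add(entityId + ":" + queue.pollFirst());
            available--;
        }
        demand.put(entityId, available);
    }

    public static void main(String[] args) {
        DemandFlowSketch flow = new DemandFlowSketch();
        flow.send("entity-1", "a");   // no demand yet, so the message is buffered
        flow.request("entity-1", 2);  // demand arrives and "a" is delivered
        flow.send("entity-1", "b");   // consumes the remaining demand
        System.out.println(flow.delivered);
    }
}
```

In this model a message sent to an entity without demand is never dropped. It waits in the buffer until that entity requests more, which mirrors the rule that sending is never faster than the demand requested by the consumers.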
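The consumer-driven detection of lost messages, resending and deduplication mentioned in the class description can be modelled in a similar way. The following plain Java sketch is an illustration under assumptions, not the Akka implementation: the types ResendSketch.Producer and ResendSketch.Consumer, their methods, and the use of consecutive sequence numbers starting at 1 are all hypothetical choices made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model (hypothetical, not Akka): resend on request and deduplication by sequence number. */
final class ResendSketch {

    /** Producer side: keeps every unconfirmed message in memory, keyed by sequence number. */
    static final class Producer {
        private final TreeMap<Long, String> unconfirmed = new TreeMap<>();
        private long nextSeqNr = 1;

        /** Assigns the next sequence number and remembers the payload until it is confirmed. */
        long send(String payload) {
            long seqNr = nextSeqNr++;
            unconfirmed.put(seqNr, payload);
            return seqNr;
        }

        /** Called only on a consumer request: sequence numbers to resend, starting at fromSeqNr. */
        List<Long> resendFrom(long fromSeqNr) {
            return new ArrayList<>(unconfirmed.tailMap(fromSeqNr, true).keySet());
        }

        /** Payload of an unconfirmed message, or null if it was already confirmed. */
        String payload(long seqNr) {
            return unconfirmed.get(seqNr);
        }

        /** A confirmation allows dropping everything up to and including seqNr. */
        void confirmed(long seqNr) {
            unconfirmed.headMap(seqNr, true).clear();
        }

        int unconfirmedCount() {
            return unconfirmed.size();
        }
    }

    /** Consumer side: processes messages strictly in order, detects gaps, drops duplicates. */
    static final class Consumer {
        enum Outcome { PROCESSED, DUPLICATE, GAP }

        private long expectedSeqNr = 1;
        final List<String> processed = new ArrayList<>();

        Outcome receive(long seqNr, String payload) {
            if (seqNr < expectedSeqNr) return Outcome.DUPLICATE; // already processed
            if (seqNr > expectedSeqNr) return Outcome.GAP;       // an earlier message was lost
            processed.add(payload);
            expectedSeqNr++;
            return Outcome.PROCESSED;
        }

        long expectedSeqNr() {
            return expectedSeqNr;
        }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        long first = producer.send("a");
        producer.send("b");                  // assume this one is lost in transit
        long third = producer.send("c");
        consumer.receive(first, "a");
        if (consumer.receive(third, "c") == Consumer.Outcome.GAP) {
            // The consumer asks for a resend from the sequence number it expects.
            for (long seqNr : producer.resendFrom(consumer.expectedSeqNr())) {
                consumer.receive(seqNr, producer.payload(seqNr));
            }
        }
        System.out.println(consumer.processed);
    }
}
```

The point of the sketch is that the producer never resends on its own initiative. It only answers a resend request, and the consumer's expected sequence number is what filters out duplicates.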
Nested Class Summary

| Modifier and Type | Class | Description |
|---|---|---|
| static interface | ShardingProducerController.Command<A> | |
| static class | ShardingProducerController.MessageWithConfirmation<A> | For sending confirmation message back to the producer when the message has been confirmed. |
| static class | ShardingProducerController.MessageWithConfirmation$ | |
| static class | ShardingProducerController.RequestNext<A> | The ProducerController sends RequestNext to the producer when it is allowed to send one message via the sendNextTo or askNextTo. |
| static class | ShardingProducerController.RequestNext$ | |
| static class | ShardingProducerController.Settings | |
| static class | ShardingProducerController.Settings$ | |
| static class | ShardingProducerController.Start<A> | Initial message from the producer actor. |
| static class | ShardingProducerController.Start$ | |
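The timing of the confirmation reply depends on whether a DurableProducerQueue is used, as stated in the class description. The plain Java sketch below models only that timing difference. It is a hypothetical illustration, not Akka code: the class ConfirmationSketch, its methods ask and consumerConfirmed, the use of CompletableFuture as a stand-in for the reply, and the assumption that durable storage always succeeds immediately are all inventions of this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Toy model (hypothetical, not Akka): when the confirmation reply is sent to the producer. */
final class ConfirmationSketch {
    private final boolean durableQueue;
    // Replies that are still waiting for the consumer to confirm, keyed by sequence number.
    private final Map<Long, CompletableFuture<Long>> awaitingConsumer = new HashMap<>();
    private long nextSeqNr = 1;

    ConfirmationSketch(boolean durableQueue) {
        this.durableQueue = durableQueue;
    }

    /** Ask-style send: the returned future stands in for the reply to the producer. */
    CompletableFuture<Long> ask(String message) {
        long seqNr = nextSeqNr++;
        if (durableQueue) {
            // Assumption: storing the message succeeds at once, so the reply is
            // available before the consumer has processed anything.
            return CompletableFuture.completedFuture(seqNr);
        }
        CompletableFuture<Long> reply = new CompletableFuture<>();
        awaitingConsumer.put(seqNr, reply);
        return reply;
    }

    /** Consumer side: confirms that the message with the given sequence number was processed. */
    void consumerConfirmed(long seqNr) {
        CompletableFuture<Long> reply = awaitingConsumer.remove(seqNr);
        if (reply != null) {
            reply.complete(seqNr);
        }
    }

    public static void main(String[] args) {
        ConfirmationSketch inMemory = new ConfirmationSketch(false);
        CompletableFuture<Long> reply = inMemory.ask("hello");
        System.out.println("replied before consumer confirmation: " + reply.isDone());
        inMemory.consumerConfirmed(1L);
        System.out.println("replied after consumer confirmation: " + reply.isDone());
    }
}
```

With a durable queue a completed reply therefore only tells the producer that the message is safely stored, not that the consumer has handled it.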
Constructor Summary

| Constructor | Description |
|---|---|
| ShardingProducerController() | |
Method Summary

| Modifier and Type | Method | Description |
|---|---|---|
| static <A> Behavior<ShardingProducerController.Command<A>> | apply(java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2) | |
| static <A> Behavior<ShardingProducerController.Command<A>> | apply(java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1) | |
| static <A> Behavior<ShardingProducerController.Command<A>> | create(java.lang.Class<A> messageClass, java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior) | Java API |
| static <A> Behavior<ShardingProducerController.Command<A>> | create(java.lang.Class<A> messageClass, java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings) | Java API |
| static <A> java.lang.Class<ShardingProducerController.RequestNext<A>> | requestNextClass() | Java API: The generic Class type for ShardingProducerController.RequestNext that can be used when creating a messageAdapter for Class<RequestNext<MessageType>>. |
Method Detail

requestNextClass
public static <A> java.lang.Class<ShardingProducerController.RequestNext<A>> requestNextClass()
Java API: The generic Class type for ShardingProducerController.RequestNext that can be used when creating a messageAdapter for Class<RequestNext<MessageType>>.

apply
public static <A> Behavior<ShardingProducerController.Command<A>> apply(java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)

apply
public static <A> Behavior<ShardingProducerController.Command<A>> apply(java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)

create
public static <A> Behavior<ShardingProducerController.Command<A>> create(java.lang.Class<A> messageClass, java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)
Java API

create
public static <A> Behavior<ShardingProducerController.Command<A>> create(java.lang.Class<A> messageClass, java.lang.String producerId, ActorRef<ShardingEnvelope<ConsumerController.SequencedMessage<A>>> region, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ShardingProducerController.Settings settings)
Java API