object ShardingProducerController
Reliable delivery between a producer actor sending messages and sharded consumer actors receiving them.
The ShardingProducerController should be used together with ShardingConsumerController.
A producer can send messages via a ShardingProducerController to any ShardingConsumerController
identified by an entityId. A single ShardingProducerController per ActorSystem (node) can be
shared for sending to all entities of a certain entity type. No explicit registration is needed
between the ShardingConsumerController and ShardingProducerController.
The producer actor will start the flow by sending a ShardingProducerController.Start
message to the ShardingProducerController. The ActorRef in the Start message is
typically constructed as a message adapter to map the ShardingProducerController.RequestNext
to the protocol of the producer actor.
The ShardingProducerController sends RequestNext to the producer, which is then allowed
to send one message to the ShardingProducerController via the sendNextTo in the RequestNext.
Thereafter the producer will receive a new RequestNext when it's allowed to send one more message.
In the RequestNext message there is information about which entities have demand. It is allowed
to send to a new entityId that is not included in the RequestNext.entitiesWithDemand. If sending to
an entity that doesn't have demand the message will be buffered. This support for buffering means that
it is even allowed to send several messages in response to one RequestNext, but it's recommended to
only send one message and wait for the next RequestNext before sending more messages.
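The Start and RequestNext handshake described above can be sketched as follows. This is a minimal illustration, not the library's own sample: `GreetingProducer`, `Greeting`, `Greet` and `WrappedRequestNext` are hypothetical names, and the sketch simply drops a `Greet` that arrives before demand is signalled, where a real producer might stash or reject it.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.delivery.ShardingProducerController

object GreetingProducer {
  // Hypothetical payload delivered to the sharded consumers.
  final case class Greeting(text: String)

  sealed trait Command
  // Hypothetical application command: send a greeting to one entity.
  final case class Greet(entityId: String, text: String) extends Command
  // Wraps RequestNext so that it fits the producer's own protocol.
  final case class WrappedRequestNext(
      next: ShardingProducerController.RequestNext[Greeting]) extends Command

  def apply(
      producerController: ActorRef[ShardingProducerController.Command[Greeting]])
      : Behavior[Command] =
    Behaviors.setup { context =>
      // Message adapter that maps RequestNext into the producer's protocol.
      val adapter =
        context.messageAdapter[ShardingProducerController.RequestNext[Greeting]](
          WrappedRequestNext(_))
      // Start the flow.
      producerController ! ShardingProducerController.Start(adapter)
      waitForNext()
    }

  // No outstanding RequestNext: the producer is not allowed to send yet.
  private def waitForNext(): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case WrappedRequestNext(next) => active(next)
      case Greet(_, _)              => Behaviors.same // dropped in this sketch
    }

  // A RequestNext is available: send exactly one message, then wait again.
  private def active(
      next: ShardingProducerController.RequestNext[Greeting]): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Greet(entityId, text) =>
        next.sendNextTo ! ShardingEnvelope(entityId, Greeting(text))
        waitForNext()
      case WrappedRequestNext(newer) => active(newer)
    }
}
```

Sending one message per RequestNext, as here, follows the recommendation above and keeps buffering inside the ShardingProducerController to a minimum.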
The producer and ShardingProducerController actors are supposed to be local so that these messages are
fast and not lost. This is enforced by a runtime check.
There will be one ShardingConsumerController for each entity. Many unconfirmed messages can be in
flight between the ShardingProducerController and each ShardingConsumerController. The flow control
is driven by the consumer side, which means that the ShardingProducerController will not send faster
than the demand requested by the consumers.
Lost messages are detected, resent and deduplicated if needed. This is also driven by the consumer side,
which means that the ShardingProducerController will not push resends unless requested by the
ShardingConsumerController.
Until sent messages have been confirmed the ShardingProducerController keeps them in memory to be able to
resend them. If the JVM of the ShardingProducerController crashes those unconfirmed messages are lost.
To make sure the messages can also be delivered in that scenario the ShardingProducerController can be
used with a DurableProducerQueue. Then the unconfirmed messages are stored in a durable way so
that they can be redelivered when the producer is started again. An implementation of the
DurableProducerQueue is provided by EventSourcedProducerQueue in akka-persistence-typed.
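A minimal sketch of wiring in a durable queue, assuming akka-persistence-typed is on the classpath and a journal plugin is configured. `DurableProducerSetup`, `Job` and `controllerBehavior` are hypothetical names, and reusing the `producerId` as the persistence id is an illustrative choice, not a requirement.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.delivery.ConsumerController
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.delivery.ShardingProducerController
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.delivery.EventSourcedProducerQueue

object DurableProducerSetup {
  // Hypothetical payload type.
  final case class Job(payload: String)

  // Builds the controller behavior with an event-sourced durable queue.
  // `region` is the shard region ActorRef returned by ClusterSharding.init.
  def controllerBehavior(
      producerId: String,
      region: ActorRef[ShardingEnvelope[ConsumerController.SequencedMessage[Job]]])
      : Behavior[ShardingProducerController.Command[Job]] = {
    // Unconfirmed messages are persisted under this persistence id
    // (assumption: one id per producer is sufficient here).
    val durableQueue =
      EventSourcedProducerQueue[Job](PersistenceId.ofUniqueId(producerId))
    ShardingProducerController(producerId, region, Some(durableQueue))
  }
}
```

The returned behavior would typically be spawned once per node, for example with `context.spawn(DurableProducerSetup.controllerBehavior(id, region), "producerController")`.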
Instead of using tell with the sendNextTo in the RequestNext the producer can use context.ask
with the askNextTo in the RequestNext. The difference is that a reply is sent back when the
message has been handled. If a DurableProducerQueue is used then the reply is sent when the message
has been stored successfully, but it might not have been processed by the consumer yet. Otherwise the
reply is sent after the consumer has processed and confirmed the message.
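The ask variant might look like the sketch below. `ConfirmingProducer`, `Greeting`, `Greet`, `Confirmed`, `TimedOut` and `sendWithConfirmation` are hypothetical names, and the 5 second timeout is an arbitrary value chosen for illustration.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.Done
import akka.actor.typed.scaladsl.ActorContext
import akka.cluster.sharding.typed.delivery.ShardingProducerController
import akka.util.Timeout

object ConfirmingProducer {
  // Hypothetical payload delivered to the sharded consumers.
  final case class Greeting(text: String)

  sealed trait Command
  final case class Greet(entityId: String, text: String) extends Command
  final case class WrappedRequestNext(
      next: ShardingProducerController.RequestNext[Greeting]) extends Command
  // Outcomes of the ask, mapped back into the producer's protocol.
  final case class Confirmed(entityId: String) extends Command
  final case class TimedOut(entityId: String) extends Command

  // Arbitrary timeout for this sketch.
  implicit val askTimeout: Timeout = Timeout(5.seconds)

  // Sends one message through askNextTo. The producer later receives
  // Confirmed or TimedOut as an ordinary message.
  def sendWithConfirmation(
      context: ActorContext[Command],
      next: ShardingProducerController.RequestNext[Greeting],
      greet: Greet): Unit =
    context.ask[ShardingProducerController.MessageWithConfirmation[Greeting], Done](
      next.askNextTo,
      replyTo =>
        ShardingProducerController.MessageWithConfirmation(
          greet.entityId, Greeting(greet.text), replyTo)) {
      case Success(_) => Confirmed(greet.entityId)
      case Failure(_) => TimedOut(greet.entityId)
    }
}
```

Whether `Confirmed` means "stored" or "processed by the consumer" depends on whether a DurableProducerQueue is in use, as described above.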
It's also possible to use the ShardingProducerController and ShardingConsumerController without resending
lost messages, but the flow control is still used. This can be more efficient since messages
don't have to be kept in memory in the ProducerController until they have been
confirmed, but the drawback is that lost messages will not be delivered. See configuration
only-flow-control of the ShardingConsumerController.
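One way to switch this on is a configuration override, sketched below with the Typesafe Config API. The path `akka.reliable-delivery.sharding.consumer-controller.only-flow-control` is an assumption about where the sharding consumer settings live and should be checked against the `reference.conf` of the Akka version in use. `OnlyFlowControlConfig` is a hypothetical name.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object OnlyFlowControlConfig {
  // Assumed config path; verify against reference.conf before relying on it.
  val overrides: Config = ConfigFactory.parseString(
    "akka.reliable-delivery.sharding.consumer-controller.only-flow-control = on")

  // Overrides layered on top of the default application configuration,
  // suitable for passing to ActorSystem creation.
  lazy val config: Config = overrides.withFallback(ConfigFactory.load())
}
```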
The producerId is used in logging and included as MDC entry with key "producerId". It's propagated
to the ConsumerController and is useful for correlating log messages. It can be any String but it's
recommended to use a unique identifier representing the producer.
If the DurableProducerQueue is defined it is created as a child actor of the ShardingProducerController actor.
ProducerController actors are created for each destination entity. Those child actors use the same dispatcher
as the parent ShardingProducerController.
- Annotations
- @ApiMayChange()
- Source
- ShardingProducerController.scala
Type Members
- sealed trait Command[A] extends UnsealedInternalCommand
- type EntityId = String
- final case class MessageWithConfirmation[A](entityId: EntityId, message: A, replyTo: ActorRef[Done]) extends UnsealedInternalCommand with Product with Serializable
For sending confirmation message back to the producer when the message has been confirmed. Typically used with context.ask from the producer.
If DurableProducerQueue is used the confirmation reply is sent when the message has been successfully stored, meaning that the actual delivery to the consumer may happen later. If DurableProducerQueue is not used the confirmation reply is sent when the message has been fully delivered, processed, and confirmed by the consumer.
- final case class RequestNext[A](sendNextTo: ActorRef[ShardingEnvelope[A]], askNextTo: ActorRef[MessageWithConfirmation[A]], entitiesWithDemand: Set[EntityId], bufferedForEntitiesWithoutDemand: Map[EntityId, Int]) extends Product with Serializable
The ShardingProducerController sends RequestNext to the producer when it is allowed to send one message via the sendNextTo or askNextTo. It should wait for the next RequestNext before sending one more message.
entitiesWithDemand contains information about which entities have demand. It is allowed to send to a new entityId that is not included in the entitiesWithDemand. If sending to an entity that doesn't have demand the message will be buffered, and that can be seen in the bufferedForEntitiesWithoutDemand.
This support for buffering means that it is even allowed to send several messages in response to one RequestNext, but it's recommended to only send one message and wait for the next RequestNext before sending more messages.
- final class Settings extends AnyRef
- final case class Start[A](producer: ActorRef[RequestNext[A]]) extends Command[A] with Product with Serializable
Initial message from the producer actor. The producer is typically constructed as a message adapter to map the RequestNext to the protocol of the producer actor.
If the producer is restarted it should send a new Start message to the ShardingProducerController.
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def apply[A](producerId: String, region: ActorRef[ShardingEnvelope[SequencedMessage[A]]], durableQueueBehavior: Option[Behavior[actor.typed.delivery.DurableProducerQueue.Command[A]]], settings: Settings)(implicit arg0: ClassTag[A]): Behavior[Command[A]]
- def apply[A](producerId: String, region: ActorRef[ShardingEnvelope[SequencedMessage[A]]], durableQueueBehavior: Option[Behavior[actor.typed.delivery.DurableProducerQueue.Command[A]]])(implicit arg0: ClassTag[A]): Behavior[Command[A]]
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @HotSpotIntrinsicCandidate() @native()
- def create[A](messageClass: Class[A], producerId: String, region: ActorRef[ShardingEnvelope[SequencedMessage[A]]], durableQueueBehavior: Optional[Behavior[actor.typed.delivery.DurableProducerQueue.Command[A]]], settings: Settings): Behavior[Command[A]]
Java API
- def create[A](messageClass: Class[A], producerId: String, region: ActorRef[ShardingEnvelope[SequencedMessage[A]]], durableQueueBehavior: Optional[Behavior[actor.typed.delivery.DurableProducerQueue.Command[A]]]): Behavior[Command[A]]
Java API
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- def requestNextClass[A](): Class[RequestNext[A]]
Java API: The generic Class type for ShardingProducerController.RequestNext that can be used when creating a messageAdapter for Class<RequestNext<MessageType>>.
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- object Settings