Class ProducerControllerImpl$
java.lang.Object
    akka.actor.typed.delivery.internal.ProducerControllerImpl$
public class ProducerControllerImpl$ extends java.lang.Object
INTERNAL API

==== Design notes ====

The producer will start the flow by sending a ProducerController.Start message to the ProducerController with a message adapter reference that converts the ProducerController.RequestNext message. The ProducerController sends RequestNext to the producer, which is then allowed to send one message to the ProducerController.

The producer and ProducerController are supposed to be local so that these messages are fast and not lost.

The ProducerController sends the first message to the ConsumerController without waiting for a Request from the ConsumerController. The main reason for this is that when used with Cluster Sharding the first message will typically create the ConsumerController. It's also a way to connect the ProducerController and ConsumerController in a dynamic way, for example when the ProducerController is replaced.

When the first message is received by the ConsumerController it sends back the initial Request, with demand of how many messages it can accept.

Apart from the first message, the ProducerController will not send more messages than requested by the ConsumerController.

When there is demand from the consumer side, the ProducerController sends RequestNext to the actual producer, which is then allowed to send one more message.

Each message is wrapped by the ProducerController in ConsumerController.SequencedMessage with a monotonically increasing sequence number without gaps, starting at 1.

In other words, the "request" protocol to the application producer and consumer is one-by-one, but between the ProducerController and ConsumerController it is a window of messages in flight.

The Request message also contains a confirmedSeqNr that is the acknowledgement from the consumer that it has received and processed all messages up to that sequence number.

The ConsumerController will send ProducerControllerImpl.Resend if a lost message is detected, and then the ProducerController will resend all messages from that sequence number. The producer keeps unconfirmed messages in a buffer to be able to resend them. The buffer size is limited by the request window size.

The resending is optional, and the ConsumerController can be started with resendLost=false to ignore lost messages, and then the ProducerController will not buffer unconfirmed messages. In that mode it provides only flow control but no reliable delivery.
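The sequence numbering, request window, and resend buffer described in the design notes can be modeled in a few lines of plain Java. The following is only a sketch under stated assumptions, not the Akka implementation: the class ProducerWindowSketch, its nested Sequenced type, and all method and parameter names (including requestUpToSeqNr) are hypothetical, and actors, timers, and the resendLost=false mode are left out.

```java
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical, simplified model of the producer-side bookkeeping. Not Akka code.
final class ProducerWindowSketch<A> {

    /** Simplified stand-in for ConsumerController.SequencedMessage. */
    static final class Sequenced<A> {
        final long seqNr;
        final A message;

        Sequenced(long seqNr, A message) {
            this.seqNr = seqNr;
            this.message = message;
        }
    }

    private final Consumer<Sequenced<A>> send;
    // Unconfirmed messages, kept so that they can be resent.
    private final TreeMap<Long, Sequenced<A>> unconfirmed = new TreeMap<>();
    private long nextSeqNr = 1;      // sequence numbers start at 1, without gaps
    private long requestedSeqNr = 1; // the first message is sent without a Request

    ProducerWindowSketch(Consumer<Sequenced<A>> send) {
        this.send = send;
    }

    /** True when the producer may be asked (RequestNext) for one more message. */
    boolean hasDemand() {
        return nextSeqNr <= requestedSeqNr;
    }

    /** Wraps one producer message with the next sequence number and sends it. */
    void offer(A message) {
        if (!hasDemand()) {
            throw new IllegalStateException("no demand from the consumer side");
        }
        Sequenced<A> wrapped = new Sequenced<>(nextSeqNr++, message);
        unconfirmed.put(wrapped.seqNr, wrapped);
        send.accept(wrapped);
    }

    /** Models a Request: drop confirmed messages and extend the window. */
    void onRequest(long confirmedSeqNr, long requestUpToSeqNr) {
        unconfirmed.headMap(confirmedSeqNr, true).clear();
        requestedSeqNr = Math.max(requestedSeqNr, requestUpToSeqNr);
    }

    /** Models a Resend: send every buffered message from fromSeqNr again. */
    void onResend(long fromSeqNr) {
        unconfirmed.tailMap(fromSeqNr, true).values().forEach(send);
    }

    int bufferedCount() {
        return unconfirmed.size();
    }
}
```

In this model the buffer can never hold more messages than the difference between the highest requested and the highest confirmed sequence number, which mirrors the statement that the buffer size is limited by the request window size.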
-
-
Field Summary
Modifier and Type: static ProducerControllerImpl$
Field: MODULE$
Description: Static reference to the singleton instance of this Scala object.
-
Constructor Summary
Constructor: ProducerControllerImpl$()
-
Method Summary

Modifier and Type: <A> Behavior<ProducerController.Command<A>>
Method: apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.Function1<ConsumerController.SequencedMessage<A>,scala.runtime.BoxedUnit> send, scala.reflect.ClassTag<A> evidence$2)
Description: For custom send function.

Modifier and Type: <A> Behavior<ProducerController.Command<A>>
Method: apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$1)

Modifier and Type: <A> scala.collection.immutable.Seq<akka.actor.typed.delivery.internal.ChunkedMessage>
Method: createChunks(A m, int chunkSize, Serialization serialization)

Modifier and Type: void
Method: enforceLocalProducer(ActorRef<?> ref)
-
-
-
Field Detail
-
MODULE$
public static final ProducerControllerImpl$ MODULE$
Static reference to the singleton instance of this Scala object.
-
-
Method Detail
-
apply
public <A> Behavior<ProducerController.Command<A>> apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$1)
-
apply
public <A> Behavior<ProducerController.Command<A>> apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.Function1<ConsumerController.SequencedMessage<A>,scala.runtime.BoxedUnit> send, scala.reflect.ClassTag<A> evidence$2)
For custom send function. For example, used with Sharding, where the message must be wrapped in ShardingEnvelope(SequencedMessage(msg)).
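The idea of a custom send function that wraps each sequenced message before delivery can be illustrated without Akka on the classpath. This is a rough sketch only: Sequenced and Envelope are hypothetical stand-ins for ConsumerController.SequencedMessage and ShardingEnvelope, wrappingSend and entityId are names invented for the example, and java.util.function.Consumer takes the place of the scala.Function1 parameter in the signature above.

```java
import java.util.function.Consumer;

// Hypothetical stand-in types; none of these are Akka classes.
final class CustomSendSketch {

    /** Stand-in for ConsumerController.SequencedMessage. */
    static final class Sequenced<A> {
        final long seqNr;
        final A message;

        Sequenced(long seqNr, A message) {
            this.seqNr = seqNr;
            this.message = message;
        }
    }

    /** Stand-in for an envelope that routes a message to an entity. */
    static final class Envelope<M> {
        final String entityId;
        final M message;

        Envelope(String entityId, M message) {
            this.entityId = entityId;
            this.message = message;
        }
    }

    /**
     * Builds a send function that wraps every sequenced message in an
     * envelope and hands it to the given region (modeled as a Consumer).
     */
    static <A> Consumer<Sequenced<A>> wrappingSend(
            String entityId, Consumer<Envelope<Sequenced<A>>> region) {
        return msg -> region.accept(new Envelope<>(entityId, msg));
    }

    private CustomSendSketch() {}
}
```

The controller logic stays unaware of the envelope: it only calls the send function with a sequenced message, and the function decides how that message reaches its destination.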
-
enforceLocalProducer
public void enforceLocalProducer(ActorRef<?> ref)
-
createChunks
public <A> scala.collection.immutable.Seq<akka.actor.typed.delivery.internal.ChunkedMessage> createChunks(A m, int chunkSize, Serialization serialization)
-
-