Class ProducerController$
- java.lang.Object
  - akka.actor.typed.delivery.ProducerController$
public class ProducerController$ extends java.lang.Object
Point-to-point reliable delivery between a single producer actor sending messages and a single consumer actor receiving the messages. Used together with ConsumerController.

The producer actor will start the flow by sending a ProducerController.Start message to the ProducerController. The ActorRef in the Start message is typically constructed as a message adapter to map the ProducerController.RequestNext to the protocol of the producer actor.

For the ProducerController to know where to send the messages, it must be connected with the ConsumerController. You do this with ProducerController.RegisterConsumer or ConsumerController.RegisterToProducerController messages.

The ProducerController sends RequestNext to the producer, which is then allowed to send one message to the ProducerController via the sendNextTo in the RequestNext. Thereafter the producer will receive a new RequestNext when it's allowed to send one more message.

The producer and ProducerController actors are supposed to be local so that these messages are fast and not lost. This is enforced by a runtime check.

Many unconfirmed messages can be in flight between the ProducerController and ConsumerController. The flow control is driven by the consumer side, which means that the ProducerController will not send faster than the demand requested by the ConsumerController.

Lost messages are detected, resent and deduplicated if needed. This is also driven by the consumer side, which means that the ProducerController will not push resends unless requested by the ConsumerController.

Until sent messages have been confirmed, the ProducerController keeps them in memory to be able to resend them. If the JVM of the ProducerController crashes, those unconfirmed messages are lost. To make sure the messages can be delivered in that scenario as well, the ProducerController can be used with a DurableProducerQueue. Then the unconfirmed messages are stored in a durable way so that they can be redelivered when the producer is started again. An implementation of the DurableProducerQueue is provided by EventSourcedProducerQueue in akka-persistence-typed.

Instead of using tell with the sendNextTo in the RequestNext, the producer can use context.ask with the askNextTo in the RequestNext. The difference is that a reply is sent back when the message has been handled. If a DurableProducerQueue is used then the reply is sent when the message has been stored successfully, but it might not have been processed by the consumer yet. Otherwise the reply is sent after the consumer has processed and confirmed the message.

If the consumer crashes, a new ConsumerController can be connected to the original ProducerController without restarting it. The ProducerController will then redeliver all unconfirmed messages.

It's also possible to use the ProducerController and ConsumerController without resending lost messages, but the flow control is still used. This can for example be useful when both consumer and producer are known to be located in the same local ActorSystem. This can be more efficient since messages don't have to be kept in memory in the ProducerController until they have been confirmed, but the drawback is that lost messages will not be delivered. See configuration only-flow-control of the ConsumerController.

The producerId is used in logging and included as MDC entry with key "producerId". It's propagated to the ConsumerController and is useful for correlating log messages. It can be any String but it's recommended to use a unique identifier representing the producer.

If the DurableProducerQueue is defined, it is created as a child actor of the ProducerController actor. It will use the same dispatcher as the parent ProducerController.
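The demand-driven flow control and consumer-requested resend described above can be illustrated with a small, library-free model. This is only a sketch under stated assumptions: it is not Akka's implementation, it uses no Akka classes, and the class `ToyProducerSide` and all of its methods are hypothetical names chosen for illustration. It shows three ideas from the description: nothing is sent without demand from the consumer side, unconfirmed messages stay in memory, and resends happen only when requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of consumer-driven flow control with resend. NOT Akka's implementation. */
final class ToyProducerSide {
    // Unconfirmed messages kept in memory, keyed by sequence number.
    private final TreeMap<Long, String> unconfirmed = new TreeMap<>();
    private long nextSeqNr = 1;      // sequence number assigned to the next new message
    private long requestedUpTo = 0;  // highest sequence number the consumer side has asked for

    /** Consumer side signals demand: messages up to this sequence number may be sent. */
    void request(long upToSeqNr) {
        requestedUpTo = Math.max(requestedUpTo, upToSeqNr);
    }

    /** True when one more message may be accepted (the moment a RequestNext would be issued). */
    boolean canSendNext() {
        return nextSeqNr <= requestedUpTo;
    }

    /** Accepts one message if demand allows; returns its sequence number, or -1 without demand. */
    long send(String message) {
        if (!canSendNext()) {
            return -1;
        }
        long seqNr = nextSeqNr++;
        unconfirmed.put(seqNr, message);
        return seqNr;
    }

    /** Consumer side confirms everything up to and including upToSeqNr; those entries are dropped. */
    void confirm(long upToSeqNr) {
        unconfirmed.headMap(upToSeqNr, true).clear();
    }

    /** Consumer side detected a gap and asks for all unconfirmed messages from fromSeqNr onwards. */
    List<String> resendFrom(long fromSeqNr) {
        return new ArrayList<>(unconfirmed.tailMap(fromSeqNr, true).values());
    }

    int unconfirmedCount() {
        return unconfirmed.size();
    }

    public static void main(String[] args) {
        ToyProducerSide producer = new ToyProducerSide();
        producer.request(2);                         // consumer side asks for two messages
        System.out.println(producer.send("a"));      // accepted
        System.out.println(producer.send("b"));      // accepted
        System.out.println(producer.send("c"));      // rejected: demand is exhausted
        producer.confirm(1);                         // "a" is confirmed and dropped
        System.out.println(producer.resendFrom(2));  // "b" is still held for a possible resend
    }
}
```

In this model the in-memory map plays the role that a DurableProducerQueue would take over when unconfirmed messages must survive a JVM crash.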
-
-
Field Summary
Modifier and Type, Field, Description
static ProducerController$
MODULE$
Static reference to the singleton instance of this Scala object.
-
Constructor Summary
Constructor, Description
ProducerController$()
-
Method Summary
Modifier and Type, Method, Description
<A> Behavior<ProducerController.Command<A>>
apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)
<A> Behavior<ProducerController.Command<A>>
apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)
<A> Behavior<ProducerController.Command<A>>
create(java.lang.Class<A> messageClass, java.lang.String producerId, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)
Java API
<A> Behavior<ProducerController.Command<A>>
create(java.lang.Class<A> messageClass, java.lang.String producerId, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings)
Java API
<A> java.lang.Class<ProducerController.RequestNext<A>>
requestNextClass()
Java API: The generic Class type for ProducerController.RequestNext that can be used when creating a messageAdapter for Class<ProducerController.RequestNext<A>>.
-
-
-
Field Detail
-
MODULE$
public static final ProducerController$ MODULE$
Static reference to the singleton instance of this Scala object.
-
-
Method Detail
-
requestNextClass
public <A> java.lang.Class<ProducerController.RequestNext<A>> requestNextClass()
Java API: The generic Class type for ProducerController.RequestNext that can be used when creating a messageAdapter for Class<ProducerController.RequestNext<A>>.
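A helper like this is useful because Java has no class literal for a parameterized type: `RequestNext.class` is only a raw `Class`, so a caller needing a `Class<RequestNext<A>>` would otherwise have to write an unchecked cast. The following library-free sketch shows that general Java technique. `GenericClassToken`, `Envelope`, `envelopeClass` and `describe` are hypothetical names standing in for the real types, and whether `requestNextClass()` is implemented with the same cast is an assumption, not something this page states.

```java
/** Illustrates a typed Class token for a generic type. Uses no Akka classes. */
final class GenericClassToken {

    /** Hypothetical stand-in for a generic message type such as RequestNext<A>. */
    static final class Envelope<A> {
        final A payload;

        Envelope(A payload) {
            this.payload = payload;
        }
    }

    /**
     * Returns Envelope.class typed as Class<Envelope<A>>. The double cast through
     * Class<?> is needed because Envelope.class has the raw type Class<Envelope>.
     * Generics are erased at runtime, so one Class object serves every A.
     */
    @SuppressWarnings("unchecked")
    static <A> Class<Envelope<A>> envelopeClass() {
        return (Class<Envelope<A>>) (Class<?>) Envelope.class;
    }

    /** Hypothetical adapter-style method that wants a precisely typed Class argument. */
    static <M> String describe(Class<M> messageClass) {
        return messageClass.getSimpleName();
    }

    public static void main(String[] args) {
        // The caller gets a fully typed token without writing any cast.
        Class<Envelope<String>> token = GenericClassToken.<String>envelopeClass();
        System.out.println(describe(token));
    }
}
```

Keeping the unchecked cast inside one helper means callers do not need their own `@SuppressWarnings` annotations.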
-
apply
public <A> Behavior<ProducerController.Command<A>> apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)
-
apply
public <A> Behavior<ProducerController.Command<A>> apply(java.lang.String producerId, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)
-
create
public <A> Behavior<ProducerController.Command<A>> create(java.lang.Class<A> messageClass, java.lang.String producerId, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)
Java API
-
create
public <A> Behavior<ProducerController.Command<A>> create(java.lang.Class<A> messageClass, java.lang.String producerId, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, ProducerController.Settings settings)
Java API