Class WorkPullingProducerController
- java.lang.Object
-
- akka.actor.typed.delivery.WorkPullingProducerController
-
public class WorkPullingProducerController extends java.lang.Object
Work pulling is a pattern where several worker actors pull tasks in their own pace from a shared work manager instead of that the manager pushes work to the workers blindly without knowing their individual capacity and current availability.The
WorkPullingProducerController
can be used together withConsumerController
to implement the work pulling pattern.One important property is that the order of the messages should not matter, because each message is routed randomly to one of the workers with demand. In other words, two subsequent messages may be routed to two different workers and processed independent of each other.
A worker actor (consumer) and its
ConsumerController
is dynamically registered to theWorkPullingProducerController
via aServiceKey
. It will register itself to the *Receptionist
, and theWorkPullingProducerController
subscribes to the same key to find active workers. In this way workers can be dynamically added or removed from any node in the cluster.The work manager (producer) actor will start the flow by sending a
WorkPullingProducerController.Start
message to theWorkPullingProducerController
. TheActorRef
in theStart
message is typically constructed as a message adapter to map theWorkPullingProducerController.RequestNext
to the protocol of the producer actor.The
WorkPullingProducerController
sendsRequestNext
to the producer, which is then allowed to send one message to theWorkPullingProducerController
via thesendNextTo
in theRequestNext
. Thereafter the producer will receive a newRequestNext
when it's allowed to send one more message. It will send a newRequestNext
when there are demand from any worker. It's possible that all workers with demand are deregistered after theRequestNext
is sent and before the actual messages is sent to theWorkPullingProducerController
. In that case the message is buffered and will be delivered when a new worker is registered or when there is new demand.The producer and
WorkPullingProducerController
actors are supposed to be local so that these messages are fast and not lost. This is enforced by a runtime check.Many unconfirmed messages can be in flight between the
WorkPullingProducerController
and eachConsumerController
. The flow control is driven by the consumer side, which means that theWorkPullingProducerController
will not send faster than the demand requested by the workers.Lost messages are detected, resent and deduplicated if needed. This is also driven by the consumer side, which means that the
WorkPullingProducerController
will not push resends unless requested by theConsumerController
.If a worker crashes or is stopped gracefully the unconfirmed messages for that worker will be routed to other workers by the
WorkPullingProducerController
. This may result in that some messages may be processed more than once, by different workers.Until sent messages have been confirmed the
WorkPullingProducerController
keeps them in memory to be able to resend them. If the JVM of theWorkPullingProducerController
crashes those unconfirmed messages are lost. To make sure the messages can be delivered also in that scenario theWorkPullingProducerController
can be used with aDurableProducerQueue
. Then the unconfirmed messages are stored in a durable way so that they can be redelivered when the producer is started again. An implementation of theDurableProducerQueue
is provided byEventSourcedProducerQueue
inakka-persistence-typed
.Instead of using
tell
with thesendNextTo
in theRequestNext
the producer can usecontext.ask
with theaskNextTo
in theRequestNext
. The difference is that a reply is sent back when the message has been handled. If aDurableProducerQueue
is used then the reply is sent when the message has been stored successfully, but it might not have been processed by the consumer yet. Otherwise the reply is sent after the consumer has processed and confirmed the message.It's also possible to use the
WorkPullingProducerController
andConsumerController
without resending lost messages, but the flow control is still used. This can for example be useful when both consumer and producer are know to be located in the same localActorSystem
. This can be more efficient since messages don't have to be kept in memory in theProducerController
until they have been confirmed, but the drawback is that lost messages will not be delivered. See configurationonly-flow-control
of theConsumerController
.The
producerId
is used in logging and included as MDC entry with key"producerId"
. It's propagated to theConsumerController
and is useful for correlating log messages. It can be anyString
but it's recommended to use a unique identifier of representing the producer.If the
DurableProducerQueue
is defined it is created as a child actor of theWorkPullingProducerController
actor.ProducerController
actors are created for each registered worker. Those child actors use the same dispatcher as the parentWorkPullingProducerController
.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static interface
WorkPullingProducerController.Command<A>
static class
WorkPullingProducerController.GetWorkerStats<A>
Retrieve information about registered workers.static class
WorkPullingProducerController.GetWorkerStats$
static class
WorkPullingProducerController.MessageWithConfirmation<A>
For sending confirmation message back to the producer when the message has been fully delivered, processed, and confirmed by the consumer.static class
WorkPullingProducerController.MessageWithConfirmation$
static class
WorkPullingProducerController.RequestNext<A>
TheWorkPullingProducerController
sendsRequestNext
to the producer when it is allowed to send one message via thesendNextTo
oraskNextTo
.static class
WorkPullingProducerController.RequestNext$
static class
WorkPullingProducerController.Settings
static class
WorkPullingProducerController.Settings$
static class
WorkPullingProducerController.Start<A>
Initial message from the producer actor.static class
WorkPullingProducerController.Start$
static class
WorkPullingProducerController.WorkerStats
static class
WorkPullingProducerController.WorkerStats$
-
Constructor Summary
Constructors Constructor Description WorkPullingProducerController()
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static <A> Behavior<WorkPullingProducerController.Command<A>>
apply(java.lang.Class<A> messageClass, java.lang.String producerId, ServiceKey<ConsumerController.Command<A>> workerServiceKey, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, WorkPullingProducerController.Settings settings)
Java APIstatic <A> Behavior<WorkPullingProducerController.Command<A>>
apply(java.lang.String producerId, ServiceKey<ConsumerController.Command<A>> workerServiceKey, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, WorkPullingProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)
static <A> Behavior<WorkPullingProducerController.Command<A>>
apply(java.lang.String producerId, ServiceKey<ConsumerController.Command<A>> workerServiceKey, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)
static <A> Behavior<WorkPullingProducerController.Command<A>>
create(java.lang.Class<A> messageClass, java.lang.String producerId, ServiceKey<ConsumerController.Command<A>> workerServiceKey, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)
Java APIstatic <A> java.lang.Class<WorkPullingProducerController.RequestNext<A>>
requestNextClass()
Java API: The genericClass
type forWorkPullingProducerController.RequestNext
that can be used when creating amessageAdapter
forClass
.>
-
-
-
Method Detail
-
requestNextClass
public static <A> java.lang.Class<WorkPullingProducerController.RequestNext<A>> requestNextClass()
Java API: The genericClass
type forWorkPullingProducerController.RequestNext
that can be used when creating amessageAdapter
forClass
.>
-
apply
public static <A> Behavior<WorkPullingProducerController.Command<A>> apply(java.lang.String producerId, ServiceKey<ConsumerController.Command<A>> workerServiceKey, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, scala.reflect.ClassTag<A> evidence$1)
-
apply
public static <A> Behavior<WorkPullingProducerController.Command<A>> apply(java.lang.String producerId, ServiceKey<ConsumerController.Command<A>> workerServiceKey, scala.Option<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, WorkPullingProducerController.Settings settings, scala.reflect.ClassTag<A> evidence$2)
-
create
public static <A> Behavior<WorkPullingProducerController.Command<A>> create(java.lang.Class<A> messageClass, java.lang.String producerId, ServiceKey<ConsumerController.Command<A>> workerServiceKey, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior)
Java API
-
apply
public static <A> Behavior<WorkPullingProducerController.Command<A>> apply(java.lang.Class<A> messageClass, java.lang.String producerId, ServiceKey<ConsumerController.Command<A>> workerServiceKey, java.util.Optional<Behavior<DurableProducerQueue.Command<A>>> durableQueueBehavior, WorkPullingProducerController.Settings settings)
Java API
-
-