object ProducerControllerImpl
INTERNAL API
Design notes
The producer will start the flow by sending a ProducerController.Start message to the ProducerController
with
message adapter reference to convert ProducerController.RequestNext message.
The ProducerController
sends RequestNext
to the producer, which is then allowed to send one message to
the ProducerController
.
The producer and ProducerController
are supposed to be local so that these messages are fast and not lost.
The ProducerController
sends the first message to the ConsumerController
without waiting for
a Request
from the ConsumerController
. The main reason for this is that when used with
Cluster Sharding the first message will typically create the ConsumerController
. It's
also a way to connect the ProducerController and ConsumerController in a dynamic way, for
example when the ProducerController is replaced.
When the first message is received by the ConsumerController
it sends back the initial Request
,
with demand of how many messages it can accept.
Apart from the first message the ProducerController
will not send more messages than requested
by the ConsumerController
.
When there is demand from the consumer side the ProducerController
sends RequestNext
to the
actual producer, which is then allowed to send one more message.
Each message is wrapped by the ProducerController
in ConsumerController.SequencedMessage with
a monotonically increasing sequence number without gaps, starting at 1.
In other words, the "request" protocol to the application producer and consumer is one-by-one, but
between the ProducerController
and ConsumerController
it's window of messages in flight.
The Request
message also contains a confirmedSeqNr
that is the acknowledgement
from the consumer that it has received and processed all messages up to that sequence number.
The ConsumerController
will send ProducerControllerImpl.Resend if a lost message is detected
and then the ProducerController
will resend all messages from that sequence number. The producer keeps
unconfirmed messages in a buffer to be able to resend them. The buffer size is limited
by the request window size.
The resending is optional, and the ConsumerController
can be started with resendLost=false
to ignore lost messages, and then the ProducerController
will not buffer unconfirmed messages.
In that mode it provides only flow control but no reliable delivery.
- Alphabetic
- By Inheritance
- ProducerControllerImpl
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Type Members
- final case class Ack(confirmedSeqNr: SeqNr) extends InternalCommand with DeliverySerializable with DeadLetterSuppression with Product with Serializable
- sealed trait InternalCommand extends AnyRef
- final case class Request(confirmedSeqNr: SeqNr, requestUpToSeqNr: SeqNr, supportResend: Boolean, viaTimeout: Boolean) extends InternalCommand with DeliverySerializable with DeadLetterSuppression with Product with Serializable
- final case class Resend(fromSeqNr: SeqNr) extends InternalCommand with DeliverySerializable with DeadLetterSuppression with Product with Serializable
- trait UnsealedInternalCommand extends InternalCommand
For commands defined in public ProducerController
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def apply[A](producerId: String, durableQueueBehavior: Option[Behavior[Command[A]]], settings: ProducerController.Settings, send: (SequencedMessage[A]) => Unit)(implicit arg0: ClassTag[A]): Behavior[Command[A]]
For custom
send
function.For custom
send
function. For example used with Sharding where the message must be wrapped inShardingEnvelope(SequencedMessage(msg))
. - def apply[A](producerId: String, durableQueueBehavior: Option[Behavior[Command[A]]], settings: ProducerController.Settings)(implicit arg0: ClassTag[A]): Behavior[Command[A]]
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @HotSpotIntrinsicCandidate() @native()
- def createChunks[A](m: A, chunkSize: Int, serialization: Serialization): Seq[ChunkedMessage]
- def enforceLocalProducer(ref: ActorRef[_]): Unit
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- case object ResendFirstUnconfirmed extends InternalCommand with Product with Serializable