  • object ProducerController

    Point-to-point reliable delivery between a single producer actor sending messages and a single consumer actor receiving the messages. Used together with ConsumerController.

    The producer actor will start the flow by sending a ProducerController.Start message to the ProducerController. The ActorRef in the Start message is typically constructed as a message adapter to map the ProducerController.RequestNext to the protocol of the producer actor.

    For the ProducerController to know where to send the messages, it must be connected with the ConsumerController. You do this with ProducerController.RegisterConsumer or ConsumerController.RegisterToProducerController messages.
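    The two registration alternatives can be sketched as follows. This is a minimal illustration, not the library's prescribed wiring: the Wiring object and its method names are hypothetical, and both ActorRef parameters are assumed to point at controllers that have already been spawned.

```scala
import akka.actor.typed.ActorRef
import akka.actor.typed.delivery.{ ConsumerController, ProducerController }

// Hypothetical helper; only one of the two registrations is needed.
object Wiring {

  // Alternative 1: tell the ProducerController which ConsumerController to deliver to.
  def registerFromProducerSide[A](
      producerController: ActorRef[ProducerController.Command[A]],
      consumerController: ActorRef[ConsumerController.Command[A]]): Unit =
    producerController ! ProducerController.RegisterConsumer(consumerController)

  // Alternative 2: let the ConsumerController register itself with the ProducerController.
  def registerFromConsumerSide[A](
      producerController: ActorRef[ProducerController.Command[A]],
      consumerController: ActorRef[ConsumerController.Command[A]]): Unit =
    consumerController ! ConsumerController.RegisterToProducerController(producerController)
}
```

    Which side initiates the registration is a design choice; it usually depends on which actor learns about the other one first.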

    The ProducerController sends RequestNext to the producer, which is then allowed to send one message to the ProducerController via the sendNextTo in the RequestNext. Thereafter the producer will receive a new RequestNext when it's allowed to send one more message.
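    The Start and RequestNext exchange described above might look like the following sketch. CounterProducer, its Command protocol and the String payload are assumptions made for illustration; only the ProducerController messages and the message adapter come from the Akka API.

```scala
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical producer that sends numbered String messages.
object CounterProducer {
  sealed trait Command
  // Wraps RequestNext so that it fits the producer's own protocol.
  private final case class WrappedRequestNext(next: ProducerController.RequestNext[String]) extends Command

  def apply(producerController: ActorRef[ProducerController.Command[String]]): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      val requestNextAdapter: ActorRef[ProducerController.RequestNext[String]] =
        context.messageAdapter[ProducerController.RequestNext[String]](WrappedRequestNext(_))
      // Start the flow; RequestNext messages will arrive through the adapter.
      producerController ! ProducerController.Start(requestNextAdapter)
      sending(0)
    }

  private def sending(n: Long): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case WrappedRequestNext(next) =>
        // Exactly one message may be sent for each RequestNext.
        next.sendNextTo ! s"message-$n"
        sending(n + 1)
    }
}
```

    The producer never sends on its own initiative: each outgoing message is a reaction to a RequestNext, which is what keeps it within the demand signalled by the consumer side.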

    The producer and ProducerController actors are supposed to be local so that these messages are fast and not lost. This is enforced by a runtime check.

    Many unconfirmed messages can be in flight between the ProducerController and ConsumerController. The flow control is driven by the consumer side, which means that the ProducerController will not send faster than the demand requested by the ConsumerController.

    Lost messages are detected, resent and deduplicated if needed. This is also driven by the consumer side, which means that the ProducerController will not push resends unless requested by the ConsumerController.

    Until sent messages have been confirmed, the ProducerController keeps them in memory so that it can resend them. If the JVM of the ProducerController crashes, those unconfirmed messages are lost. To make sure the messages can still be delivered in that scenario, the ProducerController can be used with a DurableProducerQueue. The unconfirmed messages are then stored durably so that they can be redelivered when the producer is started again. An implementation of the DurableProducerQueue is provided by EventSourcedProducerQueue in akka-persistence-typed.
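    A ProducerController backed by a durable queue could be created roughly as shown below. The DurableProducer object, the String payload and the way the persistence id is derived from the producerId are assumptions for illustration. The sketch also assumes that akka-persistence-typed is on the classpath and that a journal plugin is configured when the behavior is actually run.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.delivery.{ DurableProducerQueue, ProducerController }
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.delivery.EventSourcedProducerQueue

// Hypothetical factory for a ProducerController with durable storage.
object DurableProducer {
  def controller(producerId: String): Behavior[ProducerController.Command[String]] = {
    // The persistence id is a placeholder; it must be stable across restarts
    // so that unconfirmed messages can be found again.
    val durableQueue: Behavior[DurableProducerQueue.Command[String]] =
      EventSourcedProducerQueue[String](PersistenceId.ofUniqueId(s"$producerId-queue"))
    ProducerController[String](producerId, durableQueueBehavior = Some(durableQueue))
  }
}
```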

    Instead of using tell with the sendNextTo in the RequestNext, the producer can use context.ask with the askNextTo in the RequestNext. The difference is that a reply is sent back when the message has been handled. If a DurableProducerQueue is used, the reply is sent when the message has been stored successfully, but it might not have been processed by the consumer yet. Otherwise, the reply is sent after the consumer has processed and confirmed the message.
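    The ask variant can be sketched like this. AskingProducer, its Command protocol, the "hello" payload and the 5 second timeout are illustrative assumptions. The reply type is the sequence number (a Long) carried by MessageWithConfirmation.

```scala
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.delivery.ProducerController.MessageWithConfirmation
import akka.actor.typed.scaladsl.Behaviors
import akka.util.Timeout

// Hypothetical producer that waits for a confirmation of each message.
object AskingProducer {
  sealed trait Command
  private final case class WrappedRequestNext(next: ProducerController.RequestNext[String]) extends Command
  private final case class MessageConfirmed(seqNr: Long) extends Command
  private case object AskFailed extends Command

  def apply(producerController: ActorRef[ProducerController.Command[String]]): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      // Assumed timeout; choose a value that suits the expected confirmation latency.
      implicit val askTimeout: Timeout = 5.seconds
      val adapter = context.messageAdapter[ProducerController.RequestNext[String]](WrappedRequestNext(_))
      producerController ! ProducerController.Start(adapter)

      Behaviors.receiveMessage[Command] {
        case WrappedRequestNext(next) =>
          // Ask instead of tell: the reply carries the sequence number of the message.
          context.ask[MessageWithConfirmation[String], Long](
            next.askNextTo,
            replyTo => MessageWithConfirmation("hello", replyTo)) {
            case Success(seqNr) => MessageConfirmed(seqNr)
            case Failure(_)     => AskFailed
          }
          Behaviors.same
        case MessageConfirmed(seqNr) =>
          context.log.info(s"Message with seqNr $seqNr was confirmed")
          Behaviors.same
        case AskFailed =>
          context.log.warn("No confirmation received within the timeout")
          Behaviors.same
      }
    }
}
```

    What the producer does on a failed ask (retry, log, stop) is application specific; the sketch only logs.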

    If the consumer crashes, a new ConsumerController can be connected to the original ProducerController without restarting it. The ProducerController will then redeliver all unconfirmed messages.

    It's also possible to use the ProducerController and ConsumerController without resending lost messages, while still using the flow control. This can, for example, be useful when both consumer and producer are known to be located in the same local ActorSystem. It can be more efficient because messages don't have to be kept in memory in the ProducerController until they have been confirmed, but the drawback is that lost messages will not be delivered. See configuration only-flow-control of the ConsumerController.
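    As a sketch of the flow-control-only mode, the setting can also be enabled programmatically on the consumer side. The FlowControlOnly object and the String payload are hypothetical. The sketch assumes that ConsumerController.Settings offers withOnlyFlowControl and that ConsumerController has an apply overload taking Settings, so check both against the ConsumerController API of the Akka version in use.

```scala
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.typed.delivery.ConsumerController

// Hypothetical factory for a ConsumerController that never requests resends.
object FlowControlOnly {
  def consumerController(system: ActorSystem[_]): Behavior[ConsumerController.Command[String]] = {
    // Start from the configured defaults and switch off resending of lost messages.
    val settings = ConsumerController.Settings(system).withOnlyFlowControl(true)
    ConsumerController[String](settings)
  }
}
```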

    The producerId is used in logging and included as an MDC entry with key "producerId". It's propagated to the ConsumerController and is useful for correlating log messages. It can be any String, but it's recommended to use a unique identifier representing the producer.

    If the DurableProducerQueue is defined it is created as a child actor of the ProducerController actor. It will use the same dispatcher as the parent ProducerController.

    Definition Classes
    delivery
    Annotations
    @ApiMayChange()
  • Command
  • MessageWithConfirmation
  • RegisterConsumer
  • RequestNext
  • Settings
  • Start

final class Settings extends AnyRef

Source
ProducerController.scala

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. def +(other: String): String
    Implicit
    This member is added by an implicit conversion from Settings to any2stringadd[Settings] performed by method any2stringadd in scala.Predef.
    Definition Classes
    any2stringadd
  4. def ->[B](y: B): (Settings, B)
    Implicit
    This member is added by an implicit conversion from Settings to ArrowAssoc[Settings] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @inline()
  5. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  6. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  7. def clone(): AnyRef
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @native() @HotSpotIntrinsicCandidate()
  8. val durableQueueRequestTimeout: FiniteDuration
  9. val durableQueueResendFirstInterval: FiniteDuration
  10. val durableQueueRetryAttempts: Int
  11. def ensuring(cond: (Settings) => Boolean, msg: => Any): Settings
    Implicit
    This member is added by an implicit conversion from Settings to Ensuring[Settings] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  12. def ensuring(cond: (Settings) => Boolean): Settings
    Implicit
    This member is added by an implicit conversion from Settings to Ensuring[Settings] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  13. def ensuring(cond: Boolean, msg: => Any): Settings
    Implicit
    This member is added by an implicit conversion from Settings to Ensuring[Settings] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  14. def ensuring(cond: Boolean): Settings
    Implicit
    This member is added by an implicit conversion from Settings to Ensuring[Settings] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  15. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  16. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  17. def formatted(fmtstr: String): String
    Implicit
    This member is added by an implicit conversion from Settings to StringFormat[Settings] performed by method StringFormat in scala.Predef.
    Definition Classes
    StringFormat
    Annotations
    @inline()
  18. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  19. def getDurableQueueRequestTimeout(): Duration

    Java API

  20. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  21. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  22. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  23. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  24. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  25. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  26. def toString(): String
    Definition Classes
    Settings → AnyRef → Any
  27. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  28. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
  29. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  30. def withDurableQueueRequestTimeout(newDurableQueueRequestTimeout: Duration): Settings

    Java API

  31. def withDurableQueueRequestTimeout(newDurableQueueRequestTimeout: FiniteDuration): Settings

    Scala API

  32. def withDurableQueueResendFirstInterval(newDurableQueueResendFirstInterval: Duration): Settings

    Java API

  33. def withDurableQueueResendFirstInterval(newDurableQueueResendFirstInterval: FiniteDuration): Settings

    Scala API

  34. def withDurableQueueRetryAttempts(newDurableQueueRetryAttempts: Int): Settings
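
The with... methods listed above return modified copies of the Settings, so they can be chained. A minimal sketch, assuming the defaults are read from the ActorSystem configuration through the Settings companion object; the TunedProducerController object and the concrete values are illustrative only:

```scala
import scala.concurrent.duration._
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.typed.delivery.ProducerController

// Hypothetical helper that overrides the durable queue settings.
object TunedProducerController {

  // Load the configured defaults, then override individual values (Scala API overloads).
  def tuned(system: ActorSystem[_]): ProducerController.Settings =
    ProducerController
      .Settings(system)
      .withDurableQueueRequestTimeout(5.seconds)
      .withDurableQueueRetryAttempts(20)
      .withDurableQueueResendFirstInterval(2.seconds)

  // Pass the settings explicitly when creating the ProducerController behavior.
  def behavior(system: ActorSystem[_], producerId: String): Behavior[ProducerController.Command[String]] =
    ProducerController[String](producerId, durableQueueBehavior = None, settings = tuned(system))
}
```

Java callers would use the overloads that take a java.time.Duration instead of a FiniteDuration.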

Deprecated Value Members

  1. def finalize(): Unit
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable]) @Deprecated @deprecated
    Deprecated

    (Since version ) see corresponding Javadoc for more information.

  2. def →[B](y: B): (Settings, B)
    Implicit
    This member is added by an implicit conversion from Settings to ArrowAssoc[Settings] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @deprecated
    Deprecated

    (Since version 2.13.0) Use -> instead. If you still wish to display it as one character, consider using a font with programming ligatures such as Fira Code.
