Class EventsBySliceFirehose$
- java.lang.Object
-
- akka.persistence.query.typed.internal.EventsBySliceFirehose$
-
- All Implemented Interfaces:
ExtensionId<akka.persistence.query.typed.internal.EventsBySliceFirehose>, ExtensionIdProvider
public class EventsBySliceFirehose$ extends java.lang.Object implements ExtensionId<akka.persistence.query.typed.internal.EventsBySliceFirehose>, ExtensionIdProvider
INTERNAL API
The purpose is to share the stream of events from the database and fan it out to connected consumer streams, which results in fewer queries and less loading of events from the database. The shared stream is called the firehose stream.
The firehose stream is fanned out via a BroadcastHub that consumer streams attach to dynamically.
A new consumer starts a catchup stream, since its start offset is typically behind the live firehose stream. In the beginning it emits events only from the catchup stream. Offset progress for the firehose stream is tracked, and when the catchup stream has caught up with the firehose stream, the consumer switches over to emitting from the firehose stream and closes the catchup stream. During an overlap period it uses events from both the catchup and firehose streams to make sure that no events are missed. During this overlap there is best-effort deduplication.
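The switch-over described above can be illustrated with a small plain-Java model. This is only a sketch under simplifying assumptions, not the Akka implementation: offsets are modelled as `long` values, events are delivered by direct method calls rather than streams, and the class and method names (`CatchupSwitchSketch`, `onCatchup`, `onFirehose`, `closeCatchup`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of one consumer; not the Akka implementation.
class CatchupSwitchSketch {
    enum Mode { CATCHUP_ONLY, BOTH, FIREHOSE_ONLY }

    private Mode mode = Mode.CATCHUP_ONLY;
    private long firehoseOffset = -1L; // latest offset seen on the firehose, -1 = none yet
    private final Set<Long> seenInOverlap = new HashSet<>();
    private final List<Long> emitted = new ArrayList<>();

    Mode mode() { return mode; }
    List<Long> emitted() { return emitted; }

    // Event arriving from the shared firehose stream.
    void onFirehose(long offset) {
        firehoseOffset = Math.max(firehoseOffset, offset);
        switch (mode) {
            case CATCHUP_ONLY: break;                 // progress is tracked, nothing emitted yet
            case BOTH: emitOnce(offset); break;       // overlap: deduplicate
            case FIREHOSE_ONLY: emitted.add(offset); break;
        }
    }

    // Event arriving from this consumer's own catchup stream.
    void onCatchup(long offset) {
        switch (mode) {
            case CATCHUP_ONLY:
                emitted.add(offset);
                // Caught up with the firehose: start using both sources.
                if (firehoseOffset >= 0 && offset >= firehoseOffset) mode = Mode.BOTH;
                break;
            case BOTH: emitOnce(offset); break;
            case FIREHOSE_ONLY: break;                // catchup stream is closed
        }
    }

    // End of the overlap period (assumed here to be triggered externally).
    void closeCatchup() {
        mode = Mode.FIREHOSE_ONLY;
        seenInOverlap.clear();
    }

    private void emitOnce(long offset) {
        if (seenInOverlap.add(offset)) emitted.add(offset);
    }

    public static void main(String[] args) {
        CatchupSwitchSketch c = new CatchupSwitchSketch();
        c.onFirehose(5);  // tracked only
        c.onCatchup(4);
        c.onCatchup(5);   // caught up, overlap begins
        c.onFirehose(6);
        c.onCatchup(6);   // duplicate, dropped
        c.closeCatchup();
        c.onFirehose(7);
        System.out.println(c.emitted());
    }
}
```

In this model the deduplication is a simple set lookup during the overlap; the real mechanism is only described as best effort, so duplicates may still reach consumers.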
The BroadcastHub has a limited buffer that holds events between the slowest and fastest consumer. When the buffer is full, the fastest consumer can't progress faster than the slowest. Short periods of slowdown can be fine, but after a while the slow consumers are detected and aborted. They have to connect again and try catching up, but without slowing down other streams.
The firehose stream is started on demand when the first consumer attaches. It is stopped when the last consumer is stopped, but it stays around for a while to make it more efficient for new or restarted consumers to attach again.
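The buffer constraint above can be modelled in a few lines of plain Java. This is a rough sketch, not how BroadcastHub is implemented: consumer positions are plain counters, and the names `HubBufferSketch`, `tryAdvance`, and `abort` are hypothetical. It shows only that the fastest consumer may be at most one buffer length ahead of the slowest, and that aborting the slow consumer unblocks the others.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a bounded fan-out buffer; not the BroadcastHub implementation.
class HubBufferSketch {
    private final int bufferSize;
    private final Map<String, Long> positions = new HashMap<>();

    HubBufferSketch(int bufferSize) { this.bufferSize = bufferSize; }

    // In this simplified model every consumer starts at position 0.
    void attach(String consumer) { positions.putIfAbsent(consumer, 0L); }

    // A slow consumer that has been aborted no longer holds the others back.
    void abort(String consumer) { positions.remove(consumer); }

    long slowest() {
        return positions.values().stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    // Advances the consumer by one element unless that would put it
    // more than bufferSize elements ahead of the slowest consumer.
    boolean tryAdvance(String consumer) {
        long next = positions.get(consumer) + 1;
        if (next - slowest() > bufferSize) return false;
        positions.put(consumer, next);
        return true;
    }

    public static void main(String[] args) {
        HubBufferSketch hub = new HubBufferSketch(2);
        hub.attach("fast");
        hub.attach("slow");
        System.out.println(hub.tryAdvance("fast")); // within the buffer
        System.out.println(hub.tryAdvance("fast")); // buffer now full
        System.out.println(hub.tryAdvance("fast")); // blocked by "slow"
        hub.abort("slow");
        System.out.println(hub.tryAdvance("fast")); // unblocked
    }
}
```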
-
-
Field Summary
Modifier and Type, Field, Description
static EventsBySliceFirehose$
MODULE$
Static reference to the singleton instance of this Scala object.
-
Constructor Summary
Constructor, Description
EventsBySliceFirehose$()
-
Method Summary
Modifier and Type, Method, Description
akka.persistence.query.typed.internal.EventsBySliceFirehose
createExtension(ExtendedActorSystem system)
Is used by Akka to instantiate the Extension identified by this ExtensionId, internal use only.
akka.persistence.query.typed.internal.EventsBySliceFirehose
get(ActorSystem system)
Returns an instance of the extension identified by this ExtensionId instance.
akka.persistence.query.typed.internal.EventsBySliceFirehose
get(ClassicActorSystemProvider system)
Returns an instance of the extension identified by this ExtensionId instance.
boolean
isDurationGreaterThan(java.time.Instant from, java.time.Instant to, java.time.Duration duration)
EventsBySliceFirehose$
lookup()
Returns the canonical ExtensionId for this Extension.
TimestampOffset
timestampOffset(EventEnvelope<java.lang.Object> env)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface akka.actor.ExtensionId
apply, apply, equals, hashCode
-
-
-
-
Field Detail
-
MODULE$
public static final EventsBySliceFirehose$ MODULE$
Static reference to the singleton instance of this Scala object.
-
-
Method Detail
-
get
public akka.persistence.query.typed.internal.EventsBySliceFirehose get(ActorSystem system)
Description copied from interface: ExtensionId
Returns an instance of the extension identified by this ExtensionId instance.
Java API: For extensions written in Scala that are also to be used from Java, this method should be overridden to get the correct return type:
    override def get(system: ActorSystem): TheExtension = super.get(system)
- Specified by:
get
in interface ExtensionId<akka.persistence.query.typed.internal.EventsBySliceFirehose>
-
get
public akka.persistence.query.typed.internal.EventsBySliceFirehose get(ClassicActorSystemProvider system)
Description copied from interface: ExtensionId
Returns an instance of the extension identified by this ExtensionId instance.
Java API: For extensions written in Scala that are also to be used from Java, this method should be overridden to get the correct return type:
    override def get(system: ClassicActorSystemProvider): TheExtension = super.get(system)
- Specified by:
get
in interface ExtensionId<akka.persistence.query.typed.internal.EventsBySliceFirehose>
-
lookup
public EventsBySliceFirehose$ lookup()
Description copied from interface: ExtensionIdProvider
Returns the canonical ExtensionId for this Extension.
- Specified by:
lookup
in interface ExtensionIdProvider
-
createExtension
public akka.persistence.query.typed.internal.EventsBySliceFirehose createExtension(ExtendedActorSystem system)
Description copied from interface: ExtensionId
Is used by Akka to instantiate the Extension identified by this ExtensionId, internal use only.
- Specified by:
createExtension
in interface ExtensionId<akka.persistence.query.typed.internal.EventsBySliceFirehose>
-
timestampOffset
public TimestampOffset timestampOffset(EventEnvelope<java.lang.Object> env)
-
isDurationGreaterThan
public boolean isDurationGreaterThan(java.time.Instant from, java.time.Instant to, java.time.Duration duration)
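No description is given for this method. Judging only by its name and signature, one plausible reading is that it reports whether the time elapsed from `from` to `to` exceeds `duration`. The following is a sketch of that assumed behaviour using `java.time`, not the actual implementation; in particular, whether the comparison is strict is an assumption, and the class name `DurationCheckSketch` is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-in; the real method lives on EventsBySliceFirehose$.
class DurationCheckSketch {
    // Assumed semantics: true when the time between `from` and `to`
    // is strictly longer than `duration`.
    static boolean isDurationGreaterThan(Instant from, Instant to, Duration duration) {
        return Duration.between(from, to).compareTo(duration) > 0;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        // 10 seconds elapsed, threshold 5 seconds
        System.out.println(isDurationGreaterThan(start, start.plusSeconds(10), Duration.ofSeconds(5)));
        // 3 seconds elapsed, threshold 5 seconds
        System.out.println(isDurationGreaterThan(start, start.plusSeconds(3), Duration.ofSeconds(5)));
    }
}
```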
-
-