public class NotificationReader
extends java.lang.Object
implements akka.actor.Actor, akka.actor.ActorLogging, akka.actor.Timers
Reads rows from the notification table and publishes them to
interested subscribers, with an optional delay defined by publishDelay
and additionalRandomPublishDelay.
The notification table is structured in time buckets (e.g. 1 minute periods) to allow for efficient removal of old entries.
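As a rough illustration of the time-bucket idea, the sketch below maps an epoch-millisecond timestamp to the start of its bucket. The class name `TimeBuckets`, the helper names, and the fixed one-minute bucket size are assumptions made for this example; the actual bucket size and table layout are not shown on this page.

```java
import java.time.Duration;

// Hypothetical helper, not part of NotificationReader.
class TimeBuckets {
    // Assumed bucket size; the description only gives "1 minute" as an example.
    static final long BUCKET_MILLIS = Duration.ofMinutes(1).toMillis();

    /** Start (inclusive) of the bucket that contains the given timestamp. */
    static long bucketStart(long timestampMillis) {
        return Math.floorDiv(timestampMillis, BUCKET_MILLIS) * BUCKET_MILLIS;
    }

    /** Start of the bucket that follows the one containing the timestamp. */
    static long nextBucketStart(long timestampMillis) {
        return bucketStart(timestampMillis) + BUCKET_MILLIS;
    }

    public static void main(String[] args) {
        long ts = 125_000L; // 2 min 5 s after the epoch
        System.out.println(bucketStart(ts));     // prints 120000
        System.out.println(nextBucketStart(ts)); // prints 180000
    }
}
```

With rows grouped this way, removing old entries can be done one whole bucket at a time instead of row by row.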
Reading starts at the given startTimestamp, and queries run periodically at the
readInterval. The reader keeps track of the latest timestamp seen in the latest (current)
bucket and uses it in the query to select only newer rows. When no more events are found in
the current time bucket, it progresses to the next bucket (but never ahead of the current time).
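The cursor behaviour described above can be sketched as follows. This is a simplified model under stated assumptions, not the actor's actual code: `BucketCursor`, its fields, the fixed one-minute bucket size, and the use of plain millisecond timestamps are all hypothetical.

```java
import java.util.List;

// Hypothetical model of the "latest rows" cursor; not the real implementation.
class BucketCursor {
    static final long BUCKET_MILLIS = 60_000L; // assumed bucket size

    long bucketStart;     // start of the bucket currently being read
    long latestTimestamp; // exclusive lower bound for the next query

    BucketCursor(long startTimestamp) {
        this.bucketStart = Math.floorDiv(startTimestamp, BUCKET_MILLIS) * BUCKET_MILLIS;
        this.latestTimestamp = startTimestamp;
    }

    /** Called once per read tick with the timestamps of rows newer than latestTimestamp. */
    void onQueryResult(List<Long> newRowTimestamps, long nowMillis) {
        if (!newRowTimestamps.isEmpty()) {
            // Remember the newest row so that the next query only selects newer ones.
            for (long ts : newRowTimestamps) {
                latestTimestamp = Math.max(latestTimestamp, ts);
            }
        } else {
            // Nothing new in this bucket: move on, but never ahead of the current time.
            long next = bucketStart + BUCKET_MILLIS;
            if (next <= nowMillis) {
                bucketStart = next;
                latestTimestamp = next - 1; // so a row exactly at the bucket start is still "newer"
            }
        }
    }
}
```

An empty result only advances the cursor once the next bucket has actually begun, which is how the sketch models "not ahead of current time".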
Replication of rows can be delayed. To find such delayed rows in the current bucket and in
old buckets, the reader also performs so-called backtracking queries. A backtracking query scans
all rows in a time bucket to find rows that have not been published previously.
It looks back maxBacktrackingBuckets buckets but scans only one bucket per tick.
The reader alternates between backtracking queries and queries for the latest rows in the current bucket.
Only one query is in progress at a time.
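One way to picture the scheduling described above is the sketch below. It decides, per tick, which single query to run. The strict one-to-one alternation, the round-robin order over old buckets, and every name in it (`QueryScheduler`, `onTick`, `onQueryCompleted`) are assumptions for illustration; the real actor may order its queries differently. Filtering out rows that were already published is left out.

```java
import java.util.Optional;

// Hypothetical scheduler; illustrates the description, not the actual actor logic.
class QueryScheduler {
    enum Kind { LATEST, BACKTRACK }

    static final class Query {
        final Kind kind;
        final long bucketStart;
        Query(Kind kind, long bucketStart) {
            this.kind = kind;
            this.bucketStart = bucketStart;
        }
    }

    private final long bucketMillis;
    private final int maxBacktrackingBuckets;
    private boolean queryInProgress = false;
    private boolean backtrackNext = false; // assumed: start with a "latest rows" query
    private int backtrackOffset = 0;       // 0 = current bucket, 1 = previous bucket, ...

    QueryScheduler(long bucketMillis, int maxBacktrackingBuckets) {
        this.bucketMillis = bucketMillis;
        this.maxBacktrackingBuckets = maxBacktrackingBuckets;
    }

    /** Returns the query to start on this tick, or empty if one is still running. */
    Optional<Query> onTick(long currentBucketStart) {
        if (queryInProgress) {
            return Optional.empty(); // only one query in progress at a time
        }
        queryInProgress = true;
        Query query;
        if (backtrackNext) {
            // Scan exactly one bucket per tick, cycling back through older buckets.
            query = new Query(Kind.BACKTRACK, currentBucketStart - backtrackOffset * bucketMillis);
            backtrackOffset = (backtrackOffset + 1) % (maxBacktrackingBuckets + 1);
        } else {
            query = new Query(Kind.LATEST, currentBucketStart);
        }
        backtrackNext = !backtrackNext;
        return Optional.of(query);
    }

    /** Must be called when the running query has finished. */
    void onQueryCompleted() {
        queryInProgress = false;
    }
}
```

Scanning one bucket per tick keeps the cost of each backtracking pass bounded, at the price of taking several ticks to revisit every old bucket.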
| Constructor and Description |
|---|
| NotificationReader(long startTimestamp, NotificationSettings settings, akka.persistence.cassandra.session.scaladsl.CassandraSession session, java.lang.String readNotificationCql, scala.Function2<java.lang.String,NotificationWriter.EventsWritten,scala.runtime.BoxedUnit> notificationCallback) |
| Modifier and Type | Method and Description |
|---|---|
| protected void | akka$actor$Actor$_setter_$context_$eq(akka.actor.ActorContext x$1) |
| protected void | akka$actor$Actor$_setter_$self_$eq(akka.actor.ActorRef x$1) |
| akka.actor.ActorContext | context() |
| scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> | idle() |
| akka.event.LoggingAdapter | log() |
| static akka.actor.Props | props(long startTimestamp, NotificationSettings notificationConfig, akka.persistence.cassandra.session.scaladsl.CassandraSession session, java.lang.String readNotificationCql, scala.Function2<java.lang.String,NotificationWriter.EventsWritten,scala.runtime.BoxedUnit> notificationCallback) |
| scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> | receive() |
| akka.actor.ActorRef | self() |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
$init$, akka$actor$ActorLogging$$_log_$eq, akka$actor$ActorLogging$$_log
public NotificationReader(long startTimestamp,
NotificationSettings settings,
akka.persistence.cassandra.session.scaladsl.CassandraSession session,
java.lang.String readNotificationCql,
scala.Function2<java.lang.String,NotificationWriter.EventsWritten,scala.runtime.BoxedUnit> notificationCallback)
public static akka.actor.Props props(long startTimestamp,
NotificationSettings notificationConfig,
akka.persistence.cassandra.session.scaladsl.CassandraSession session,
java.lang.String readNotificationCql,
scala.Function2<java.lang.String,NotificationWriter.EventsWritten,scala.runtime.BoxedUnit> notificationCallback)
public akka.actor.ActorContext context()
context in interface akka.actor.Actor
public final akka.actor.ActorRef self()
self in interface akka.actor.Actor
protected void akka$actor$Actor$_setter_$context_$eq(akka.actor.ActorContext x$1)
akka$actor$Actor$_setter_$context_$eq in interface akka.actor.Actor
protected final void akka$actor$Actor$_setter_$self_$eq(akka.actor.ActorRef x$1)
akka$actor$Actor$_setter_$self_$eq in interface akka.actor.Actor
public akka.event.LoggingAdapter log()
log in interface akka.actor.ActorLogging
public scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> receive()
receive in interface akka.actor.Actor
public scala.PartialFunction<java.lang.Object,scala.runtime.BoxedUnit> idle()