akka.persistence.cassandra.query.scaladsl
CassandraReadJournal
Companion object CassandraReadJournal
class CassandraReadJournal extends ReadJournal with PersistenceIdsQuery with CurrentPersistenceIdsQuery with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery with EventsByTagQuery with CurrentEventsByTagQuery
Scala API: akka.persistence.query.scaladsl.ReadJournal implementation for Cassandra.
It is retrieved with:
val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
Corresponding Java API is in akka.persistence.cassandra.query.javadsl.CassandraReadJournal.
Configuration settings can be defined in the configuration section with the absolute path corresponding to the identifier, which is "akka.persistence.cassandra.query" for the default CassandraReadJournal#Identifier. See reference.conf.
Linear Supertypes
- CurrentEventsByTagQuery
- EventsByTagQuery
- CurrentEventsByPersistenceIdQuery
- EventsByPersistenceIdQuery
- CurrentPersistenceIdsQuery
- PersistenceIdsQuery
- ReadJournal
- AnyRef
- Any
Instance Constructors
- new CassandraReadJournal(system: ExtendedActorSystem, cfg: Config, cfgPath: String)
- new CassandraReadJournal(system: ExtendedActorSystem, sharedConfig: Config, sharedConfigPath: String, viaNormalConstructor: Boolean)
- Attributes
- protected
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- def currentEventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
Same type of query as eventsByPersistenceId, but the event stream is completed immediately when it reaches the end of the "result set". Events that are stored after the query is completed are not included in the event stream.
- Definition Classes
- CassandraReadJournal → CurrentEventsByPersistenceIdQuery
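A minimal sketch of running the bounded currentEventsByPersistenceId query and collecting its result. It assumes Akka 2.6 or later (where an implicit ActorSystem supplies the stream materializer), the plugin on the classpath, and a reachable Cassandra instance. The system name and the persistenceId "account-42" are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

object CurrentEventsByPersistenceIdExample extends App {
  // Assumption: Akka 2.6+, the implicit system also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("example")
  import system.dispatcher

  val queries =
    PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  // "account-42" is a hypothetical persistenceId; both bounds are inclusive.
  val events: Future[Seq[EventEnvelope]] =
    queries.currentEventsByPersistenceId("account-42", 0L, Long.MaxValue).runWith(Sink.seq)

  // The stream completes once the currently stored events have been read.
  events.onComplete { result =>
    println(result.map(_.map(env => s"${env.sequenceNr}: ${env.event}")))
    system.terminate()
  }
}
```

Because the stream is bounded, collecting it with Sink.seq is safe for small event logs; for long logs a fold or runForeach avoids holding every envelope in memory.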
- def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
Same type of query as eventsByTag, but the event stream is completed immediately when it reaches the end of the "result set". Events that are stored after the query is completed are not included in the event stream.
Use NoOffset when you want all events from the beginning of time. To acquire an offset from a long unix timestamp to use with this query, you can use timeBasedUUIDFrom.
- Definition Classes
- CassandraReadJournal → CurrentEventsByTagQuery
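As a sketch of the timestamp-based offset described for currentEventsByTag, the following reads all currently stored events with a given tag from the last hour. It assumes Akka 2.6 or later and a reachable Cassandra instance. The tag "order" and the one-hour window are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{ Offset, PersistenceQuery }

object CurrentEventsByTagExample extends App {
  // Assumption: Akka 2.6+, the implicit system also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("example")
  import system.dispatcher

  val queries =
    PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  // Build an offset from a unix timestamp in milliseconds (one hour ago).
  val oneHourAgo: Long = System.currentTimeMillis() - 60L * 60L * 1000L
  val offset: Offset = queries.timeBasedUUIDFrom(oneHourAgo)

  // "order" is a hypothetical tag. The stream completes at the end of the result set.
  queries
    .currentEventsByTag("order", offset)
    .runForeach(env => println(s"${env.persistenceId}/${env.sequenceNr}: ${env.event}"))
    .onComplete(_ => system.terminate())
}
```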
- def currentPersistenceIds(): Source[String, NotUsed]
Same type of query as persistenceIds, but the stream is completed immediately when it reaches the end of the "result set". persistenceIds of events that are stored after the query is completed are not included in the stream.
- Definition Classes
- CassandraReadJournal → CurrentPersistenceIdsQuery
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
eventsByPersistenceId is used to retrieve a stream of events for a particular persistenceId.
The EventEnvelope contains the event and provides persistenceId and sequenceNr for each event. The sequenceNr is the sequence number for the persistent actor with the persistenceId that persisted the event. The persistenceId + sequenceNr is a unique identifier for the event.
fromSequenceNr and toSequenceNr can be specified to limit the set of returned events. Both fromSequenceNr and toSequenceNr are inclusive.
The EventEnvelope also provides an offset, which is the same kind of offset as is used in the eventsByTag query. The Offset type is akka.persistence.query.TimeBasedUUID.
The returned event stream is ordered by sequenceNr.
Deleted events are also deleted from the event stream.
The stream is not completed when it reaches the end of the currently stored events, but it continues to push new events when new events are persisted. The corresponding query that is completed when it reaches the end of the currently stored events is provided by currentEventsByPersistenceId.
- Definition Classes
- CassandraReadJournal → EventsByPersistenceIdQuery
- def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
eventsByTag is used for retrieving events that were marked with a given tag, e.g. all events of an Aggregate Root type.
To tag events you create an akka.persistence.journal.EventAdapter that wraps the events in an akka.persistence.journal.Tagged with the given tags. The tags must be defined in the tags section of the akka.persistence.cassandra configuration.
You can use NoOffset to retrieve all events with a given tag, or retrieve a subset of all events by specifying a TimeBasedUUID offset.
The offset of each event is provided in the streamed envelopes returned, which makes it possible to resume the stream at a later point from a given offset. The offset parameter is exclusive, i.e. the event corresponding to the given offset parameter is not included in the stream. The Offset type is akka.persistence.query.TimeBasedUUID.
For querying events that happened after a long unix timestamp you can use timeBasedUUIDFrom to create the offset to use with this method.
In addition to the offset, the envelope also provides persistenceId and sequenceNr for each event. The sequenceNr is the sequence number for the persistent actor with the persistenceId that persisted the event. The persistenceId + sequenceNr is a unique identifier for the event.
The returned event stream is ordered by the offset (timestamp), which corresponds to the same order as the write journal stored the events, with inaccuracy due to clock skew between different nodes. The same stream elements (in the same order) are returned for multiple executions of the query on a best effort basis. The query uses batched writes to a separate table, so it is eventually consistent. This means that different queries may see different events for the latest events, but eventually the result will be ordered by timestamp (Cassandra timeuuid column).
However, a strong guarantee is provided that events for a given persistenceId will be delivered in order; the eventual consistency applies only to the ordering of events from different persistenceIds.
The stream is not completed when it reaches the end of the currently stored events, but it continues to push new events when new events are persisted. The corresponding query that is completed when it reaches the end of the currently stored events is provided by currentEventsByTag.
The stream is completed with failure if there is a failure in executing the query in the backend journal.
- Definition Classes
- CassandraReadJournal → EventsByTagQuery
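The resume-from-offset behaviour of eventsByTag can be sketched as follows. This is an illustration only, assuming Akka 2.6 or later and a reachable Cassandra instance. The tag "order" and the helpers loadOffset and saveOffset are hypothetical placeholders for whatever offset storage an application uses.

```scala
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{ NoOffset, Offset, PersistenceQuery }

import scala.util.Failure

object EventsByTagResumeExample extends App {
  // Assumption: Akka 2.6+, the implicit system also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("example")
  import system.dispatcher

  val queries =
    PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  // Hypothetical offset store, kept in memory here; NoOffset means "from the beginning".
  @volatile private var stored: Offset = NoOffset
  def loadOffset(): Offset = stored
  def saveOffset(offset: Offset): Unit = stored = offset

  // The offset parameter is exclusive, so the event at the stored offset is not replayed.
  val done = queries
    .eventsByTag("order", loadOffset())
    .runForeach { env =>
      println(s"${env.persistenceId}/${env.sequenceNr}: ${env.event}")
      saveOffset(env.offset) // remember where to resume after a restart
    }

  // This live stream only ends on failure (for example a backend query error) or cancellation.
  done.onComplete {
    case Failure(e) => println(s"eventsByTag stream failed: $e"); system.terminate()
    case _          => system.terminate()
  }
}
```

Saving the offset after processing gives at-least-once handling: after a crash between the two steps, the last event may be processed again.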
- val firstOffset: UUID
Use this as the UUID offset in eventsByTag queries when you want all events from the beginning of time.
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def initialize(): Future[Done]
Initialize connection to Cassandra and prepared statements. It is not required to do this and it will happen lazily otherwise. It is also not required to wait until this Future is complete to start using the read journal.
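A short sketch of calling initialize() eagerly at startup, for example to surface connection problems early instead of on the first query. It assumes a reachable Cassandra instance; the system name and log messages are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery

import scala.util.{ Failure, Success }

object InitializeExample extends App {
  val system: ActorSystem = ActorSystem("example")
  import system.dispatcher

  val queries =
    PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  // Optional warm-up; queries may be started without waiting for this Future.
  queries.initialize().onComplete {
    case Success(_) => println("Cassandra read journal initialized")
    case Failure(e) => println(s"Initialization failed: $e")
  }
}
```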
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def offsetUuid(timestamp: Long): UUID
Create a time based UUID that can be used as offset in eventsByTag queries. The timestamp is a unix timestamp (as returned by System#currentTimeMillis).
- def persistenceIds(): Source[String, NotUsed]
persistenceIds is used to retrieve a stream of persistenceIds.
The stream emits persistenceId strings.
The stream guarantees that a persistenceId is only emitted once and there are no duplicates. Order is not defined. Multiple executions of the same stream (even bounded) may emit different sequences of persistenceIds.
The stream is not completed when it reaches the end of the currently known persistenceIds, but it continues to push new persistenceIds when new events are persisted. The corresponding query that is completed when it reaches the end of the currently known persistenceIds is provided by currentPersistenceIds.
- Definition Classes
- CassandraReadJournal → PersistenceIdsQuery
- val session: CassandraSession
Data Access Object for arbitrary queries or updates.
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def timeBasedUUIDFrom(timestamp: Long): Offset
Create a time based UUID that can be used as offset in eventsByTag queries. The timestamp is a unix timestamp (as returned by System#currentTimeMillis).
- def timestampFrom(offset: TimeBasedUUID): Long
Convert a TimeBasedUUID to a unix timestamp (as returned by System#currentTimeMillis).
- def toString(): String
- Definition Classes
- AnyRef → Any
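For background on the timestamp conversions offered by offsetUuid, timeBasedUUIDFrom and timestampFrom, the sketch below shows how a time-based (version 1) UUID relates to a unix timestamp, using only java.util.UUID. It is not the plugin's implementation; the object TimeUuidSketch and its methods are hypothetical names, and the generated UUID has a zeroed clock sequence and node.

```scala
import java.util.UUID

object TimeUuidSketch {
  // 100-nanosecond intervals between the UUID epoch (1582-10-15T00:00:00Z)
  // and the unix epoch (1970-01-01T00:00:00Z).
  val UuidEpochOffset: Long = 0x01b21dd213814000L

  // Extract unix milliseconds from a version 1 UUID.
  def unixMillis(uuid: UUID): Long = {
    require(uuid.version() == 1, "not a time-based (version 1) UUID")
    (uuid.timestamp() - UuidEpochOffset) / 10000L
  }

  // Build a version 1 UUID for the given unix milliseconds (illustrative only).
  def startOf(millis: Long): UUID = {
    val ts      = millis * 10000L + UuidEpochOffset
    val timeLow = ts & 0xffffffffL          // low 32 bits of the timestamp
    val timeMid = (ts >>> 32) & 0xffffL     // middle 16 bits
    val timeHi  = (ts >>> 48) & 0x0fffL     // high 12 bits
    val msb     = (timeLow << 32) | (timeMid << 16) | 0x1000L | timeHi // 0x1000 = version 1
    val lsb     = Long.MinValue             // top bits "10": RFC 4122 variant, rest zero
    new UUID(msb, lsb)
  }
}
```

A round trip such as TimeUuidSketch.unixMillis(TimeUuidSketch.startOf(System.currentTimeMillis())) returns the original millisecond value, because the sub-millisecond part of the UUID timestamp is zero in this sketch.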
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()