Configuration

Make your edits/overrides in your application.conf.
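As a minimal sketch of what such an `application.conf` might contain, the fragment below points Akka Persistence at the Cassandra journal and snapshot sections, using the paths named in the comments of the reference configuration. The keyspace name is a hypothetical placeholder, not a recommended value.

```hocon
# application.conf (sketch)
akka.persistence.journal.plugin = "akka.persistence.cassandra.journal"
akka.persistence.snapshot-store.plugin = "akka.persistence.cassandra.snapshot"

akka.persistence.cassandra {
  journal {
    # hypothetical keyspace name, overriding the default "akka"
    keyspace = "my_app_journal"
  }
}
```

Only the settings that differ from the defaults need to appear here; everything else falls back to the reference configuration.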

Default configuration

The reference configuration file with the default values:

# This configures the default settings for all Cassandra Journal plugin
# instances in the system. If you are using just one configuration for
# all persistent actors then you should point your akka.persistence.journal.plugin
# setting to "akka.persistence.cassandra.journal".
#
# Otherwise you need to create differently named sections containing
# only those settings that shall be different from the defaults
# configured here, importing the defaults like so:
#
#   another-cassandra-plugin = ${akka.persistence.cassandra}
#   another-cassandra-plugin {
#     journal {
#       <settings...>
#     }
#     query {
#       <settings...>
#     }
#     events-by-tag {
#       <settings...>
#     }
#   }
akka.persistence.cassandra {

  //#shared
  # The implementation of `akka.stream.alpakka.cassandra.CqlSessionProvider`
  # used for creating the `CqlSession`.
  # It may optionally have a constructor with an `ActorSystem` and `Config` parameters.
  session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"

  # Configure Akka Discovery by setting a service name
  service-discovery {
    name = ""
    lookup-timeout = 1 s
  }

  # The ExecutionContext to use for the session tasks and future composition.
  session-dispatcher = "akka.persistence.cassandra.default-dispatcher"

  # Full config path to the Datastax Java driver's configuration section.
  # When connecting to more than one Cassandra cluster different session configuration can be
  # defined with this property. Different plugins can thereby connect to different Cassandra
  # clusters.
  # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/#quick-overview
  # and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
  datastax-java-driver-config = "datastax-java-driver"

  # profile to use, can set separate ones for reads and writes
  # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/
  # for overriding any settings
  read-profile = "akka-persistence-cassandra-profile"
  write-profile = "akka-persistence-cassandra-profile"

  # meta columns were added in version 0.55. If you don't alter existing messages table and still
  # use `tables-autocreate=on` you have to set this property to off.
  # When trying to create the materialized view with the meta columns before corresponding columns
  # have been added to the messages table an exception "Undefined column name meta_ser_id" is raised,
  # because Cassandra validates the "CREATE MATERIALIZED VIEW IF NOT EXISTS"
  # even though the view already exists and will not be created. To work around that issue you can disable the
  # meta data columns in the materialized view by setting this property to off.
  meta-in-events-by-tag-view = on

  # If there is an unexpected error in the Cassandra Journal the journal shuts down
  # to prevent any data corruption. Set to true to also run coordinated shutdown so
  # that the application shuts down. Useful in environments where applications are
  # automatically shutdown or if the Cassandra Journal not working means the application
  # won't function.
  coordinated-shutdown-on-error = off

  compatibility {
    # Enable this when using CosmosDB
    cosmosdb = off
  }
  //#shared

  //#journal
  journal {

    # FQCN of the cassandra journal plugin
    class = "akka.persistence.cassandra.journal.CassandraJournal"

    # Dispatcher for the plugin actor
    plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"

    # Parameter indicating whether the journal keyspace should be auto created.
    # Not all Cassandra settings are configurable when using autocreate and for
    # full control of the keyspace and table definitions you should create them
    # manually (with a script).
    keyspace-autocreate = false

    # Parameter indicating whether the journal tables should be auto created
    # Not all Cassandra settings are configurable when using autocreate and for
    # full control of the keyspace and table definitions you should create them
    # manually (with a script).
    tables-autocreate = false

    # Name of the keyspace to be created/used by the journal
    keyspace = "akka"

    # Name of the table to be created/used by the journal.
    # If the table doesn't exist it is automatically created when tables-autocreate
    # is enabled.
    table = "messages"

    # Compaction strategy for the journal table.
    # Please refer to the tests for example configurations.
    # This is only used for auto-create of messages table, i.e. when tables-autocreate is
    # enabled and the table doesn't exist already.
    table-compaction-strategy {
      class = "SizeTieredCompactionStrategy"
    }

    # Name of the table to be created/used for storing metadata.
    # If the table doesn't exist it is automatically created when tables-autocreate
    # is enabled.
    metadata-table = "metadata"

    # Name of the table to be created/used for storing all persistence ids.
    # If the table doesn't exist it is automatically created when tables-autocreate
    # is enabled.
    all-persistence-ids-table = "all_persistence_ids"

    # Replication strategy to use. SimpleStrategy or NetworkTopologyStrategy.
    # This is only used for auto-create of keyspace, i.e. when keyspace-autocreate is
    # enabled and the keyspace doesn't exist already.
    replication-strategy = "SimpleStrategy"

    # Replication factor to use when creating a keyspace. Is only used when replication-strategy is SimpleStrategy.
    # This is only used for auto-create of keyspace, i.e. when keyspace-autocreate is
    # enabled and the keyspace doesn't exist already.
    replication-factor = 1

    # Replication factor list for data centers. Is only used when replication-strategy is NetworkTopologyStrategy.
    # The value can be either a proper list, e.g. ["dc1:3", "dc2:2"],
    # or a comma-separated list within a single string, e.g. "dc1:3,dc2:2".
    # This is only used for auto-create of keyspace, i.e. when keyspace-autocreate is
    # enabled and the keyspace doesn't exist already.
    data-center-replication-factors = []

    # Maximum number of messages that will be batched when using `persistAsync`.
    # Also used as the max batch size for deletes.
    max-message-batch-size = 100

    # Target number of entries per partition (= columns per row).
    # Must not be changed after table creation (currently not checked).
    # This is "target" as AtomicWrites that span partition boundaries will result in bigger partitions to ensure atomicity.
    target-partition-size = 500000

    # The time to wait before cassandra will remove the tombstones created for deleted entries.
    # cfr. gc_grace_seconds table property documentation on
    # https://docs.datastax.com/en/cql-oss/3.1/cql/cql_reference/tabProp.html
    # This is only used for auto-create of messages table, i.e. when tables-autocreate is
    # enabled and the table doesn't exist already.
    gc-grace-seconds = 864000

    # Akka-persistence allows multiple pending deletes for the same persistence id however this plugin only executes one
    # at a time per persistence id (deletes for different persistenceIds can happen concurrently).
    #
    # If multiple deletes for the same persistence id are received then they are queued.
    #
    # If the queue is full any subsequent deletes are failed immediately without attempting them in Cassandra.
    #
    # Deleting should be used with snapshots for efficiency for recovery thus deleting should be infrequent
    max-concurrent-deletes = 10

    # For applications that are not deleting any events this can be set to 'off', which will optimize
    # the recovery to not query for highest deleted sequence number from the metadata table.
    # It must not be off if deletes of events are used or have been used previously.
    # If this is set to off then delete attempts will fail with an IllegalArgumentException.
    support-deletes = on

    # For applications that are not using persistenceIds or currentPersistenceIds queries
    # this can be set to 'off', which will optimize the write of the first event for each
    # persistent actor since the all_persistence_ids table doesn't have to be populated.
    # Note that the Cleanup and Reconciliation tools may also use persistenceIds queries
    # and those will not work if this was disabled when the events were written. In
    # that case the all_persistence_ids table can be reconstructed with
    # Reconciliation.rebuildAllPersistenceIds.
    support-all-persistence-ids = on
  }
  //#journal

  # This configures the default settings for all CassandraReadJournal plugin
  # instances in the system.
  #
  # Settings for eventsByTag are defined in akka.persistence.cassandra.events-by-tag section.
  #
  # If you use multiple plugin instances you need to create differently named
  # sections containing only those settings that shall be different from the defaults
  # configured here, importing the defaults like so:
  #
  #   another-cassandra-plugin = ${akka.persistence.cassandra}
  #   another-cassandra-plugin {
  #     query {
  #       <settings...>
  #     }
  #     events-by-tag {
  #       <settings...>
  #     }
  #   }
  //#query
  query {

    # Implementation class of the Cassandra ReadJournalProvider
    class = "akka.persistence.cassandra.query.CassandraReadJournalProvider"

    # Dispatcher for the plugin actors.
    plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"

    read-profile = "akka-persistence-cassandra-profile"

    # New events are retrieved (polled) with this interval.
    refresh-interval = 3s

    # Sequence numbers for a persistenceId are assumed to be monotonically increasing
    # without gaps. That is used for detecting missing events.
    # In early versions of the journal that might not be true and therefore
    # this can be relaxed by setting this property to off.
    gap-free-sequence-numbers = on

    # When using LQ writing in one DC and querying in another, the events for an entity may
    # appear in the querying DC out of order, when that happens, try for this amount of
    # time to find the in-order sequence number before failing the stream
    events-by-persistence-id-gap-timeout = 10s

    # How many events to fetch in one query (replay) and keep buffered until they
    # are delivered downstreams.
    max-buffer-size = 500

    # Deserialization of events is performed in an Akka streams mapAsync operator and this is the
    # parallelism for that. Increasing it means that deserialization is pipelined, which can
    # be an advantage for machines with many CPU cores but otherwise it might be slower because
    # of higher CPU saturation and more competing tasks when there are many concurrent queries or
    # replays.
    deserialization-parallelism = 1
  }
  //#query

  //#events-by-tag
  events-by-tag {

    # Enable/disable events by tag. If eventsByTag queries aren't required then this should be set to
    # false to avoid the overhead of maintaining the tag_views table.
    enabled = true

    # Tagged events are written to a separate Cassandra table in unlogged batches
    # Max size of these batches. The best value for this will depend on the size of
    # the serialized events. Cassandra logs a warning for batches above a certain
    # size and this should be reduced if that warning is seen.
    max-message-batch-size = 150

    # Max time to buffer events for before writing.
    # Larger values will increase cassandra write efficiency but increase the delay before
    # seeing events in EventsByTag queries.
    # Setting this to 0 means that tag writes will be written immediately. Batching will still happen
    # as events are buffered while a write is in progress
    flush-interval = 0ms

    # Tagged events are written to a separate table after the write to the messages table has completed.
    # If the write to the tag_views table fails it is retried. If it hasn't succeeded within this timeout
    # then the actor will be stopped and the write will be retried again to the tag_views table when the actor
    # is restarted
    # A default of 4 seconds as that is greater than a typical write timeout in C* (2 seconds) and less than
    # the default eventual consistency delay
    # This behavior is new in 1.0.4 where before the write to the tag_views was completely asynchronous.
    tag-write-timeout = 4s

    # Update the tag_scanning table with this interval. Shouldn't be done too often to
    # avoid unnecessary load. The tag_scanning table keeps track of a starting point for tag
    # scanning during recovery of persistent actor.
    scanning-flush-interval = 30s

    # Enable DistributedPubSub to announce events for a specific tag have
    # been written. These announcements cause any ongoing getEventsByTag to immediately re-poll, rather than
    # wait. In order to enable this feature, make the following settings:
    #
    #    - enable clustering for your actor system
    #    - akka.persistence.cassandra.pubsub-notification=on (send real-time announcements at most every sec)
    #
    # Setting pubsub-notification to "off" will disable the journal sending these announcements.
    # When enabled with `on` it will throttle the number of notification messages per tag to at most 10 per second.
    # Alternatively a duration can be defined, such as pubsub-notification=300ms, which will be throttling interval
    # for sending the notifications.
    pubsub-notification = off

    table = "tag_views"

    # This is only used for auto-create of tag_views table.
    gc-grace-seconds = 864000

    # This is only used for auto-create of tag_views table.
    compaction-strategy {
      class = "SizeTieredCompactionStrategy"
      # If setting a time-to-live then consider using TimeWindowCompactionStrategy
      # See [here](http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html) for guidance.
      # It is recommended not to have more than 50 buckets so this needs to be based on your
      # time-to-live e.g. if you set the TTL to 50 hours and the compaction window to 1 hour
      # there will be 50 buckets.
      # class = "TimeWindowCompactionStrategy"
      # compaction_window_unit = "HOURS"
      # compaction_window_size = 1
    }

    # How long events are kept for in the tag_views table
    # By default the events are kept for ever. Uncomment and set to an appropriate
    # duration for your use case. See the compaction-strategy.class if you set this.
    # This is only used for auto-create of tag_views table.
    #time-to-live = 2d

    # WARNING: Can not be changed after data has been written
    #
    # Unless you have a significant number (million+) of events for a single tag
    # do not change this to Minute. Each tag in the tag-views table has a partition
    # per tag per bucket
    # Valid options: Day, Hour, Minute
    bucket-size = "Hour"

    # Configure this to the first bucket eventByTag queries will start from in the format
    # yyyyMMddTHH:mm yyyyMMdd is also supported if using Day as a bucket size
    # Will be rounded down to the start of whatever time bucket it falls into
    # When NoOffset is used it will look for events from this day and forward.
    first-time-bucket = "20151120T00:00"

    # The actor responsible for writing a tag is stopped if the tag isn't used for this duration.
    stop-tag-writer-when-idle = 10s

    # How long to look for delayed events
    # This works by adding an additional (internal) sequence number to each tag / persistence id
    # event stream so that the read side can detect missing events. When a gap is detected no new events
    # are emitted from the stream until either the missing events are found or the timeout is reached
    # If the event is not found it is checked every `refresh-interval` so do not set this lower than that
    # if you want at least one retry
    # When looking for missing events only the current time bucket and the previous bucket are checked meaning that if
    # clocks are out of sync, or cassandra replication is out by more than your bucket size (minute, hour or day)
    # then the missing events won't be found
    gap-timeout = 10s

    # When a new persistenceId is found in an eventsByTag query that wasn't found in the initial offset scanning
    # period as it didn't have any events in the current time bucket, this is how long the stage will delay events
    # looking for any lower tag pid sequence nrs. 0s means that the found event is assumed to be the first.
    # The edge case is if events for a not previously seen persistenceId come out of order then if this is set to
    # 0s the newer event will be delivered and when the older event is found the stream will fail as events have
    # to be delivered in order.
    # By default this is disabled, but for low (< 1 s) eventual-consistency-delay it can be set to something like
    # 100ms for reduced risk of missing event for edge case when events are seen in "wrong" order.
    new-persistence-id-scan-timeout = 0s

    # For offset queries that start in the current time bucket a period of scanning
    # takes place before delivering events to look for the lowest sequence number
    # for each persistenceId. Any value above 0 will result in at least one scan from
    # the offset to (offset + period). Larger values will result in a longer period of time
    # before the stream starts emitting events.
    offset-scanning-period = 200ms

    # Settings for retrying database queries. It can still be wrapped in a RestartSource
    # for full restarts on failure. Query retries are done within the stage because:
    # - Lighter weight than a full restart. A restart requires scanning for the current tag pid sequence numbers
    # - Restart typically starts from a later offset, if events have been delayed from before the offset they won't be delivered
    retries {
      # Max attempts to retry each query. Set to -1 for indefinite retry.
      # The default here is very large to avoid full restarts
      attempts = 100
      # Min duration the query will back off for
      min-backoff = 1s
      # Max duration the query will back off for
      max-backoff = 30s
      # after calculation of the back-off an additional
      # random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
      # In order to skip this additional delay pass in `0`.
      random-factor = 0.1
    }

    //#backtrack
    back-track {
      # Interval at which events by tag stages trigger a query to look for delayed events from before the
      # current offset. Delayed events are also searched for when a subsequent event for the same persistence id
      # is received. The purpose of this back track is to find delayed events for persistence ids that are no
      # longer receiving events. Depending on the size of the period of the back track this can be expensive.
      interval = 1s

      # How far back to go in time for the scan. This is the maximum amount of time an event can be delayed
      # and be found without receiving a new event for the same persistence id. Set to a duration or "max" to search back
      # to the start of the previous bucket which is the furthest a search can go back.
      period = 5s

      # At a less frequent interval a longer back track is done to find any events that were delayed significantly
      long-interval = 120s

      # long-period can be max, off, or a duration where max is to the start of the previous bucket or cleanup-old-persistence-ids,
      # which ever is shorter. Back tracks can not be longer than the cleanup-old-persistence-ids otherwise old events
      # will be redelivered due to the metadata having been dropped
      long-period = "max"
    }
    //#backtrack

    # For eventsByTag queries how long to delay the query for by setting the upper bound of the event TimeUUID to be slightly in the past.
    # This is very important as event writes that come from different nodes
    # will have slightly different clocks meaning events aren't received in TimeUUID order in C*.
    # Keeping clocks in sync with ntp is not sufficient to prevent this as it only takes a millisecond.
    #
    # In addition events written from the same node in an unlogged batch, meant to be isolated in C*, have been
    # found in load testing to come partially to a concurrent query, and the partial results are not always the
    # ones with the lowest TimeUUID.
    #
    # Events coming out of order are detected for persistenceIds that the query has already seen by using the previous
    # tag pid sequence number. However for persistenceIds that the query has not seen the expected tag pid sequence number
    # is not known.
    #
    # Another reason this is important is if the events are delivered to the
    # query immediately and the offset is greater than some delayed events if the query is restarted from that offset
    # the delayed events will never be delivered.
    #
    # Setting this to anything lower than 2s is highly discouraged.
    eventual-consistency-delay = 5s

    # Verbose debug logging for offset updates for events by tag queries. Any logging that is per event is
    # affected by this flag. Debug logging that is per query to Cassandra is enabled if DEBUG logging is enabled
    verbose-debug-logging = false

    # Set to a duration to have events by tag queries drop state about persistence ids
    # this can be set to reduce memory usage of events by tag queries for use cases where there
    # are many short lived persistence ids
    # If the metadata for a persistence-id is dropped and the persistence-id is encountered it will
    # add a delay of the new-persistence-id-scan-timeout before delivering any new events
    # This should be set to a large value e.g. 2 x the bucket size. It can be set lower but if the metadata is dropped
    # for a persistence id and an event is received afterwards then old events in the current and previous bucket may
    # be redelivered
    # By default it is set to 2 x the bucket size
    cleanup-old-persistence-ids = "<default>"

    # If more than max-missing-to-search events are detected missing by an events by tag query it fails
    # right away rather than searching for them. This is to prevent a missing search using an unbounded amount
    # of memory. This may be replaced in the future by an incremental missing search
    max-missing-to-search = 5000
  }
  //#events-by-tag

  # This configures the default settings for all Cassandra Snapshot plugin
  # instances in the system. If you are using just one configuration for
  # all persistent actors then you should point your akka.persistence.snapshot-store.plugin
  # setting to this section.
  #
  # Otherwise you need to create differently named sections containing
  # only those settings that shall be different from the defaults
  # configured here, importing the defaults like so:
  #
  #   another-cassandra-plugin = ${akka.persistence.cassandra}
  #   another-cassandra-plugin {
  #     snapshot {
  #       <settings...>
  #     }
  #   }
  //#snapshot
  snapshot {

    # FQCN of the cassandra snapshot store plugin
    class = "akka.persistence.cassandra.snapshot.CassandraSnapshotStore"

    # Dispatcher for the plugin actor
    plugin-dispatcher = "akka.persistence.cassandra.default-dispatcher"

    write-profile = "akka-persistence-cassandra-snapshot-profile"
    read-profile = "akka-persistence-cassandra-snapshot-profile"

    # Parameter indicating whether the journal keyspace should be auto created.
    # Not all Cassandra settings are configurable when using autocreate and for
    # full control of the keyspace and table definitions you should create them
    # manually (with a script).
    keyspace-autocreate = false

    # Parameter indicating whether the journal tables should be auto created
    # Not all Cassandra settings are configurable when using autocreate and for
    # full control of the keyspace and table definitions you should create them
    # manually (with a script).
    tables-autocreate = false

    # Name of the keyspace to be created/used by the snapshot store
    keyspace = "akka_snapshot"

    # Name of the table to be created/used by the snapshot store.
    # If the table doesn't exist it is automatically created when tables-autocreate
    # is enabled.
    table = "snapshots"

    # Compaction strategy for the snapshot table
    # Please refer to the tests for example configurations.
    # This is only used for auto-create of snapshots table, i.e. when tables-autocreate is
    # enabled and the table doesn't exist already.
    table-compaction-strategy {
      class = "SizeTieredCompactionStrategy"
    }

    # Replication strategy to use. SimpleStrategy or NetworkTopologyStrategy.
    # This is only used for auto-create of keyspace, i.e. when keyspace-autocreate is
    # enabled and the keyspace doesn't exist already.
    replication-strategy = "SimpleStrategy"

    # Replication factor to use when creating a keyspace. Is only used when replication-strategy is SimpleStrategy.
    # This is only used for auto-create of keyspace, i.e. when keyspace-autocreate is
    # enabled and the keyspace doesn't exist already.
    replication-factor = 1

    # Replication factor list for data centers, e.g. ["dc1:3", "dc2:2"]. Is only used when replication-strategy is NetworkTopologyStrategy.
    # This is only used for auto-create of keyspace, i.e. when keyspace-autocreate is
    # enabled and the keyspace doesn't exist already.
    data-center-replication-factors = []

    # The time to wait before cassandra will remove the tombstones created for deleted entries.
    # cfr. gc_grace_seconds table property documentation on
    # http://www.datastax.com/documentation/cql/3.1/cql/cql_reference/tabProp.html
    # This is only used for auto-create of snapshots table, i.e. when tables-autocreate is
    # enabled and the table doesn't exist already.
    gc-grace-seconds = 864000

    # Number of load attempts when recovering from the latest snapshot fails
    # yet older snapshot files are available. Each recovery attempt will try
    # to recover using an older than previously failed-on snapshot file
    # (if any are present). If all attempts fail the recovery will fail and
    # the persistent actor will be stopped.
    max-load-attempts = 3
  }
  //#snapshot

  # Configuration of the Cleanup tool.
  cleanup {
    # Full configuration path of the plugin to use.
    plugin-location = "akka.persistence.cassandra"

    # Timeout of individual delete operations. If it takes longer the whole job
    # will be aborted with logging of how far it reached.
    operation-timeout = 10s

    # Log progress after this number of delete operations. Can be set to 1 to log
    # progress of each operation.
    log-progress-every = 100

    # By default no deletes are executed and are instead logged at INFO. Set this to true
    # to actually do the deletes
    dry-run = true
  }

  # Configuration of the Reconciler tool.
  reconciler {
    # Full configuration path of the plugin to use.
    plugin-location = "akka.persistence.cassandra"

    # profile to use, can set separate ones for reads and writes
    # See https://docs.datastax.com/en/developer/java-driver/4.3/manual/core/configuration/
    # for overriding any settings
    read-profile = "akka-persistence-cassandra-profile"
    write-profile = "akka-persistence-cassandra-profile"
  }

  # Configuration of the Cassandra health check (compatible with Akka Management).
  healthcheck {
    # Full configuration path of the plugin to use.
    plugin-location = "akka.persistence.cassandra"

    # Timeout of the health check query execution.
    timeout = 500ms

    # CQL query that verifies the health of the database.
    # A query returning some result (even empty) is treated as a successful health check.
    # Failed query execution (connection error or client exception) is treated as a failed health check.
    health-check-cql = "SELECT now() FROM system.local"
  }

  # Default dispatcher for plugin actor and tasks.
  default-dispatcher {
    type = Dispatcher
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 6
      parallelism-factor = 1
      parallelism-max = 6
    }
  }
}

//#profile
# See reference configuration at
# https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
# (check which exact version Akka Persistence Cassandra uses)
datastax-java-driver {

  # always set this to allow reconnection on startup if cassandra is down
  # not overridable in a profile so this plugin can't override it for you
  # advanced.reconnect-on-init = true

  profiles {
    akka-persistence-cassandra-profile {
      basic.request {
        consistency = QUORUM
        # the journal does not use any counters or collections
        default-idempotence = true
      }
    }
    akka-persistence-cassandra-snapshot-profile {
      basic.request {
        consistency = ONE
        # the snapshot store does not use any counters or collections
        default-idempotence = true
      }
    }
  }
}
//#profile
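The comments in the reference configuration describe how to define a second, differently named plugin instance that imports the defaults. A minimal sketch of that pattern follows; the section name `another-cassandra-plugin` is taken from those comments, while the keyspace names are hypothetical.

```hocon
# Import all defaults, then override only what differs.
another-cassandra-plugin = ${akka.persistence.cassandra}
another-cassandra-plugin {
  journal {
    # hypothetical keyspace for the second journal instance
    keyspace = "another_journal"
  }
  snapshot {
    # hypothetical keyspace for the second snapshot store instance
    keyspace = "another_snapshot"
  }
}
```

A persistent actor would then presumably select this instance through its journal and snapshot plugin ids (for example `another-cassandra-plugin.journal`), following the usual Akka Persistence convention for multiple plugin configurations.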

Journal configuration is under akka.persistence.cassandra.journal.

Snapshot configuration is under akka.persistence.cassandra.snapshot.

Query configuration is under akka.persistence.cassandra.query.

Events by tag configuration is under akka.persistence.cassandra.events-by-tag and is shared by the journal and query plugins.

The settings that are shared by the journal, query, and snapshot parts of the plugin are under akka.persistence.cassandra.
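To illustrate how these sections map onto an `application.conf`, the sketch below overrides one setting in each of them. The setting names come from the reference configuration; the chosen values are illustrative assumptions only, not recommendations.

```hocon
akka.persistence.cassandra {
  # shared by the journal, query, and snapshot parts
  coordinated-shutdown-on-error = on

  journal {
    keyspace-autocreate = true   # convenient for local development only
    tables-autocreate = true
  }

  snapshot {
    keyspace-autocreate = true
    tables-autocreate = true
  }

  query {
    refresh-interval = 1s        # poll for new events more often than the 3s default
  }

  events-by-tag {
    # illustrative value; choose a bucket close to when your first tagged event was written
    first-time-bucket = "20240101T00:00"
  }
}
```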

Cassandra driver configuration

All Cassandra driver settings are configured via its standard profile mechanism.

One important setting is to configure the database driver to retry the initial connection:

datastax-java-driver.advanced.reconnect-on-init = true

The plugin cannot enable this for you: the setting is defined in the driver’s reference.conf and cannot be overridden in a profile.

If the IP addresses of your Cassandra nodes might change (e.g. if you use Kubernetes) then

datastax-java-driver.advanced.resolve-contact-points = false

should also be set, so that DNS addresses are resolved again when new connections are created. This also implies disabling Java’s DNS cache with -Dnetworkaddress.cache.ttl=0.
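Taken together, the two driver settings discussed above might appear in `application.conf` as in the following sketch. The contact point host name is a hypothetical placeholder for a DNS name whose address can change, and the data center name is only an example.

```hocon
datastax-java-driver {
  # retry the initial connection if Cassandra is not yet available
  advanced.reconnect-on-init = true

  # keep contact points unresolved so DNS is looked up again on new connections
  advanced.resolve-contact-points = false

  # hypothetical DNS name; replace with your own service address
  basic.contact-points = ["cassandra.example.internal:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}
```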

Cassandra driver overrides

# See reference configuration at
# https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
# (check which exact version Akka Persistence Cassandra uses)
datastax-java-driver {

  # always set this to allow reconnection on startup if cassandra is down
  # not overridable in a profile so this plugin can't override it for you
  # advanced.reconnect-on-init = true

  profiles {
    akka-persistence-cassandra-profile {
      basic.request {
        consistency = QUORUM
        # the journal does not use any counters or collections
        default-idempotence = true
      }
    }
    akka-persistence-cassandra-snapshot-profile {
      basic.request {
        consistency = ONE
        # the snapshot store does not use any counters or collections
        default-idempotence = true
      }
    }
  }
}
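As a sketch of how these profiles could be adjusted from `application.conf`, the fragment below changes the consistency level and request timeout of the journal profile. `basic.request.consistency` and `basic.request.timeout` are standard driver options; the values shown are assumptions for illustration and should be checked against the driver version in use.

```hocon
datastax-java-driver.profiles {
  akka-persistence-cassandra-profile {
    basic.request {
      # illustrative: restrict the quorum to the local data center
      consistency = LOCAL_QUORUM
      # illustrative request timeout
      timeout = 5 seconds
    }
  }
}
```

Because HOCON merges objects, only the keys listed here are replaced; `default-idempotence = true` from the defaults above remains in effect.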

Contact points configuration

The Cassandra server contact points can be defined with the Cassandra driver configuration:

datastax-java-driver {
  basic.contact-points = ["127.0.0.1:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}

Alternatively, Akka Discovery can be used for finding the Cassandra server contact points as described in the Alpakka Cassandra documentation.

If no contact points are configured, localhost:9042 is used by default.
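For the Akka Discovery alternative mentioned above, a configuration along the following lines may apply. This is a sketch under assumptions: the `service-discovery` keys come from the reference configuration, but the session provider class name `AkkaDiscoverySessionProvider` is assumed from the Alpakka Cassandra module and the service name is hypothetical, so verify both against the Alpakka Cassandra documentation.

```hocon
akka.persistence.cassandra {
  # assumption: Alpakka's discovery-based session provider
  session-provider = "akka.stream.alpakka.cassandra.AkkaDiscoverySessionProvider"

  service-discovery {
    # hypothetical service name to look up via Akka Discovery
    name = "cassandra-service"
    lookup-timeout = 1 s
  }
}
```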
