Journal

Features

  • All operations required by the Akka Persistence journal plugin API are fully supported.
  • The plugin uses Cassandra in a pure log-oriented way i.e. data is only ever inserted but never updated (deletions are made on user request only).
  • Writes of messages are batched to optimize throughput for persistAsync. See batch writes for details how to configure batch sizes. The plugin was tested to work properly under high load.
  • Messages written by a single persistent actor are partitioned across the cluster to achieve scalability with data volume by adding nodes.
  • Persistence Query support by CassandraReadJournal

Configuration

To activate the journal plugin, add the following line to your Akka application.conf:

akka.persistence.journal.plugin = "cassandra-journal"

This will run the journal with its default settings. The default settings can be changed with the configuration properties defined in reference.conf:

Caveats

  • Detailed tests under failure conditions are still missing.
  • Range deletion performance (i.e. deleteMessages up to a specified sequence number) depends on the extend of previous deletions
    • linearly increases with the number of tombstones generated by previous permanent deletions and drops to a minimum after compaction
  • For versions prior to 0.80 events by tag uses Cassandra Materialized Views which are a new feature that has yet to stabilise Use at your own risk, see here for a discussion on the Cassandra dev mailing list. Version 0.80 and on migrated away from Materialized Views and maintain a separate table for events by tag queries.

These issues are likely to be resolved in future versions of the plugin.

Default keyspace and table definitions

CREATE KEYSPACE IF NOT EXISTS akka
WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };

CREATE TABLE IF NOT EXISTS akka.messages (
  used boolean static,
  persistence_id text,
  partition_nr bigint,
  sequence_nr bigint,
  timestamp timeuuid,
  timebucket text,
  writer_uuid text,
  ser_id int,
  ser_manifest text,
  event_manifest text,
  event blob,
  meta_ser_id int,
  meta_ser_manifest text,
  meta blob,
  message blob,
  tags set<text>,
  PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp, timebucket))
  WITH gc_grace_seconds =864000
  AND compaction = {
    'class' : 'SizeTieredCompactionStrategy',
    'enabled' : true,
    'tombstone_compaction_interval' : 86400,
    'tombstone_threshold' : 0.2,
    'unchecked_tombstone_compaction' : false,
    'bucket_high' : 1.5,
    'bucket_low' : 0.5,
    'max_threshold' : 32,
    'min_threshold' : 4,
    'min_sstable_size' : 50
    };

CREATE TABLE IF NOT EXISTS akka.tag_views (
  tag_name text,
  persistence_id text,
  sequence_nr bigint,
  timebucket bigint,
  timestamp timeuuid,
  tag_pid_sequence_nr bigint,
  writer_uuid text,
  ser_id int,
  ser_manifest text,
  event_manifest text,
  event blob,
  meta_ser_id int,
  meta_ser_manifest text,
  meta blob,
  PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr))
  WITH gc_grace_seconds =864000
  AND compaction = {
    'class' : 'SizeTieredCompactionStrategy',
    'enabled' : true,
    'tombstone_compaction_interval' : 86400,
    'tombstone_threshold' : 0.2,
    'unchecked_tombstone_compaction' : false,
    'bucket_high' : 1.5,
    'bucket_low' : 0.5,
    'max_threshold' : 32,
    'min_threshold' : 4,
    'min_sstable_size' : 50
    };

CREATE TABLE IF NOT EXISTS akka.tag_write_progress(
  persistence_id text,
  tag text,
  sequence_nr bigint,
  tag_pid_sequence_nr bigint,
  offset timeuuid,
  PRIMARY KEY (persistence_id, tag));

CREATE TABLE IF NOT EXISTS akka.tag_scanning(
  persistence_id text,
  sequence_nr bigint,
  PRIMARY KEY (persistence_id));

CREATE TABLE IF NOT EXISTS akka.metadata(
  persistence_id text PRIMARY KEY,
  deleted_to bigint,
  properties map<text,text>);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.