Journal plugin

Schema

The following tables need to be created in the configured database:

CREATE TABLE journal (
  persistence_id STRING(MAX) NOT NULL,
  sequence_nr INT64 NOT NULL,
  event BYTES(MAX),
  ser_id INT64 NOT NULL,
  ser_manifest STRING(MAX) NOT NULL,
  write_time TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
  writer_uuid STRING(MAX) NOT NULL,
  meta BYTES(MAX),
  meta_ser_id INT64,
  meta_ser_manifest STRING(MAX),
) PRIMARY KEY (persistence_id, sequence_nr)

CREATE TABLE tags (
  persistence_id STRING(MAX) NOT NULL,
  sequence_nr INT64 NOT NULL,
  tag STRING(MAX) NOT NULL,
  write_time TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
) PRIMARY KEY (persistence_id, sequence_nr, tag),
INTERLEAVE IN PARENT journal ON DELETE CASCADE

CREATE INDEX tags_tag_and_offset
ON tags (
  tag,
  write_time
)

CREATE TABLE deletions (
  persistence_id STRING(MAX) NOT NULL,
  deleted_to INT64 NOT NULL,
) PRIMARY KEY (persistence_id)

Configuration

To activate the journal plugin, add the following line to your Akka application.conf:

akka.persistence.journal.plugin = "akka.persistence.spanner.journal"

Configuration just for the journal is located under akka.persistence.spanner.journal.

Shared configuration is located under akka.persistence.spanner. You will need to set at least:

akka.persistence.spanner {
    project = <spanner project>
    instance = <spanner instance>
    database = <spanner database>
}
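
As a minimal sketch of how these settings might look in practice, the fragment below sets the three required values and lets each be overridden from the environment using HOCON's optional substitution syntax. The literal values and the environment variable names (SPANNER_PROJECT, SPANNER_INSTANCE, SPANNER_DATABASE) are hypothetical and not defined by the plugin.

```hocon
akka.persistence.journal.plugin = "akka.persistence.spanner.journal"

akka.persistence.spanner {
  # Hypothetical defaults; replace with your own names.
  project = "my-gcp-project"
  instance = "my-spanner-instance"
  database = "my-akka-database"

  # Optional overrides: each line only takes effect if the
  # (hypothetical) environment variable is set.
  project = ${?SPANNER_PROJECT}
  instance = ${?SPANNER_INSTANCE}
  database = ${?SPANNER_DATABASE}
}
```

With this layout the same application.conf can be used across environments, with only the environment variables changing.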

Authentication details are picked up from the environment using the standard Google Cloud mechanism.

Reference configuration

The following settings can be overridden in your application.conf:

akka.persistence.spanner {

  project = "akka"
  instance = "akka"
  database = "akka"

  journal {
    class = "akka.persistence.spanner.SpannerJournal"

    # name of the table to use for events
    table = "journal"

    # name of the (interleaved) table used for tags
    event-tag-table = "tags"

    # name of the auxiliary table used to store information about deletes
    deletions-table = "deletions"
  }

  #query
  query {
    class = "akka.persistence.spanner.SpannerReadJournalProvider"

    # How often to poll Spanner for new rows when live queries return
    # no results
    refresh-interval = 3s
  }
  #query

  snapshot {
    class = "akka.persistence.spanner.SpannerSnapshotStore"
    table = "snapshots"
  }



  # config location for the grpc client.
  # located under akka.grpc.client
  grpc-client = "spanner-client"


  #session-pool
  session-pool {
    # the max number of sessions to create when connecting to spanner.
    # currently all of these sessions are created eagerly and kept alive.
    # called 'max-size' as future versions will likely have a dynamic
    # session pool
    max-size = 5

    # the max number of outstanding requests for sessions when the pool is
    # fully utilized
    max-outstanding-requests = 1000

    # How often to retry the batch session creation on start up, and also
    # how often to retry the creation of a single session in the event
    # of a NOT_FOUND being returned during keep alive. NOT_FOUND indicates
    # that the spanner server has deleted the session.
    retry-create-interval = 2s

    # If the pool encounters an unexpected error, how long to back off before
    # re-creating the pool from scratch. For expected errors, such as a failure of the
    # initial batch session creation, set the other, more specific intervals
    restart-min-backoff = 1s
    restart-max-backoff = 10s

    # How often to send a dummy query for each session to keep it alive
    # Spanner drops sessions after 60 minutes
    keep-alive-interval = 25 minutes

    # Interval between each log of stats,
    # set to off to disable
    stats-interval = off
    # Stats logger to use, set this logger to DEBUG in your configuration
    # to see the stats logging
    stats-logger = "akka.persistence.spanner.SessionPool.stats"

    # Keep lower than coordinated shutdown stage timeout
    # if sessions aren't returned within this timeout then they
    # are deleted meaning in progress queries will fail
    shutdown-timeout = 3s
  }

  # set to false for running with the spanner emulator
  use-auth = true

  # Fail a request if a session cannot be acquired within this timeout
  session-acquisition-timeout = 5 s

  # If a write fails, retry at most this many times. Retries are also capped
  # in total time by max-write-retry-timeout. Note that this timeout works in concert with
  # the akka persistence circuit breaker call timeout
  max-write-retries = 3
  max-write-retry-timeout = 10s

}
#session-pool

#grpc
akka.grpc.client.spanner-client {
  host = spanner.googleapis.com
  port = 443
}
#grpc

# Serialized SpannerOffset is used by Akka Projections
akka {
  actor {
    serialization-identifiers {
      "akka.persistence.spanner.internal.SpannerSerializer" = 45074320
    }
    serializers {
      akka-persistence-spanner = "akka.persistence.spanner.internal.SpannerSerializer"
    }
    serialization-bindings {
      "akka.persistence.spanner.SpannerOffset" = akka-persistence-spanner
    }
  }
}
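
As an illustration of overriding the reference configuration, the following application.conf fragment is a sketch that enlarges the session pool, turns on periodic stats logging, and polls more often for live queries. All values are illustrative assumptions, not tuning recommendations; only the setting names come from the reference configuration above.

```hocon
akka.persistence.spanner {
  session-pool {
    # Illustrative value: more sessions than the default of 5
    max-size = 20

    # Log pool stats every 30 seconds (default is off). The stats logger
    # named by stats-logger must also be set to DEBUG in your logging
    # configuration for the output to appear.
    stats-interval = 30s
  }

  query {
    # Illustrative value: poll more often than the default of 3s
    refresh-interval = 1s
  }

  # Illustrative value: wait longer for a session than the default of 5s
  session-acquisition-timeout = 10s
}
```

Settings that are not mentioned keep their defaults from the reference configuration.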

Testing with the Spanner emulator

The Spanner emulator can be used for local testing. Set the following to connect to it:

akka.persistence.journal.plugin = "akka.persistence.spanner.journal"
akka.persistence.spanner {
  session-pool {
    # emulator only supports a single transaction at a time
    max-size = 1
  }

  use-auth = false
}
akka.grpc.client.spanner-client {
  host = localhost
  port = 9010
  use-tls = false
}

You will need to have created the instance and database.

Deletes

The journal implements deletes as hard deletes, which means the journal entries are actually removed from the database. There is no materialized view holding a copy of the event, so if a tagged event is deleted it will no longer show up in events by tag queries.
