Journal plugin
Schema
The following tables need to be created in the configured database:
-- Event journal table: one row per persisted event, keyed by the persistence id
-- and the event's sequence number.
-- write_time is declared with allow_commit_timestamp=true so that the Spanner
-- commit timestamp can be stored in it.
-- NOTE(review): ser_id / ser_manifest (and the meta_* counterparts) presumably
-- identify the serializer used for `event` (and `meta`) -- confirm against the plugin.
CREATE TABLE journal (
  persistence_id STRING(MAX) NOT NULL,
  sequence_nr INT64 NOT NULL,
  event BYTES(MAX),
  ser_id INT64 NOT NULL,
  ser_manifest STRING(MAX) NOT NULL,
  write_time TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
  writer_uuid STRING(MAX) NOT NULL,
  meta BYTES(MAX),
  meta_ser_id INT64,
  meta_ser_manifest STRING(MAX),
) PRIMARY KEY (persistence_id, sequence_nr);
-- Tag table: one row per (persistence_id, sequence_nr, tag).
-- It is interleaved in the journal table with ON DELETE CASCADE, so deleting a
-- journal row also deletes the tag rows stored under it.
CREATE TABLE tags (
  persistence_id STRING(MAX) NOT NULL,
  sequence_nr INT64 NOT NULL,
  tag STRING(MAX) NOT NULL,
  write_time TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
) PRIMARY KEY (persistence_id, sequence_nr, tag),
  INTERLEAVE IN PARENT journal ON DELETE CASCADE;
-- Secondary index on the tags table over (tag, write_time).
-- NOTE(review): presumably serves events-by-tag queries that use write_time as
-- the offset -- confirm against the query implementation.
CREATE INDEX tags_tag_and_offset
  ON tags (
    tag,
    write_time
  );
-- Auxiliary table holding information about deletes: one row per persistence id.
-- NOTE(review): deleted_to is presumably the highest sequence_nr deleted so far
-- for that persistence id -- confirm against the plugin.
CREATE TABLE deletions (
  persistence_id STRING(MAX) NOT NULL,
  deleted_to INT64 NOT NULL,
) PRIMARY KEY (persistence_id);
Configuration
To activate the journal plugin, add the following line to your Akka application.conf:
# Select the Spanner journal as the Akka Persistence journal plugin.
akka.persistence.journal.plugin = "akka.persistence.spanner.journal"
Shared configuration is located under akka.persistence.spanner.
Configuration just for the journal is under akka.persistence.spanner.journal. You will need to set at least:
# Minimum required settings: replace each <...> placeholder with your own value.
akka.persistence.spanner {
  project = <spanner project>
  instance = <spanner instance>
  database = <spanner database>
}
Authentication details will be picked up from the environment using the standard Google Cloud mechanism (Application Default Credentials), as described in the Google Cloud authentication documentation.
Reference configuration
The following settings can be overridden in your application.conf:
# Reference configuration for the Spanner persistence plugins.
akka.persistence.spanner {
  # Spanner project, instance and database to connect to
  project = "akka"
  instance = "akka"
  database = "akka"

  journal {
    class = "akka.persistence.spanner.SpannerJournal"
    # name of the table to use for events
    table = "journal"
    # name of the (interleaved) table used for tags
    event-tag-table = "tags"
    # name of the auxiliary table used to store information about deletes
    deletions-table = "deletions"
  }

  #query
  query {
    class = "akka.persistence.spanner.SpannerReadJournalProvider"
    # How often to poll Spanner for new rows when a live query returns
    # no results
    refresh-interval = 3s
  }
  #query

  snapshot {
    class = "akka.persistence.spanner.SpannerSnapshotStore"
    table = "snapshots"
  }

  #durable-state
  durable-state-store {
    class = "akka.persistence.spanner.state.SpannerDurableStateStoreProvider"
    table = "objects"
    tag-column = "tag"
  }

  # config location for the grpc client.
  # located under akka.grpc.client
  grpc-client = "spanner-client"

  #session-pool
  session-pool {
    # the max number of sessions to create when connecting to spanner.
    # currently all of these sessions are created eagerly and kept alive.
    # called 'max-size' as future versions will likely have a dynamic
    # session pool
    max-size = 20

    # the max number of outstanding requests for sessions when the pool is
    # fully utilized
    max-outstanding-requests = 1000

    # How often to retry the batch create sessions on start up and also
    # how often to retry a single session create in the event
    # of a NOT_FOUND being returned during keep alive. NOT_FOUND indicates
    # that the spanner server has deleted the session.
    retry-create-interval = 2s

    # If the pool encounters an unexpected error, how long to back off for before
    # re-creating the pool from scratch. For expected errors, such as the initial
    # batch session creation, set the other, more specific intervals
    restart-min-backoff = 1s
    restart-max-backoff = 10s

    # How often to send a dummy query for each session to keep it alive
    # Spanner drops sessions after 60 minutes
    keep-alive-interval = 25 minutes

    # Interval between each log of stats
    # set to off to disable
    stats-interval = off

    # Stats logger to use, set this logger to DEBUG in your configuration
    # to see the stats logging
    stats-logger = "akka.persistence.spanner.SessionPool.stats"

    # Keep lower than coordinated shutdown stage timeout
    # if sessions aren't returned within this timeout then they
    # are deleted, meaning in-progress queries will fail
    shutdown-timeout = 3s
  }

  # set to false for running with the spanner emulator
  use-auth = true

  # Fail a request if a session cannot be acquired within this timeout
  session-acquisition-timeout = 5 s

  # If a write fails, retry at most this many times;
  # retries are also capped in time (see max-write-retry-timeout). Note that the
  # timeout works in concert with the Akka Persistence circuit breaker call timeout
  max-write-retries = 3
  max-write-retry-timeout = 10s
}
#session-pool
#grpc
# gRPC client settings for the Spanner endpoint; the client name matches the
# plugin's grpc-client setting ("spanner-client").
akka.grpc.client {
  spanner-client {
    host = spanner.googleapis.com
    port = 443
  }
}
#grpc
# Akka Projections uses the serialized form of SpannerOffset, so a serializer
# is registered and bound for it here.
akka.actor {
  serialization-identifiers {
    "akka.persistence.spanner.internal.SpannerSerializer" = 45074320
  }

  serializers {
    akka-persistence-spanner = "akka.persistence.spanner.internal.SpannerSerializer"
  }

  serialization-bindings {
    "akka.persistence.spanner.SpannerOffset" = akka-persistence-spanner
  }
}
Testing with the Spanner emulator
The Spanner emulator can be used for testing. Set the following to connect to it:
# Settings for running against a local Spanner emulator.
akka.persistence.journal.plugin = "akka.persistence.spanner.journal"

akka.persistence.spanner {
  # emulator only supports a single transaction at a time
  session-pool.max-size = 1
  # no authentication when talking to the emulator
  use-auth = false
}

akka.grpc.client.spanner-client {
  host = localhost
  port = 9010
  use-tls = false
}
You will need to have created the instance and database.
Deletes
The journal supports deletes through hard deletes, which means the journal entries are actually deleted from the database. There is no materialized view holding a copy of the events, so if a tagged event is deleted, it will no longer show up in events-by-tag queries.