Apache Cassandra
Apache Cassandra is a free and open-source, distributed, wide column store, NoSQL database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.
Alpakka Cassandra offers an Akka Streams API on top of a CqlSession
from the Datastax Java Driver version 4.0+. The driver configuration is provided in the same config format as Akka uses and can be placed in the same application.conf
as your Akka settings.
Project Info: Alpakka Cassandra | |
---|---|
Artifact | com.lightbend.akka
akka-stream-alpakka-cassandra
7.0.2
|
JDK versions | Eclipse Temurin JDK 11 Eclipse Temurin JDK 17 |
Scala versions | 2.13.12, 3.3.1 |
JPMS module name | akka.stream.alpakka.cassandra |
License | |
Readiness level |
Since 2.0.0, 2020-04-30
|
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project> ... <repositories> <repository> <id>akka-repository</id> <name>Akka library repository</name> <url>https://repo.akka.io/maven</url> </repository> </repositories> </project>
- Gradle
repositories { mavenCentral() maven { url "https://repo.akka.io/maven" } }
Additionally, add the dependencies as below.
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "7.0.2"
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-cassandra_${scala.binary.version}</artifactId> <version>7.0.2</version> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "com.lightbend.akka:akka-stream-alpakka-cassandra_${versions.ScalaBinary}:7.0.2" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version com.datastax.oss java-driver-core 4.17.0 com.fasterxml.jackson.core jackson-core 2.15.2 com.fasterxml.jackson.core jackson-databind 2.15.2 com.typesafe.akka akka-stream_2.13 2.9.0 org.scala-lang scala-library 2.13.12 - Dependency tree
com.datastax.oss java-driver-core 4.17.0 com.datastax.oss java-driver-shaded-guava 25.1-jre-graal-sub-1 Apache 2 com.datastax.oss native-protocol 1.5.1 Apache 2 com.fasterxml.jackson.core jackson-core 2.15.2 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-databind 2.15.2 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.15.2 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.15.2 The Apache Software License, Version 2.0 com.github.jnr jnr-posix 3.1.15 Eclipse Public License - v 2.0 com.github.jnr jnr-constants 0.10.3 The Apache Software License, Version 2.0 com.github.jnr jnr-ffi 2.2.11 The Apache Software License, Version 2.0 com.github.jnr jffi 1.3.9 The Apache Software License, Version 2.0 com.github.jnr jnr-a64asm 1.0.0 The Apache Software License, Version 2.0 com.github.jnr jnr-x86asm 1.0.2 MIT License org.ow2.asm asm-analysis 9.2 BSD-3-Clause org.ow2.asm asm-tree 9.2 BSD-3-Clause org.ow2.asm asm 9.2 BSD-3-Clause org.ow2.asm asm-commons 9.2 BSD-3-Clause org.ow2.asm asm-analysis 9.2 BSD-3-Clause org.ow2.asm asm-tree 9.2 BSD-3-Clause org.ow2.asm asm 9.2 BSD-3-Clause org.ow2.asm asm-tree 9.2 BSD-3-Clause org.ow2.asm asm 9.2 BSD-3-Clause org.ow2.asm asm 9.2 BSD-3-Clause org.ow2.asm asm-tree 9.2 BSD-3-Clause org.ow2.asm asm 9.2 BSD-3-Clause org.ow2.asm asm-util 9.2 BSD-3-Clause org.ow2.asm asm-analysis 9.2 BSD-3-Clause org.ow2.asm asm-tree 9.2 BSD-3-Clause org.ow2.asm asm 9.2 BSD-3-Clause org.ow2.asm asm-tree 9.2 BSD-3-Clause org.ow2.asm asm 9.2 BSD-3-Clause org.ow2.asm asm 9.2 BSD-3-Clause org.ow2.asm asm 9.2 BSD-3-Clause com.github.stephenc.jcip jcip-annotations 1.0-1 Apache License, Version 2.0 com.typesafe config 1.4.3 Apache-2.0 io.dropwizard.metrics metrics-core 4.1.18 org.slf4j slf4j-api 1.7.30 io.netty netty-handler 4.1.94.Final io.netty netty-buffer 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-codec 4.1.94.Final io.netty netty-buffer 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-transport 4.1.94.Final io.netty netty-buffer 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-resolver 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-resolver 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-transport-native-unix-common 4.1.94.Final io.netty netty-buffer 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-transport 4.1.94.Final io.netty netty-buffer 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-resolver 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-transport 4.1.94.Final io.netty netty-buffer 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-common 4.1.94.Final io.netty netty-resolver 4.1.94.Final io.netty netty-common 4.1.94.Final org.hdrhistogram HdrHistogram 2.1.12 Public Domain, per Creative Commons CC0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.slf4j slf4j-api 1.7.30 com.fasterxml.jackson.core jackson-core 2.15.2 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-databind 2.15.2 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.15.2 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.15.2 The Apache Software License, Version 2.0 com.typesafe.akka akka-stream_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.0 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0
Sessions
Cassandra is accessed through CassandraSession
CassandraSession
s which are managed by the CassandraSessionRegistry
CassandraSessionRegistry
Akka extension. This way a session is shared across all usages within the actor system and properly shut down after the actor system is shut down.
The CassandraSession
is provided to the stream factory methods as an implicit
parameter.
- Scala
-
source
import akka.stream.alpakka.cassandra.CassandraSessionSettings import akka.stream.alpakka.cassandra.scaladsl.CassandraSession import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry val system: ActorSystem = // ??? val sessionSettings = CassandraSessionSettings() implicit val cassandraSession: CassandraSession = CassandraSessionRegistry.get(system).sessionFor(sessionSettings) val version: Future[String] = cassandraSession .select("SELECT release_version FROM system.local;") .map(_.getString("release_version")) .runWith(Sink.head)
- Java
-
source
import akka.actor.ActorSystem; import akka.stream.Materializer; import akka.stream.alpakka.cassandra.CassandraSessionSettings; import akka.stream.alpakka.cassandra.javadsl.CassandraSession; import akka.stream.alpakka.cassandra.javadsl.CassandraSessionRegistry; ActorSystem system = // ??? CassandraSessionSettings sessionSettings = CassandraSessionSettings.create(); CassandraSession cassandraSession = CassandraSessionRegistry.get(system).sessionFor(sessionSettings); CompletionStage<String> version = cassandraSession .select("SELECT release_version FROM system.local;") .map(row -> row.getString("release_version")) .runWith(Sink.head(), system);
See custom session creation below for tweaking this.
Reading from Cassandra
CassandraSource
CassandraSource
provides factory methods to get Akka Streams Sources from CQL queries and from com.datastax.oss.driver.api.core.cql.Statement
s.
Dynamic parameters can be provided to the CQL as variable arguments.
- Scala
-
source
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource val ids: Future[immutable.Seq[Int]] = CassandraSource(s"SELECT id FROM $intTable").map(row => row.getInt("id")).runWith(Sink.seq) val idsWhere: Future[Int] = CassandraSource(s"SELECT * FROM $intTable WHERE id = ?", value).map(_.getInt("id")).runWith(Sink.head)
- Java
-
source
import akka.stream.alpakka.cassandra.javadsl.CassandraSource; CompletionStage<List<Integer>> select = CassandraSource.create(cassandraSession, "SELECT id FROM " + idtable + ";") .map(r -> r.getInt("id")) .runWith(Sink.seq(), system); CompletionStage<Integer> select = CassandraSource.create( cassandraSession, "SELECT * FROM " + idtable + " WHERE id = ?;", value) .map(r -> r.getInt("id")) .runWith(Sink.head(), system);
If the statement requires specific settings, you may pass any com.datastax.oss.driver.api.core.cql.Statement
.
- Scala
-
source
import com.datastax.oss.driver.api.core.cql.{Row, SimpleStatement} val stmt = SimpleStatement.newInstance(s"SELECT * FROM $intTable").setPageSize(20) val rows: Future[immutable.Seq[Row]] = CassandraSource(stmt).runWith(Sink.seq)
- Java
-
source
import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; Statement<?> stmt = SimpleStatement.newInstance("SELECT * FROM " + idtable + ";").setPageSize(20); CompletionStage<List<Integer>> select = CassandraSource.create(cassandraSession, stmt) .map(r -> r.getInt("id")) .runWith(Sink.seq(), system);
Here we used a basic sink to complete the stream by collecting all of the stream elements into a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
Writing to Cassandra
CassandraFlow
CassandraFlow
provides factory methods to get Akka Streams flows to run CQL statements that change data (UPDATE
, INSERT
). Alpakka Cassandra creates a PreparedStatement
and for every stream element the statementBinder
function binds the CQL placeholders to data.
The incoming elements are emitted unchanged for further processing.
- Scala
-
source
import akka.stream.alpakka.cassandra.CassandraWriteSettings import akka.stream.alpakka.cassandra.scaladsl.CassandraFlow import com.datastax.oss.driver.api.core.cql.{BoundStatement, PreparedStatement} case class Person(id: Int, name: String, city: String) val persons = immutable.Seq(Person(12, "John", "London"), Person(43, "Umberto", "Roma"), Person(56, "James", "Chicago")) val statementBinder: (Person, PreparedStatement) => BoundStatement = (person, preparedStatement) => preparedStatement.bind(Int.box(person.id), person.name, person.city) val written: Future[immutable.Seq[Person]] = Source(persons) .via( CassandraFlow.create(CassandraWriteSettings.defaults, s"INSERT INTO $table(id, name, city) VALUES (?, ?, ?)", statementBinder) ) .runWith(Sink.seq)
- Java
-
source
import akka.NotUsed; import akka.actor.ActorSystem; import akka.japi.Function2; import akka.japi.Pair; import akka.stream.alpakka.cassandra.CassandraWriteSettings; import akka.stream.alpakka.cassandra.javadsl.CassandraFlow; import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4; import akka.stream.javadsl.SourceWithContext; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; List<Person> persons = Arrays.asList( new Person(12, "John", "London"), new Person(43, "Umberto", "Roma"), new Person(56, "James", "Chicago")); Function2<Person, PreparedStatement, BoundStatement> statementBinder = (person, preparedStatement) -> preparedStatement.bind(person.id, person.name, person.city); CompletionStage<List<Person>> written = Source.from(persons) .via( CassandraFlow.create( cassandraSession, CassandraWriteSettings.defaults(), "INSERT INTO " + table + "(id, name, city) VALUES (?, ?, ?)", statementBinder)) .runWith(Sink.seq(), system);
Update flows with context
Alpakka Cassandra flows offer “With Context”-support which integrates nicely with some other Alpakka connectors.
- Scala
-
source
val personsAndHandles: SourceWithContext[Person, AckHandle, NotUsed] = // ??? val written: Future[Done] = personsAndHandles .via( CassandraFlow.withContext( CassandraWriteSettings.defaults, s"INSERT INTO $table(id, name, city) VALUES (?, ?, ?)", (person, preparedStatement) => preparedStatement.bind(Int.box(person.id), person.name, person.city) ) ) .asSource .mapAsync(1) { case (_, handle) => handle.ack() } .runWith(Sink.ignore)
- Java
-
source
SourceWithContext<Person, AckHandle, NotUsed> from = // ???; CompletionStage<Done> written = from.via( CassandraFlow.withContext( cassandraSession, CassandraWriteSettings.defaults(), "INSERT INTO " + table + "(id, name, city) VALUES (?, ?, ?)", (person, preparedStatement) -> preparedStatement.bind(person.id, person.name, person.city))) .asSource() .mapAsync(1, pair -> pair.second().ack()) .runWith(Sink.ignore(), system);
Custom Session creation
Session creation and configuration is controlled via settings in application.conf
. The CassandraSessionSettings
CassandraSessionSettings
accept a full path to a configuration section which needs to specify a session-provider
setting. The CassandraSessionRegistry
CassandraSessionRegistry
expects a fully qualified class name to a class implementing CqlSessionProvider
CqlSessionProvider
.
Alpakka Cassandra includes a default implementation DefaultSessionProvider
DefaultSessionProvider
, which is referenced in the default configuration alpakka.cassandra
.
The DefaultSessionProvider
DefaultSessionProvider
config section must contain:
- a settings section
service-discovery
which may be used to discover Cassandra contact points via Akka Discovery, - and a reference to a Cassandra config section in
datastax-java-driver-config
which is used to configure the Cassandra client. For details see the Datastax Java Driver configuration and the driver’sreference.conf
.
- reference.conf
-
source
alpakka.cassandra { # The implementation of `akka.stream.alpakka.cassandra.CqlSessionProvider` # used for creating the `CqlSession`. # It may optionally have a constructor with an `ClassicActorSystemProvider` and `Config` parameters. session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider" # Configure Akka Discovery by setting a service name service-discovery { name = "" lookup-timeout = 1 s } # The ExecutionContext to use for the session tasks and future composition. session-dispatcher = "akka.actor.default-dispatcher" # Full config path to the Datastax Java driver's configuration section. # When connecting to more than one Cassandra cluster different session configuration can be # defined with this property. # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/#quick-overview # and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/ datastax-java-driver-config = "datastax-java-driver" }
In simple cases your datastax-java-driver
section will need to define contact-points
and load-balancing-policy.local-datacenter
. To make the Cassandra driver retry its initial connection attempts, add advanced.reconnect-on-init = true
.
- application.conf
-
source
datastax-java-driver { basic { contact-points = [ "127.0.0.1:9042" ] load-balancing-policy.local-datacenter = datacenter1 } advanced.reconnect-on-init = true }
Using Akka Discovery
To use Akka Discovery make sure the akka-discovery
dependency is on you classpath.
- sbt
val AkkaVersion = "2.9.0" libraryDependencies += "com.typesafe.akka" %% "akka-discovery" % AkkaVersion
- Maven
<properties> <akka.version>2.9.0</akka.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-discovery_${scala.binary.version}</artifactId> <version>${akka.version}</version> </dependency> </dependencies>
- Gradle
def versions = [ AkkaVersion: "2.9.0", ScalaBinary: "2.13" ] dependencies { implementation "com.typesafe.akka:akka-discovery_${versions.ScalaBinary}:${versions.AkkaVersion}" }
To enable Akka Discovery with the DefaultSessionProvider
DefaultSessionProvider
, set up the desired service name in the discovery mechanism of your choice and pass that name in service-discovery.name
. The example below extends the alpakka.cassandra
config section and only overwrites the service name.
- application.conf
-
source
akka { discovery.method = config } akka.discovery.config.services = { cassandra-service = { endpoints = [ { host = "127.0.0.1" port = 9042 } ] } } // inherit defaults from `alpakka.cassandra` settings example-with-akka-discovery: ${alpakka.cassandra} { service-discovery.name = "cassandra-service" }
Use the full config section path to create the CassandraSessionSettings
CassandraSessionSettings
.
- Scala
-
source
val sessionSettings = CassandraSessionSettings("example-with-akka-discovery") implicit val session: CassandraSession = CassandraSessionRegistry.get(system).sessionFor(sessionSettings)
- Java
-
source
CassandraSessionSettings sessionSettings = CassandraSessionSettings.create("example-with-akka-discovery"); CassandraSession session = CassandraSessionRegistry.get(system).sessionFor(sessionSettings);