Apache Cassandra

Cassandra

Apache Cassandra is a free and open-source, distributed, wide column store, NoSQL database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.

Wikipedia

Alpakka Cassandra offers an Akka Streams API on top of a CqlSession from the Datastax Java Driver version 4.0+. The driver configuration is provided in the same config format as Akka uses and can be placed in the same application.conf as your Akka settings.

Project Info: Alpakka Cassandra
Artifact
com.lightbend.akka
akka-stream-alpakka-cassandra
2.0.0-RC1
JDK versions
Adopt OpenJDK 8
Adopt OpenJDK 11
Scala versions2.12.10, 2.13.1
JPMS module nameakka.stream.alpakka.cassandra
License
Readiness level
Since 2.0.0-RC1, 2020-03-10
Home pagehttps://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "2.0.0-RC1"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-cassandra_2.12</artifactId>
  <version>2.0.0-RC1</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-cassandra_2.12', version: '2.0.0-RC1'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.datastax.ossjava-driver-core4.5.0
com.typesafe.akkaakka-stream_2.122.5.30
io.nettynetty-all4.1.39.Final
io.nettynetty-handler4.1.45.Final
org.scala-langscala-library2.12.10
Dependency tree
com.datastax.oss    java-driver-core    4.5.0
    com.datastax.oss    java-driver-shaded-guava    25.1-jre
    com.datastax.oss    native-protocol    1.4.9
    com.esri.geometry    esri-geometry-api    1.2.1
        org.codehaus.jackson    jackson-core-asl    1.9.12
        org.json    json    20090211
    com.fasterxml.jackson.core    jackson-core    2.10.0
    com.fasterxml.jackson.core    jackson-databind    2.10.0
        com.fasterxml.jackson.core    jackson-annotations    2.10.0
        com.fasterxml.jackson.core    jackson-core    2.10.0
    com.github.jnr    jnr-ffi    2.1.10
        com.github.jnr    jffi    1.2.19
        com.github.jnr    jnr-a64asm    1.0.0
        com.github.jnr    jnr-x86asm    1.0.2
        org.ow2.asm    asm-analysis    7.1
            org.ow2.asm    asm-tree    7.1
                org.ow2.asm    asm    7.1
        org.ow2.asm    asm-commons    7.1
            org.ow2.asm    asm-analysis    7.1
                org.ow2.asm    asm-tree    7.1
                    org.ow2.asm    asm    7.1
            org.ow2.asm    asm-tree    7.1
                org.ow2.asm    asm    7.1
            org.ow2.asm    asm    7.1
        org.ow2.asm    asm-tree    7.1
            org.ow2.asm    asm    7.1
        org.ow2.asm    asm-util    7.1
            org.ow2.asm    asm-analysis    7.1
                org.ow2.asm    asm-tree    7.1
                    org.ow2.asm    asm    7.1
            org.ow2.asm    asm-tree    7.1
                org.ow2.asm    asm    7.1
            org.ow2.asm    asm    7.1
        org.ow2.asm    asm    7.1
    com.github.jnr    jnr-posix    3.0.50
        com.github.jnr    jnr-constants    0.9.12
        com.github.jnr    jnr-ffi    2.1.10
            com.github.jnr    jffi    1.2.19
            com.github.jnr    jnr-a64asm    1.0.0
            com.github.jnr    jnr-x86asm    1.0.2
            org.ow2.asm    asm-analysis    7.1
                org.ow2.asm    asm-tree    7.1
                    org.ow2.asm    asm    7.1
            org.ow2.asm    asm-commons    7.1
                org.ow2.asm    asm-analysis    7.1
                    org.ow2.asm    asm-tree    7.1
                        org.ow2.asm    asm    7.1
                org.ow2.asm    asm-tree    7.1
                    org.ow2.asm    asm    7.1
                org.ow2.asm    asm    7.1
            org.ow2.asm    asm-tree    7.1
                org.ow2.asm    asm    7.1
            org.ow2.asm    asm-util    7.1
                org.ow2.asm    asm-analysis    7.1
                    org.ow2.asm    asm-tree    7.1
                        org.ow2.asm    asm    7.1
                org.ow2.asm    asm-tree    7.1
                    org.ow2.asm    asm    7.1
                org.ow2.asm    asm    7.1
            org.ow2.asm    asm    7.1
    com.github.stephenc.jcip    jcip-annotations    1.0-1
    com.typesafe    config    1.3.4
    io.dropwizard.metrics    metrics-core    4.0.5
        org.slf4j    slf4j-api    1.7.26
    org.apache.tinkerpop    gremlin-driver    3.4.5
        org.apache.commons    commons-lang3    3.8.1
        org.codehaus.groovy    groovy-json    2.5.7
        org.codehaus.groovy    groovy    2.5.7
    org.apache.tinkerpop    tinkergraph-gremlin    3.4.5
        org.apache.commons    commons-lang3    3.8.1
    org.hdrhistogram    HdrHistogram    2.1.11
    org.javatuples    javatuples    1.2
    org.reactivestreams    reactive-streams    1.0.2
    org.slf4j    slf4j-api    1.7.26
com.typesafe.akka    akka-stream_2.12    2.5.30
    com.typesafe.akka    akka-actor_2.12    2.5.30
        com.typesafe    config    1.3.4
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0
    com.typesafe.akka    akka-protobuf_2.12    2.5.30
    com.typesafe    ssl-config-core_2.12    0.3.8
        com.typesafe    config    1.3.4
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2
    org.reactivestreams    reactive-streams    1.0.2
io.netty    netty-all    4.1.39.Final
io.netty    netty-handler    4.1.45.Final
    io.netty    netty-buffer    4.1.45.Final
        io.netty    netty-common    4.1.45.Final
    io.netty    netty-codec    4.1.45.Final
        io.netty    netty-buffer    4.1.45.Final
            io.netty    netty-common    4.1.45.Final
        io.netty    netty-common    4.1.45.Final
        io.netty    netty-transport    4.1.45.Final
            io.netty    netty-buffer    4.1.45.Final
                io.netty    netty-common    4.1.45.Final
            io.netty    netty-common    4.1.45.Final
            io.netty    netty-resolver    4.1.45.Final
                io.netty    netty-common    4.1.45.Final
    io.netty    netty-common    4.1.45.Final
    io.netty    netty-transport    4.1.45.Final
        io.netty    netty-buffer    4.1.45.Final
            io.netty    netty-common    4.1.45.Final
        io.netty    netty-common    4.1.45.Final
        io.netty    netty-resolver    4.1.45.Final
            io.netty    netty-common    4.1.45.Final
org.scala-lang    scala-library    2.12.10

Sessions

Cassandra is accessed through CassandraSessionCassandraSessions which are managed by the CassandraSessionRegistryCassandraSessionRegistry Akka extension. This way a session is shared across all usages within the actor system and properly shut down after the actor system is shut down.

The CassandraSession is provided to the stream factory methods as an implicit parameter.

Scala
import akka.stream.alpakka.cassandra.CassandraSessionSettings
import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry

val system: ActorSystem = // ???
val sessionSettings = CassandraSessionSettings()
implicit val cassandraSession: CassandraSession =
  CassandraSessionRegistry.get(system).sessionFor(sessionSettings)

val version: Future[String] =
  cassandraSession
    .select("SELECT release_version FROM system.local;")
    .map(_.getString("release_version"))
    .runWith(Sink.head)
Java

import akka.actor.ActorSystem; import akka.stream.Materializer; import akka.stream.alpakka.cassandra.CassandraSessionSettings; import akka.stream.alpakka.cassandra.javadsl.CassandraSession; import akka.stream.alpakka.cassandra.javadsl.CassandraSessionRegistry; ActorSystem system = // ??? Materializer materializer = // ??? CassandraSessionSettings sessionSettings = CassandraSessionSettings.create(); CassandraSession cassandraSession = CassandraSessionRegistry.get(system).sessionFor(sessionSettings); CompletionStage<String> version = cassandraSession .select("SELECT release_version FROM system.local;") .map(row -> row.getString("release_version")) .runWith(Sink.head(), materializer);

See custom session creation below for tweaking this.

Reading from Cassandra

CassandraSourceCassandraSource provides factory methods to get Akka Streams Sources from CQL queries and from com.datastax.oss.driver.api.core.cql.Statements.

Dynamic parameters can be provided to the CQL as variable arguments.

Scala
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource

val ids: Future[immutable.Seq[Int]] =
  CassandraSource(s"SELECT id FROM $intTable").map(row => row.getInt("id")).runWith(Sink.seq)

val idsWhere: Future[Int] =
  CassandraSource(s"SELECT * FROM $intTable WHERE id = ?", value).map(_.getInt("id")).runWith(Sink.head)
Java
import akka.stream.alpakka.cassandra.javadsl.CassandraSource;

CompletionStage<List<Integer>> select =
    CassandraSource.create(cassandraSession, "SELECT id FROM " + idtable + ";")
        .map(r -> r.getInt("id"))
        .runWith(Sink.seq(), materializer);

CompletionStage<Integer> select =
    CassandraSource.create(
            cassandraSession, "SELECT * FROM " + idtable + " WHERE id = ?;", value)
        .map(r -> r.getInt("id"))
        .runWith(Sink.head(), materializer);

If the statement requires specific settings, you may pass any com.datastax.oss.driver.api.core.cql.Statement.

Scala
import com.datastax.oss.driver.api.core.cql.{Row, SimpleStatement}

val stmt = SimpleStatement.newInstance(s"SELECT * FROM $intTable").setPageSize(20)

val rows: Future[immutable.Seq[Row]] = CassandraSource(stmt).runWith(Sink.seq)
Java
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;

Statement<?> stmt =
    SimpleStatement.newInstance("SELECT * FROM " + idtable + ";").setPageSize(20);

CompletionStage<List<Integer>> select =
    CassandraSource.create(cassandraSession, stmt)
        .map(r -> r.getInt("id"))
        .runWith(Sink.seq(), materializer);

Here we used a basic sink to complete the stream by collecting all of the stream elements into a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.

Writing to Cassandra

CassandraFlowCassandraFlow provides factory methods to get Akka Streams flows to run CQL statements that change data (UPDATE, INSERT). Alpakka Cassandra creates a PreparedStatement and for every stream element the statementBinder function binds the CQL placeholders to data.

The incoming elements are emitted unchanged for further processing.

Scala
import akka.stream.alpakka.cassandra.CassandraWriteSettings
import akka.stream.alpakka.cassandra.scaladsl.CassandraFlow
import com.datastax.oss.driver.api.core.cql.{BoundStatement, PreparedStatement}

case class Person(id: Int, name: String, city: String)

val persons =
  immutable.Seq(Person(12, "John", "London"), Person(43, "Umberto", "Roma"), Person(56, "James", "Chicago"))

val statementBinder: (Person, PreparedStatement) => BoundStatement =
  (person, preparedStatement) => preparedStatement.bind(Int.box(person.id), person.name, person.city)

val written: Future[immutable.Seq[Person]] = Source(persons)
  .via(
    CassandraFlow.create(CassandraWriteSettings.defaults,
                         s"INSERT INTO $table(id, name, city) VALUES (?, ?, ?)",
                         statementBinder)
  )
  .runWith(Sink.seq)
Java
import akka.NotUsed;
import akka.japi.Function2;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.alpakka.cassandra.CassandraWriteSettings;
import akka.stream.alpakka.cassandra.javadsl.CassandraFlow;
import akka.stream.javadsl.SourceWithContext;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

List<Person> persons =
    Arrays.asList(
        new Person(12, "John", "London"),
        new Person(43, "Umberto", "Roma"),
        new Person(56, "James", "Chicago"));

Function2<Person, PreparedStatement, BoundStatement> statementBinder =
    (person, preparedStatement) -> preparedStatement.bind(person.id, person.name, person.city);

CompletionStage<List<Person>> written =
    Source.from(persons)
        .via(
            CassandraFlow.create(
                cassandraSession,
                CassandraWriteSettings.defaults(),
                "INSERT INTO " + table + "(id, name, city) VALUES (?, ?, ?)",
                statementBinder))
        .runWith(Sink.seq(), materializer);

Update flows with context

Alpakka Cassandra flows offer “With Context”-support which integrates nicely with some other Alpakka connectors.

Scala
val personsAndHandles: SourceWithContext[Person, AckHandle, NotUsed] = // ???

val written: Future[Done] = personsAndHandles
  .via(
    CassandraFlow.withContext(
      CassandraWriteSettings.defaults,
      s"INSERT INTO $table(id, name, city) VALUES (?, ?, ?)",
      (person, preparedStatement) => preparedStatement.bind(Int.box(person.id), person.name, person.city)
    )
  )
  .asSource
  .mapAsync(1) {
    case (_, handle) => handle.ack()
  }
  .runWith(Sink.ignore)
Java
SourceWithContext<Person, AckHandle, NotUsed> from = // ???;
CompletionStage<Done> written =
    from.via(
            CassandraFlow.withContext(
                cassandraSession,
                CassandraWriteSettings.defaults(),
                "INSERT INTO " + table + "(id, name, city) VALUES (?, ?, ?)",
                (person, preparedStatement) ->
                    preparedStatement.bind(person.id, person.name, person.city)))
        .asSource()
        .mapAsync(1, pair -> pair.second().ack())
        .runWith(Sink.ignore(), materializer);

Custom Session creation

Session creation and configuration is controlled via settings in application.conf. The CassandraSessionSettingsCassandraSessionSettings accept a full path to a configuration section which needs to specify a session-provider setting. The CassandraSessionRegistryCassandraSessionRegistry expects a fully qualified class name to a class implementing CqlSessionProviderCqlSessionProvider.

Alpakka Cassandra includes a default implementation DefaultSessionProviderDefaultSessionProvider, which is referenced in the default configuration alpakka.cassandra.

The DefaultSessionProviderDefaultSessionProvider config section must contain:

  • a settings section service-discovery which may be used to discover Cassandra contact points via Akka Discovery,
  • and a reference to a Cassandra config section in datastax-java-driver-config which is used to configure the Cassandra client. For details see the Datastax Java Driver configuration and the driver’s reference.conf.
reference.conf
alpakka.cassandra {
  # The implementation of `akka.stream.alpakka.cassandra.CqlSessionProvider`
  # used for creating the `CqlSession`.
  # It may optionally have a constructor with an `ActorSystem` and `Config` parameters.
  session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"

  # Configure Akka Discovery by setting a service name
  service-discovery {
    name = ""
    lookup-timeout = 1 s
  }

  # The ExecutionContext to use for the session tasks and future composition.
  session-dispatcher = "akka.actor.default-dispatcher"

  # Full config path to the Datastax Java driver's configuration section.
  # When connecting to more than one Cassandra cluster different session configuration can be
  # defined with this property.
  # See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/#quick-overview
  # and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
  datastax-java-driver-config = "datastax-java-driver"
}

In simple cases your datastax-java-driver section will need to define contact-points and load-balancing-policy.local-datacenter. To make the Cassandra driver retry its initial connection attempts, add advanced.reconnect-on-init = true.

application.conf
datastax-java-driver {
  basic {
    contact-points = [ "127.0.0.1:9042" ]
    load-balancing-policy.local-datacenter = datacenter1
  }
  advanced.reconnect-on-init = true
}

Using Akka Discovery

To use Akka Discovery make sure the akka-discovery dependency is on you classpath.

sbt
val AkkaVersion = "2.5.30"
libraryDependencies += "com.typesafe.akka" %% "akka-discovery" % AkkaVersion
Maven
<properties>
  <akka.version>2.5.30</akka.version>
</properties>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-discovery_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>
Gradle
versions += [
  AkkaVersion: "2.5.30"
]
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-discovery_2.12', version: versions.AkkaVersion
}

To enable Akka Discovery with the DefaultSessionProviderDefaultSessionProvider, set up the desired service name in the discovery mechanism of your choice and pass that name in service-discovery.name. The example below extends the alpakka.cassandra config section and only overwrites the service name.

application.conf
akka {
  discovery.method = config
}
akka.discovery.config.services = {
  cassandra-service = {
    endpoints = [
      {
        host = "127.0.0.1"
        port = 9042
      }
    ]
  }
}

// inherit defaults from `alpakka.cassandra` settings
example-with-akka-discovery: ${alpakka.cassandra} {
  service-discovery.name = "cassandra-service"
}

Use the full config section path to create the CassandraSessionSettingsCassandraSessionSettings.

Scala
val sessionSettings = CassandraSessionSettings("example-with-akka-discovery")
implicit val session = CassandraSessionRegistry.get(system).sessionFor(sessionSettings)
Java
CassandraSessionSettings sessionSettings =
    CassandraSessionSettings.create("example-with-akka-discovery");
CassandraSession session = CassandraSessionRegistry.get(system).sessionFor(sessionSettings);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.