Apache Cassandra

The Cassandra connector allows you to read and write to Cassandra. You can query a stream of rows from CassandraSource or use prepared statements to insert or update with CassandraFlow or CassandraSink.

Unlogged batches are also supported.

Project Info: Alpakka Cassandra
Artifact
com.lightbend.akka
akka-stream-alpakka-cassandra
1.0-M2
JDK versions
OpenJDK 8
Scala versions2.12.7, 2.11.12
JPMS module nameakka.stream.alpakka.cassandra
License
Readiness level
Community-driven
Since 0.3, 2016-12-02
Home pagehttps://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-cassandra_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-cassandra_2.12', version: '1.0-M2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersionLicense
com.datastax.cassandracassandra-driver-core3.5.1Apache 2
com.typesafe.akkaakka-stream_2.122.5.19Apache License, Version 2.0
org.scala-langscala-library2.12.7BSD 3-Clause
Dependency tree
com.datastax.cassandra    cassandra-driver-core    3.5.1    Apache 2
    com.github.jnr    jnr-ffi    2.1.7    The Apache Software License, Version 2.0
        com.github.jnr    jffi    1.2.16    The Apache Software License, Version 2.0
        com.github.jnr    jnr-x86asm    1.0.2    MIT License
        org.ow2.asm    asm-analysis    5.0.3    BSD
            org.ow2.asm    asm-tree    5.0.3    BSD
                org.ow2.asm    asm    5.0.3    BSD
        org.ow2.asm    asm-commons    5.0.3    BSD
            org.ow2.asm    asm-tree    5.0.3    BSD
                org.ow2.asm    asm    5.0.3    BSD
        org.ow2.asm    asm-tree    5.0.3    BSD
            org.ow2.asm    asm    5.0.3    BSD
        org.ow2.asm    asm-util    5.0.3    BSD
            org.ow2.asm    asm-tree    5.0.3    BSD
                org.ow2.asm    asm    5.0.3    BSD
        org.ow2.asm    asm    5.0.3    BSD
    com.github.jnr    jnr-posix    3.0.44    Common Public License - v 1.0
        com.github.jnr    jnr-constants    0.9.9    The Apache Software License, Version 2.0
        com.github.jnr    jnr-ffi    2.1.7    The Apache Software License, Version 2.0
            com.github.jnr    jffi    1.2.16    The Apache Software License, Version 2.0
            com.github.jnr    jnr-x86asm    1.0.2    MIT License
            org.ow2.asm    asm-analysis    5.0.3    BSD
                org.ow2.asm    asm-tree    5.0.3    BSD
                    org.ow2.asm    asm    5.0.3    BSD
            org.ow2.asm    asm-commons    5.0.3    BSD
                org.ow2.asm    asm-tree    5.0.3    BSD
                    org.ow2.asm    asm    5.0.3    BSD
            org.ow2.asm    asm-tree    5.0.3    BSD
                org.ow2.asm    asm    5.0.3    BSD
            org.ow2.asm    asm-util    5.0.3    BSD
                org.ow2.asm    asm-tree    5.0.3    BSD
                    org.ow2.asm    asm    5.0.3    BSD
            org.ow2.asm    asm    5.0.3    BSD
    com.google.guava    guava    19.0    The Apache Software License, Version 2.0
    io.dropwizard.metrics    metrics-core    3.2.2    Apache License 2.0
        org.slf4j    slf4j-api    1.7.25    MIT License
    io.netty    netty-handler    4.0.56.Final    Apache License, Version 2.0
        io.netty    netty-buffer    4.0.56.Final    Apache License, Version 2.0
            io.netty    netty-common    4.0.56.Final    Apache License, Version 2.0
        io.netty    netty-codec    4.0.56.Final    Apache License, Version 2.0
            io.netty    netty-transport    4.0.56.Final    Apache License, Version 2.0
                io.netty    netty-buffer    4.0.56.Final    Apache License, Version 2.0
                    io.netty    netty-common    4.0.56.Final    Apache License, Version 2.0
        io.netty    netty-transport    4.0.56.Final    Apache License, Version 2.0
            io.netty    netty-buffer    4.0.56.Final    Apache License, Version 2.0
                io.netty    netty-common    4.0.56.Final    Apache License, Version 2.0
    org.slf4j    slf4j-api    1.7.25    MIT License
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Source

Sources provided by this connector need a prepared session to communicate with Cassandra cluster. First, let’s initialize a Cassandra session.

Scala
implicit val session = Cluster.builder
  .addContactPoint("127.0.0.1")
  .withPort(9042)
  .build
  .connect()
Java
final Session session =
    Cluster.builder().addContactPoint("127.0.0.1").withPort(9042).build().connect();

We will also need an ActorSystem and an ActorMaterializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);

Let’s create a Cassandra statement with a query that we want to execute.

Scala
val stmt = new SimpleStatement(s"SELECT * FROM $keyspaceName.test").setFetchSize(20)
Java
final Statement stmt =
    new SimpleStatement("SELECT * FROM akka_stream_java_test.test").setFetchSize(20);

And finally create the source using any method from the CassandraSource factory and run it.

Scala
val rows = CassandraSource(stmt).runWith(Sink.seq)
Java
final CompletionStage<List<Row>> rows =
    CassandraSource.create(stmt, session).runWith(Sink.seq(), materializer);

Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.

Flow with passthrough

Let’s create a Cassandra Prepared statement with a query that we want to execute.

Scala
val preparedStatement = session.prepare(s"INSERT INTO $keyspaceName.test(id) VALUES (?)")
Java
final PreparedStatement preparedStatement =
    session.prepare("insert into akka_stream_java_test.test (id) values (?)");

Now we need to create a ‘statement binder’, this is just a function to bind to the prepared statement. It can take in any type / data structure to fit your query values. Here we’re just using one Integer, but it can just as easily be a (case) class.

Scala
val statementBinder = (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
Java
BiFunction<Integer, PreparedStatement, BoundStatement> statementBinder =
    (myInteger, statement) -> statement.bind(myInteger);

We run the stream persisting the elements to C* and finally folding them using a Sink.fold.

Scala
val flow = CassandraFlow.createWithPassThrough[Integer](parallelism = 2, preparedStatement, statementBinder)

val result = source.via(flow).runWith(Sink.seq)
Java
final Flow<Integer, Integer, NotUsed> flow =
    CassandraFlow.createWithPassThrough(2, preparedStatement, statementBinder, session);

CompletionStage<List<Integer>> result = source.via(flow).runWith(Sink.seq(), materializer);

Flow with passthrough and unlogged batching

Use this when most of the elements in the stream share the same partition key.

Cassandra unlogged batches that share the same partition key will only resolve to one write internally in Cassandra, boosting write performance.

Be aware that this stage does not preserve the upstream order!

For this example we will define a class that model the data to be inserted

Scala
case class ToInsert(id: Integer, cc: Integer)
Java
private class ToInsert {
  Integer id;
  Integer cc;

  public ToInsert(Integer id, Integer cc) {
    this.id = id;
    this.cc = cc;
  }
}

Let’s create a Cassandra Prepared statement with a query that we want to execute.

Scala
val preparedStatement = session.prepare(s"INSERT INTO $keyspaceName.test_batch(id, cc) VALUES (?, ?)")
Java
final PreparedStatement preparedStatement =
    session.prepare("insert into akka_stream_java_test.test_batch(id, cc) values (?, ?)");

Now we need to create a ‘statement binder’, this is just a function to bind to the prepared statement. In this example we are using a class.

Scala
val statementBinder =
  (elemToInsert: ToInsert, statement: PreparedStatement) => statement.bind(elemToInsert.id, elemToInsert.cc)
Java
BiFunction<ToInsert, PreparedStatement, BoundStatement> statementBinder =
    (toInsert, statement) -> statement.bind(toInsert.id, toInsert.cc);

You can define the amount of grouped elements, in this case we will use the default ones:

Scala
val settings: CassandraBatchSettings = CassandraBatchSettings()
Java
CassandraBatchSettings defaultSettings = CassandraBatchSettings.create();

We run the stream persisting the elements to C* and finally folding them using a Sink.fold. The function T => K has to extract the Cassandra partition key from your class.

Scala
val flow = CassandraFlow.createUnloggedBatchWithPassThrough[ToInsert, Integer](parallelism = 2,
                                                                               preparedStatement,
                                                                               statementBinder,
                                                                               ti => ti.id,
                                                                               settings)

val result = source.via(flow).runWith(Sink.seq)
Java
final Flow<ToInsert, ToInsert, NotUsed> flow =
    CassandraFlow.createUnloggedBatchWithPassThrough(
        2, preparedStatement, statementBinder, (ti) -> ti.id, defaultSettings, session);

CompletionStage<List<ToInsert>> result = source.via(flow).runWith(Sink.seq(), materializer);

Sink

Let’s create a Cassandra Prepared statement with a query that we want to execute.

Scala
val preparedStatement = session.prepare(s"INSERT INTO $keyspaceName.test(id) VALUES (?)")
Java
final PreparedStatement preparedStatement =
    session.prepare("insert into akka_stream_java_test.test (id) values (?)");

Now we need to create a ‘statement binder’, this is just a function to bind to the prepared statement. It can take in any type / data structure to fit your query values. Here we’re just using one Integer, but it can just as easily be a (case) class.

Scala
val statementBinder = (myInteger: Integer, statement: PreparedStatement) => statement.bind(myInteger)
Java
BiFunction<Integer, PreparedStatement, BoundStatement> statementBinder =
    (myInteger, statement) -> statement.bind(myInteger);

Finally we run the sink from any source.

Scala
val sink = CassandraSink[Integer](parallelism = 2, preparedStatement, statementBinder)

val result = source.runWith(sink)
Java
final Sink<Integer, CompletionStage<Done>> sink =
    CassandraSink.create(2, preparedStatement, statementBinder, session);

CompletionStage<Done> result = source.runWith(sink, materializer);

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Test code requires Cassandra running in the background. You can start it quickly using docker:

docker-compose up cassandra

Scala
sbt
> cassandra/testOnly *.CassandraSourceSpec
Java
sbt
> cassandra/testOnly *.CassandraSourceTest
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.