Slick (JDBC)

The Slick connector provides Scala and Java DSLs to create a Source to stream the results of a SQL database query and a Flow/Sink to perform SQL actions (like inserts, updates, and deletes) for each element in a stream. It is built on the Slick library to interact with a long list of supported relational databases.

Project Info: Alpakka Slick/JDBC
Artifact: com.lightbend.akka / akka-stream-alpakka-slick / 1.1.2
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12, 2.13.0
JPMS module name: akka.stream.alpakka.slick
License
Readiness level: Since 0.12, 2017-09-19
Home page: https://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notes: In the documentation
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "1.1.2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-slick_2.12</artifactId>
  <version>1.1.2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-slick_2.12', version: '1.1.2'
}

You will also need to add the JDBC driver(s) for the specific relational database(s) to your project. Most of those databases have drivers that are not available from public repositories, so unfortunately some manual steps will probably be required. The Slick documentation has information on where to download the drivers.

The table below shows the direct dependencies of this module, and the dependency tree that follows it shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
com.typesafe.slick    slick-hikaricp_2.12    3.3.2    Two-clause BSD-style license
com.typesafe.slick    slick_2.12    3.3.2    Two-clause BSD-style license
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.23    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.23    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.7    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.slick    slick-hikaricp_2.12    3.3.2    Two-clause BSD-style license
    com.typesafe.slick    slick_2.12    3.3.2    Two-clause BSD-style license
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.reactivestreams    reactive-streams    1.0.2    CC0
        org.scala-lang.modules    scala-collection-compat_2.12    2.0.0    Apache-2.0
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.slf4j    slf4j-api    1.7.25    MIT License
    com.zaxxer    HikariCP    3.2.0    The Apache Software License, Version 2.0
        org.slf4j    slf4j-api    1.7.25    MIT License
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.slick    slick_2.12    3.3.2    Two-clause BSD-style license
    com.typesafe    config    1.3.3    Apache License, Version 2.0
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang.modules    scala-collection-compat_2.12    2.0.0    Apache-2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.slf4j    slf4j-api    1.7.25    MIT License
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Initialization

As always, before we get started we will need an ActorSystem and a Materializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);

You will also always need the following important imports:

Scala
import akka.stream.alpakka.slick.scaladsl._
import akka.stream.scaladsl._
import slick.jdbc.GetResult
Java
import akka.stream.javadsl.*;
import akka.stream.alpakka.slick.javadsl.*;

The full examples for using the Source, Sink, and Flow (listed further down) also include all required imports.

Starting a Database Session

All functionality provided by this connector requires the user to first create an instance of SlickSession, which is a thin wrapper around Slick’s database connection management and database profile API.

If you are using Slick in your project, you can create a SlickSession instance by sharing the database configuration:

Scala
val databaseConfig = DatabaseConfig.forConfig[JdbcProfile]("slick-h2")
implicit val session = SlickSession.forConfig(databaseConfig)

Otherwise, you can configure your database using typesafe-config by adding a named configuration to your application.conf and then referring to that configuration when starting the session:

Scala
implicit val session = SlickSession.forConfig("slick-h2")
Java
private static final SlickSession session = SlickSession.forConfig("slick-h2");

Here is an example configuration for the H2 database, which is used for the unit tests of the Slick connector itself:

Configuration
# Load using SlickSession.forConfig("slick-h2")
slick-h2 {
  profile = "slick.jdbc.H2Profile$"
  db {
    connectionPool = disabled
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.h2.Driver"
      url = "jdbc:h2:/tmp/alpakka-slick-h2-test"
    }
  }
}

You can specify multiple different database configurations, as long as you use unique names. These can then be loaded by fully qualified configuration name using the SlickSession.forConfig() method described above.

The Slick connector supports all the various ways Slick allows you to configure your JDBC database drivers, connection pools, etc., but we strongly recommend using the so-called “DatabaseConfig” method of configuration, which is the only method explicitly tested to work with the Slick connector.

Below are a few configuration examples for other databases. The Slick connector supports all databases supported by Slick (as of Slick 3.2.x).

Postgres
# Load using SlickSession.forConfig("slick-postgres")
slick-postgres {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://127.0.0.1/slickdemo"
      user = slick
      password = ""
    }
  }
}
MySQL
# Load using SlickSession.forConfig("slick-mysql")
slick-mysql {
  profile = "slick.jdbc.MySQLProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.mysql.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/"
      user = slick
      password = ""
    }
  }
}
DB2
# Load using SlickSession.forConfig("slick-db2")
slick-db2 {
  profile = "slick.jdbc.DB2Profile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.ibm.db2.jcc.DB2Driver"
      url = "jdbc:db2://localhost:50000/sample"
      user = "db2inst1"
      password = "db2-admin-password"
    }
  }
}
Oracle
# Load using SlickSession.forConfig("slick-oracle")
slick-oracle {
  profile = "slick.jdbc.OracleProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "oracle.jdbc.OracleDriver"
      url = "jdbc:oracle:thin:@//localhost:49161/xe"
      user = slick
      password = ""
    }
  }
}
SQL Server
# Load using SlickSession.forConfig("slick-sqlserver")
slick-sqlserver {
  profile = "slick.jdbc.SQLServerProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      url = "jdbc:sqlserver://localhost:1433"
      user = slick
      password = ""
    }
  }
}

Of course, these are just examples. Please visit the Slick documentation for DatabaseConfig.forConfig for the full list of things to configure.

Closing a Database Session

Slick requires you to eventually close your database session to free up connection pool resources. You would usually do this when terminating the ActorSystem, by registering a termination handler like this:

Scala
system.registerOnTermination(session.close())
Java
system.registerOnTermination(session::close);

Using a Slick Source

The Slick connector allows you to perform a SQL query and expose the resulting stream of results as an Akka Streams Source[T], where T is any type that can be constructed from a database row.

Plain SQL queries

Both the Scala and Java DSLs support the use of plain SQL queries.

The Scala DSL expects you to use the special sql"...", sqlu"...", and sqlt"..." String interpolators provided by Slick to construct queries.

Unfortunately, String interpolation is a Scala language feature that cannot be directly translated to Java. This means that query strings in the Java DSL will need to be manually prepared using plain Java Strings (or a StringBuilder).
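Because the Java DSL takes the statement as a plain String, any value you concatenate into it becomes part of the SQL text. The following is a minimal sketch of one way to quote string values before concatenation, using only the JDK. The SqlLiterals class and its methods are hypothetical helpers, not part of the Slick connector, and the sketch assumes standard SQL quoting, where a single quote inside a literal is escaped by doubling it. Some databases have additional escaping rules, so treat this as an illustration rather than a complete defence against SQL injection.

```java
// Hypothetical helper, not part of the Slick connector.
final class SqlLiterals {
  private SqlLiterals() {}

  // Wraps a value in single quotes and doubles any embedded single quote,
  // which is the standard SQL way to escape a quote inside a string literal.
  static String quote(String value) {
    if (value == null) {
      return "NULL";
    }
    return "'" + value.replace("'", "''") + "'";
  }

  // Builds an INSERT statement for the example users table.
  static String insertUser(int id, String name) {
    return "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES ("
        + id
        + ", "
        + quote(name)
        + ")";
  }

  public static void main(String[] args) {
    // prints: INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES (1, 'O''Brien')
    System.out.println(insertUser(1, "O'Brien"));
  }
}
```

A function such as insertUser could then be called from the lambda that produces the statement String for each stream element.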

The following examples put it all together to perform a simple streaming query.

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)

// We need this to automatically transform result rows
// into instances of the User class.
// Please import slick.jdbc.GetResult
// See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#result-sets"
implicit val getUserResult = GetResult(r => User(r.nextInt, r.nextString))

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the results of a query
val done: Future[Done] =
  Slick
    .source(sql"SELECT ID, NAME FROM ALPAKKA_SLICK_SCALADSL_TEST_USERS".as[User])
    .log("user")
    .runWith(Sink.ignore)
Java
final SlickSession session = SlickSession.forConfig("slick-h2");
system.registerOnTermination(session::close);

final CompletionStage<Done> done =
    Slick.source(
            session,
            "SELECT ID, NAME FROM ALPAKKA_SLICK_JAVADSL_TEST_USERS",
            (SlickRow row) -> new User(row.nextInt(), row.nextString()))
        .log("user")
        .runWith(Sink.ignore(), materializer);

Typed Queries

The Scala DSL also supports the use of Slick Scala queries, which are more type-safe than their plain SQL equivalents. The code will look very similar to the plain SQL example.

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// This import brings everything you need into scope
import session.profile.api._

// The example domain
class Users(tag: Tag) extends Table[(Int, String)](tag, "ALPAKKA_SLICK_SCALADSL_TEST_USERS") {
  def id = column[Int]("ID")
  def name = column[String]("NAME")
  def * = (id, name)
}

// Stream the results of a query
val done: Future[Done] =
  Slick
    .source(TableQuery[Users].result)
    .log("user")
    .runWith(Sink.ignore)

Using a Slick Flow or Sink

If you want to take a stream of elements and turn them into side-effecting actions in a relational database, the Slick connector allows you to perform any DML or DDL statement using either a Sink or a Flow. This includes the typical insert/update/delete statements but also create table, drop table, etc. The unit tests have a couple of good examples of the latter usage.

The following example shows the use of a Slick Sink to take a stream of elements and insert them into the database. There is an optional parallelism argument to specify how many statements may be run against the database concurrently. The unit tests for the Slick connector have an example of performing parallel inserts.

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(users)
    .runWith(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.sink(user => sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")
    )
Java
final CompletionStage<Done> done =
    Source.from(users)
        .runWith(
            Slick.<User>sink(
                session,
                // add an optional second argument to specify the parallelism factor (int)
                (user) ->
                    "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES ("
                        + user.id
                        + ", '"
                        + user.name
                        + "')"),
            materializer);

Flow

The Slick connector also exposes a Flow that has exactly the same functionality as the Sink but allows you to continue the stream for further processing. The return value of every executed statement, i.e. the element emitted downstream, is of the fixed type Int and denotes the number of updated, inserted, or deleted rows.

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(users)
    .via(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.flow(user => sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")
    )
    .log("nr-of-updated-rows")
    .runWith(Sink.ignore)
Java
final SlickSession session = SlickSession.forConfig("slick-h2");
system.registerOnTermination(session::close);

final List<User> users =
    IntStream.range(0, 42)
        .boxed()
        .map((i) -> new User(i, "Name" + i))
        .collect(Collectors.toList());

int parallelism = 1;

final CompletionStage<Done> done =
    Source.from(users)
        .via(
            Slick.<User>flow(
                session,
                parallelism,
                (user) ->
                    "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES ("
                        + user.id
                        + ", '"
                        + user.name
                        + "')"))
        .log("nr-of-updated-rows")
        .runWith(Sink.ignore(), materializer);

Flow with pass-through

To have a different return type, use the flowWithPassThrough function. For example, when consuming Kafka messages, this allows you to keep the Kafka committable offset alongside the database result so that the message can be committed in a later stage of the flow.
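The examples below refer to KafkaMessage and CommittableOffset types that this page does not define. As an assumption about their shape, the following JDK-only sketch shows stand-ins that are enough to illustrate the pass-through idea: map transforms the payload while the offset travels along unchanged. All names in the sketch are hypothetical; real code would use the types of a Kafka client library, and commit() here simply completes immediately with the offset number for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical stand-ins for the message types used in the examples; not a real Kafka API.
final class PassThroughSketch {

  static final class CommittableOffset {
    final int offset;

    CommittableOffset(int offset) {
      this.offset = offset;
    }

    // Pretends to commit and completes immediately with the committed offset.
    CompletionStage<Integer> commit() {
      return CompletableFuture.completedFuture(offset);
    }
  }

  static final class KafkaMessage<A> {
    final A msg;
    final CommittableOffset offset;

    KafkaMessage(A msg, CommittableOffset offset) {
      this.msg = msg;
      this.offset = offset;
    }

    // Transforms the payload while carrying the same offset along.
    <B> KafkaMessage<B> map(Function<? super A, ? extends B> f) {
      return new KafkaMessage<>(f.apply(msg), offset);
    }
  }

  public static void main(String[] args) {
    KafkaMessage<String> in = new KafkaMessage<>("Name1", new CommittableOffset(7));
    int insertCount = 1; // what the database would report for one inserted row
    KafkaMessage<String> out = in.map(name -> name + " (rows inserted: " + insertCount + ")");
    // prints: Name1 (rows inserted: 1) @ offset 7
    System.out.println(out.msg + " @ offset " + out.offset.offset);
  }
}
```

The point of the design is that the element leaving the database stage still carries its offset, so a later stage can commit it only after the insert has completed.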

Scala
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))
val messagesFromKafka = users.zipWithIndex.map { case (user, index) => KafkaMessage(user, CommittableOffset(index)) }

// This import enables the use of the Slick sql"...",
// sqlu"...", and sqlt"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(messagesFromKafka)
    .via(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.flowWithPassThrough { kafkaMessage =>
        val user = kafkaMessage.msg
        (sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")
          .map { insertCount => // map db result to something else
            // keeps the Kafka message offset so it can be committed in a later stage
            kafkaMessage.map(user => (user, insertCount))
          }
      }
    )
    .log("nr-of-updated-rows")
    .mapAsync(1) { // in correct order
      kafkaMessage =>
        kafkaMessage.offset.commit // commit kafka messages
    }
    .runWith(Sink.ignore)
Java
final CompletionStage<Done> done =
    Source.from(messagesFromKafka)
        .via(
            Slick.flowWithPassThrough(
                session,
                system.dispatcher(),
                // add an optional third argument to specify the parallelism factor (int)
                (kafkaMessage) ->
                    "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES ("
                        + kafkaMessage.msg.id
                        + ", '"
                        + kafkaMessage.msg.name
                        + "')",
                // keeps the Kafka message offset so it can be committed in a later stage
                (kafkaMessage, insertCount) ->
                    kafkaMessage.map(user -> Pair.create(user, insertCount))))
        .log("nr-of-updated-rows")
        .mapAsync(
            1,
            kafkaMessage ->
                kafkaMessage.offset.commit()) // in correct order, commit Kafka message
        .runWith(Sink.ignore(), materializer);