Slick (JDBC)
The Slick connector provides Scala and Java DSLs to create a Source to stream the results of a SQL database query and a Flow/Sink to perform SQL actions (like inserts, updates, and deletes) for each element in a stream. It is built on the Slick library to interact with a long list of supported relational databases.
Project Info: Alpakka Slick/JDBC | |
---|---|
Artifact | com.lightbend.akka akka-stream-alpakka-slick 1.1.2 |
JDK versions | OpenJDK 8 |
Scala versions | 2.12.7, 2.11.12, 2.13.0 |
JPMS module name | akka.stream.alpakka.slick |
License | |
Readiness level | Since 0.12, 2017-09-19 |
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "1.1.2"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-slick_2.12</artifactId>
  <version>1.1.2</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-slick_2.12', version: '1.1.2'
}
You will also need to add the JDBC driver(s) for the specific relational database(s) to your project. Most of those databases have drivers that are not available from public repositories, so unfortunately some manual steps will probably be required. The Slick documentation has information on where to download the drivers.
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version | License |
---|---|---|---|
com.typesafe.akka | akka-stream_2.12 | 2.5.23 | Apache License, Version 2.0 |
com.typesafe.slick | slick-hikaricp_2.12 | 3.3.2 | Two-clause BSD-style license |
com.typesafe.slick | slick_2.12 | 3.3.2 | Two-clause BSD-style license |
org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |
- Dependency tree
com.typesafe.akka akka-stream_2.12 2.5.23 Apache License, Version 2.0
    com.typesafe.akka akka-actor_2.12 2.5.23 Apache License, Version 2.0
        com.typesafe config 1.3.3 Apache License, Version 2.0
        org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause
            org.scala-lang scala-library 2.12.7 BSD 3-Clause
        org.scala-lang scala-library 2.12.7 BSD 3-Clause
    com.typesafe.akka akka-protobuf_2.12 2.5.23 Apache License, Version 2.0
        org.scala-lang scala-library 2.12.7 BSD 3-Clause
    com.typesafe ssl-config-core_2.12 0.3.7 Apache-2.0
        com.typesafe config 1.3.3 Apache License, Version 2.0
        org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 BSD 3-clause
            org.scala-lang scala-library 2.12.7 BSD 3-Clause
        org.scala-lang scala-library 2.12.7 BSD 3-Clause
    org.reactivestreams reactive-streams 1.0.2 CC0
    org.scala-lang scala-library 2.12.7 BSD 3-Clause
com.typesafe.slick slick-hikaricp_2.12 3.3.2 Two-clause BSD-style license
    com.typesafe.slick slick_2.12 3.3.2 Two-clause BSD-style license
        com.typesafe config 1.3.3 Apache License, Version 2.0
        org.reactivestreams reactive-streams 1.0.2 CC0
        org.scala-lang.modules scala-collection-compat_2.12 2.0.0 Apache-2.0
            org.scala-lang scala-library 2.12.7 BSD 3-Clause
        org.scala-lang scala-library 2.12.7 BSD 3-Clause
        org.slf4j slf4j-api 1.7.25 MIT License
    com.zaxxer HikariCP 3.2.0 The Apache Software License, Version 2.0
        org.slf4j slf4j-api 1.7.25 MIT License
    org.scala-lang scala-library 2.12.7 BSD 3-Clause
com.typesafe.slick slick_2.12 3.3.2 Two-clause BSD-style license
    com.typesafe config 1.3.3 Apache License, Version 2.0
    org.reactivestreams reactive-streams 1.0.2 CC0
    org.scala-lang.modules scala-collection-compat_2.12 2.0.0 Apache-2.0
        org.scala-lang scala-library 2.12.7 BSD 3-Clause
    org.scala-lang scala-library 2.12.7 BSD 3-Clause
    org.slf4j slf4j-api 1.7.25 MIT License
org.scala-lang scala-library 2.12.7 BSD 3-Clause
Initialization
As always, before we get started we will need an ActorSystem and a Materializer.
- Scala
-
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
- Java
-
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);
You will also always need the following important imports:
- Scala
-
import akka.stream.alpakka.slick.scaladsl._
import akka.stream.scaladsl._
import slick.jdbc.GetResult
- Java
-
import akka.stream.javadsl.*;
import akka.stream.alpakka.slick.javadsl.*;
The full examples for using the Source, Sink, and Flow (listed further down) also include all required imports.
Starting a Database Session
All functionality provided by this connector requires the user to first create an instance of SlickSession, which is a thin wrapper around Slick’s database connection management and database profile API.
If you are using Slick in your project, you can create a SlickSession instance by sharing the database configuration:
- Scala
-
val databaseConfig = DatabaseConfig.forConfig[JdbcProfile]("slick-h2")
implicit val session = SlickSession.forConfig(databaseConfig)
Otherwise, you can configure your database using typesafe-config by adding a named configuration to your application.conf and then referring to that configuration when starting the session:
- Scala
-
implicit val session = SlickSession.forConfig("slick-h2")
- Java
-
private static final SlickSession session = SlickSession.forConfig("slick-h2");
Here is an example configuration for the H2 database, which is used for the unit tests of the Slick connector itself:
- Configuration
-
# Load using SlickSession.forConfig("slick-h2")
slick-h2 {
  profile = "slick.jdbc.H2Profile$"
  db {
    connectionPool = disabled
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.h2.Driver"
      url = "jdbc:h2:/tmp/alpakka-slick-h2-test"
    }
  }
}
You can specify multiple different database configurations, as long as you use unique names. These can then be loaded by fully qualified configuration name using the SlickSession.forConfig() method described above.
The Slick connector supports all the various ways Slick allows you to configure your JDBC database drivers, connection pools, etc., but we strongly recommend using the so-called “DatabaseConfig” method of configuration, which is the only method explicitly tested to work with the Slick connector.
Below are a few configuration examples for other databases. The Slick connector supports all databases supported by Slick (as of Slick 3.2.x).
- Postgres
-
# Load using SlickSession.forConfig("slick-postgres")
slick-postgres {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://127.0.0.1/slickdemo"
      user = slick
      password = ""
    }
  }
}
- MySQL
-
# Load using SlickSession.forConfig("slick-mysql")
slick-mysql {
  profile = "slick.jdbc.MySQLProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.mysql.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/"
      user = slick
      password = ""
    }
  }
}
- DB2
-
# Load using SlickSession.forConfig("slick-db2")
slick-db2 {
  profile = "slick.jdbc.DB2Profile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.ibm.db2.jcc.DB2Driver"
      url = "jdbc:db2://localhost:50000/sample"
      user = "db2inst1"
      password = "db2-admin-password"
    }
  }
}
- Oracle
-
# Load using SlickSession.forConfig("slick-oracle")
slick-oracle {
  profile = "slick.jdbc.OracleProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "oracle.jdbc.OracleDriver"
      url = "jdbc:oracle:thin:@//localhost:49161/xe"
      user = slick
      password = ""
    }
  }
}
- SQL Server
-
# Load using SlickSession.forConfig("slick-sqlserver")
slick-sqlserver {
  profile = "slick.jdbc.SQLServerProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      url = "jdbc:sqlserver://localhost:1433"
      user = slick
      password = ""
    }
  }
}
Of course these are just examples. Please visit the Slick documentation for DatabaseConfig.forConfig for the full list of things to configure.
Closing a Database Session
Slick requires you to eventually close your database session to free up connection pool resources. You would usually do this when terminating the ActorSystem, by registering a termination handler like this:
- Scala
-
system.registerOnTermination(() => session.close())
- Java
-
system.registerOnTermination(session::close);
Using a Slick Source
The Slick connector allows you to perform a SQL query and expose the resulting stream of results as an Akka Streams Source[T], where T is any type that can be constructed using a database row.
Plain SQL queries
Both the Scala and Java DSLs support the use of plain SQL queries.
The Scala DSL expects you to use the special sql"...", sqlu"...", and tsql"..." String interpolators provided by Slick to construct queries.
Unfortunately, String interpolation is a Scala language feature that cannot be directly translated to Java. This means that query strings in the Java DSL will need to be manually prepared using plain Java Strings (or a StringBuilder).
The following examples put it all together to perform a simple streaming query.
- Scala
-
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)

// We need this to automatically transform result rows
// into instances of the User class.
// Please import slick.jdbc.GetResult
// See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#result-sets"
implicit val getUserResult = GetResult(r => User(r.nextInt, r.nextString))

// This import enables the use of the Slick sql"...",
// sqlu"...", and tsql"..." String interpolators.
// See also: "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the results of a query
val done: Future[Done] =
  Slick
    .source(sql"SELECT ID, NAME FROM ALPAKKA_SLICK_SCALADSL_TEST_USERS".as[User])
    .log("user")
    .runWith(Sink.ignore)
- Java
-
final SlickSession session = SlickSession.forConfig("slick-h2");
system.registerOnTermination(session::close);

final CompletionStage<Done> done =
    Slick.source(
            session,
            "SELECT ID, NAME FROM ALPAKKA_SLICK_JAVADSL_TEST_USERS",
            (SlickRow row) -> new User(row.nextInt(), row.nextString()))
        .log("user")
        .runWith(Sink.ignore(), materializer);
Typed Queries
The Scala DSL also supports the use of Slick Scala queries, which are more type-safe than their plain SQL equivalents. The code will look very similar to the plain SQL example.
- Scala
-
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// This import brings everything you need into scope
import session.profile.api._

// The example domain
class Users(tag: Tag) extends Table[(Int, String)](tag, "ALPAKKA_SLICK_SCALADSL_TEST_USERS") {
  def id = column[Int]("ID")
  def name = column[String]("NAME")
  def * = (id, name)
}

// Stream the results of a query
val done: Future[Done] =
  Slick
    .source(TableQuery[Users].result)
    .log("user")
    .runWith(Sink.ignore)
Using a Slick Flow or Sink
If you want to take a stream of elements and turn them into side-effecting actions in a relational database, the Slick connector allows you to perform any DML or DDL statement using either a Sink or a Flow. This includes the typical insert/update/delete statements but also create table, drop table, etc. The unit tests have a couple of good examples of the latter usage.
The following example shows the use of a Slick Sink to take a stream of elements and insert them into the database. There is an optional parallelism argument to specify how many concurrent streams will be sent to the database. The unit tests for the Slick connector have examples of performing parallel inserts.
- Scala
-
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))

// This import enables the use of the Slick sql"...",
// sqlu"...", and tsql"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(users)
    .runWith(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.sink(user => sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")
    )
- Java
-
final CompletionStage<Done> done =
    Source.from(users)
        .runWith(
            Slick.<User>sink(
                session,
                // add an optional second argument to specify the parallelism factor (int)
                (user) ->
                    "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES ("
                        + user.id
                        + ", '"
                        + user.name
                        + "')"),
            materializer);
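The Java example above builds its INSERT statement by concatenating user.name straight into the SQL text, so a name containing a single quote (for example O'Brien) would produce an invalid statement. The following is a minimal sketch of one way to prepare such strings, assuming a User class with id and name fields like the one the examples appear to use. InsertStatements, quote, and insertUser are hypothetical helper names and not part of the Slick connector. The sketch only doubles single quotes, as standard SQL requires, and does not cover database-specific escaping rules.

```java
// Hypothetical domain class, modelled on the fields the examples access.
final class User {
  final int id;
  final String name;

  User(int id, String name) {
    this.id = id;
    this.name = name;
  }
}

// Hypothetical helper; not part of the Slick connector API.
final class InsertStatements {
  private InsertStatements() {}

  // Wraps a value in single quotes and doubles any embedded single quote,
  // which is how standard SQL escapes a quote inside a string literal.
  static String quote(String value) {
    return "'" + value.replace("'", "''") + "'";
  }

  // Builds the same kind of statement as the sink example, with the name quoted.
  static String insertUser(User user) {
    return "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES ("
        + user.id + ", " + quote(user.name) + ")";
  }
}
```

Under these assumptions, the method reference InsertStatements::insertUser could be passed to the sink in place of the inline lambda.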
Flow
The Slick connector also exposes a Flow that has the exact same functionality as the Sink but allows you to continue the stream for further processing. The return value of every executed statement, i.e. the type of the elements emitted by the flow, is the fixed type Int, denoting the number of updated/inserted/deleted rows.
- Scala
-
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))

// This import enables the use of the Slick sql"...",
// sqlu"...", and tsql"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(users)
    .via(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.flow(user => sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")
    )
    .log("nr-of-updated-rows")
    .runWith(Sink.ignore)
- Java
-
final SlickSession session = SlickSession.forConfig("slick-h2");
system.registerOnTermination(session::close);

final List<User> users =
    IntStream.range(0, 42)
        .boxed()
        .map((i) -> new User(i, "Name" + i))
        .collect(Collectors.toList());

int parallelism = 1;

final CompletionStage<Done> done =
    Source.from(users)
        .via(
            Slick.<User>flow(
                session,
                parallelism,
                (user) ->
                    "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES ("
                        + user.id
                        + ", '"
                        + user.name
                        + "')"))
        .log("nr-of-updated-rows")
        .runWith(Sink.ignore(), materializer);
Flow with pass-through
To have a different return type, use the flowWithPassThrough function. For example, when consuming Kafka messages, this allows you to keep the Kafka committable offset so the message can be committed in a later stage of the flow.
- Scala
-
implicit val session = SlickSession.forConfig("slick-h2")
system.registerOnTermination(session.close())

// The example domain
case class User(id: Int, name: String)
val users = (1 to 42).map(i => User(i, s"Name$i"))
val messagesFromKafka = users.zipWithIndex.map { case (user, index) => KafkaMessage(user, CommittableOffset(index)) }

// This import enables the use of the Slick sql"...",
// sqlu"...", and tsql"..." String interpolators.
// See "http://slick.lightbend.com/doc/3.2.1/sql.html#string-interpolation"
import session.profile.api._

// Stream the users into the database as insert statements
val done: Future[Done] =
  Source(messagesFromKafka)
    .via(
      // add an optional first argument to specify the parallelism factor (Int)
      Slick.flowWithPassThrough { kafkaMessage =>
        val user = kafkaMessage.msg
        (sqlu"INSERT INTO ALPAKKA_SLICK_SCALADSL_TEST_USERS VALUES(${user.id}, ${user.name})")
          .map { insertCount => // map db result to something else
            // allows to keep the kafka message offset so it can be committed in a next stage
            kafkaMessage.map(user => (user, insertCount))
          }
      }
    )
    .log("nr-of-updated-rows")
    .mapAsync(1) { // in correct order
      kafkaMessage =>
        kafkaMessage.offset.commit // commit kafka messages
    }
    .runWith(Sink.ignore)
- Java
-
final CompletionStage<Done> done =
    Source.from(messagesFromKafka)
        .via(
            Slick.flowWithPassThrough(
                session,
                system.dispatcher(),
                // add an optional second argument to specify the parallelism factor (int)
                (kafkaMessage) ->
                    "INSERT INTO ALPAKKA_SLICK_JAVADSL_TEST_USERS VALUES ("
                        + kafkaMessage.msg.id
                        + ", '"
                        + kafkaMessage.msg.name
                        + "')",
                (kafkaMessage, insertCount) ->
                    // allows to keep the kafka message offset so it
                    // can be committed in a next stage
                    kafkaMessage.map(user -> Pair.create(user, insertCount))))
        .log("nr-of-updated-rows")
        // in correct order, commit Kafka message
        .mapAsync(1, kafkaMessage -> kafkaMessage.offset.commit())
        .runWith(Sink.ignore(), materializer);
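The pass-through examples above use KafkaMessage and CommittableOffset types whose definitions are not shown here. The sketch below is one plausible shape for them in plain Java. It is written as an assumption to illustrate the idea, not as the connector's or Kafka's actual classes: an envelope whose map transforms the payload while carrying the same offset along, which is what lets a later stage still commit it. The Integer result of commit is also a placeholder chosen to keep the sketch free of Akka types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical stand-in for a Kafka committable offset.
final class CommittableOffset {
  final int offset;

  CommittableOffset(int offset) {
    this.offset = offset;
  }

  // A real implementation would commit to Kafka; this sketch completes immediately.
  CompletionStage<Integer> commit() {
    return CompletableFuture.completedFuture(offset);
  }
}

// Hypothetical envelope pairing a payload with the offset it arrived at.
final class KafkaMessage<A> {
  final A msg;
  final CommittableOffset offset;

  KafkaMessage(A msg, CommittableOffset offset) {
    this.msg = msg;
    this.offset = offset;
  }

  // Transforms the payload and keeps the original offset attached.
  <B> KafkaMessage<B> map(Function<A, B> f) {
    return new KafkaMessage<>(f.apply(msg), offset);
  }
}
```

With an envelope like this, the second function passed to flowWithPassThrough can call kafkaMessage.map(...) to attach the insert count to the payload, and the element leaving the flow still exposes offset for the commit stage.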