MongoDB

The MongoDB connector allows you to read and save documents. You can query as a stream of documents from MongoSource or update documents in a collection with MongoSink.

This connector is based off the mongo-scala-driver and does not have a java interface. It supports driver macros and codec allowing to read or write scala case class objects.

Alternative connector

Another MongoDB connector is available.

ReactiveMongo is a Scala driver that provides fully non-blocking and asynchronous I/O operations.

Please read more about it in the ReactiveMongo documentation.

Project Info: Alpakka MongoDB
Artifact
com.lightbend.akka
akka-stream-alpakka-mongodb
1.0-M2
JDK versions
OpenJDK 8
Scala versions2.12.7, 2.11.12
JPMS module nameakka.stream.alpakka.mongodb
License
Readiness level
Community-driven
Since 0.15, 2017-12-06
Home pagehttps://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-mongodb_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-mongodb_2.12', version: '1.0-M2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersionLicense
com.typesafe.akkaakka-stream_2.122.5.19Apache License, Version 2.0
org.mongodb.scalamongo-scala-driver_2.122.4.2Apache 2
org.scala-langscala-library2.12.7BSD 3-Clause
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.mongodb.scala    mongo-scala-driver_2.12    2.4.2    Apache 2
    org.mongodb.scala    mongo-scala-bson_2.12    2.4.2    Apache 2
        org.mongodb    mongodb-driver-async    3.8.2    The Apache Software License, Version 2.0
            org.mongodb    bson    3.8.2    The Apache Software License, Version 2.0
            org.mongodb    mongodb-driver-core    3.8.2    The Apache Software License, Version 2.0
                org.mongodb    bson    3.8.2    The Apache Software License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-reflect    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.mongodb    mongodb-driver-async    3.8.2    The Apache Software License, Version 2.0
        org.mongodb    bson    3.8.2    The Apache Software License, Version 2.0
        org.mongodb    mongodb-driver-core    3.8.2    The Apache Software License, Version 2.0
            org.mongodb    bson    3.8.2    The Apache Software License, Version 2.0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang    scala-reflect    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Initialization

Sources provided by this connector need a prepared session to communicate with MongoDB server.

For codec and macros support, you first need to provide a case class and a codecRegistry.

Scala
case class Number(_id: Int)
val codecRegistry = fromRegistries(fromProviders(classOf[Number]), DEFAULT_CODEC_REGISTRY)

Then, lets initialize a MongoDB connection.

Scala
private val client = MongoClient(s"mongodb://localhost:27017")
private val db = client.getDatabase("alpakka-mongo")
private val numbersColl = db.getCollection("numbers")

For codec support, add the registry to the database or the collection.

Scala
private val numbersObjectColl = db.getCollection("numbers").withCodecRegistry(codecRegistry)

We will also need an ActorSystem and an ActorMaterializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

This is all preparation that we are going to need.

Source

Let’s create a source from a MongoDB collection observable, which can optionally take a filter.

Scala
val source: Source[Document, NotUsed] =
  MongoSource(numbersColl.find())

With codec support, adapt the type of the source.

Scala
val source: Source[Number, NotUsed] =
  MongoSource[Number](numbersObjectColl.find())

And finally we can run it.

Scala
val rows: Future[Seq[Document]] = source.runWith(Sink.seq)

With codec support

Scala
val rows: Future[Seq[Number]] = source.runWith(Sink.seq)

Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.

Flow and Sink

Each of these sink factory methods have a corresponding factory in insertOne which will emit the written document or result of the operation downstream.

For codec support, the type must be specified in the database or collection declaration.

Scala
private val numbersObjectColl: MongoCollection[Number] = db.getCollection("numbersSink")

Insert

We can use a Source of documents to save them to a mongo collection using insertOne or insertMany.

Scala
val source: Source[Document, NotUsed] = ???
source.runWith(MongoSink.insertOne(parallelism = 2, collection = numbersColl))

With codec support

Scala
val source: Source[Number, NotUsed] = ???
source.runWith(MongoSink.insertOne[Number](parallelism = 2, collection = numbersObjectColl))

Insert Many

Insert many can be used if you have a collection of documents to insert at once.

Scala
val source: Source[Seq[Document], NotUsed] = ???
source.runWith(MongoSink.insertMany(parallelism = 2, collection = numbersColl))

With codec support

Scala
val source: Source[Seq[Number], NotUsed] = ???
source.runWith(MongoSink.insertMany[Number](parallelism = 2, collection = numbersObjectColl))

Update

We can update documents with a Source of DocumentUpdate which is a filter and a update definition. Use either updateOne or updateMany if the filter should target one or many documents.

Scala
import org.mongodb.scala.model.{Filters, Updates}

val source: Source[DocumentUpdate, NotUsed] = Source
  .single(DocumentUpdate(filter = Filters.eq("id", 1), update = Updates.set("updateValue", 0)))

source.runWith(MongoSink.updateOne(2, numbersColl))

Delete

We can delete documents with a Source of filters. Use either deleteOne or deleteMany if the filter should target one or many documents.

Scala
val source: Source[Bson, NotUsed] = Source.single(Filters.eq("id", 1))
source.runWith(MongoSink.deleteOne(2, numbersColl))

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Test code requires a MongoDB server running in the background. You can start one quickly using docker:

docker-compose up mongo

Scala
sbt
> mongodb/test
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.