MongoDB

The MongoDB connector allows you to read and save documents. You can query a stream of documents from `MongoSource` or update documents in a collection with `MongoSink`.

This connector is based on the Mongo Reactive Streams Driver.

Alternative connector

Another MongoDB connector is available: ReactiveMongo. It is a Scala driver that provides fully non-blocking and asynchronous I/O operations. Please read more about it in the ReactiveMongo documentation.

Project Info: Alpakka MongoDB
Artifact: com.lightbend.akka : akka-stream-alpakka-mongodb : 1.1.2
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12
JPMS module name: akka.stream.alpakka.mongodb
Readiness level: Since 0.15, 2017-12-06
Home page: https://doc.akka.io/docs/alpakka/current
Release notes: In the documentation
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-mongodb_2.12</artifactId>
  <version>1.1.2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-mongodb_2.12', version: '1.1.2'
}

The table below shows the direct dependencies of this module, and the dependency tree lists all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
org.mongodb    mongodb-driver-reactivestreams    1.11.0    The Apache Software License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.23    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.23    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.7    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.mongodb    mongodb-driver-reactivestreams    1.11.0    The Apache Software License, Version 2.0
    org.mongodb    mongodb-driver-async    3.10.0    The Apache License, Version 2.0
        org.mongodb    bson    3.10.0    The Apache License, Version 2.0
        org.mongodb    mongodb-driver-core    3.10.0    The Apache License, Version 2.0
            org.mongodb    bson    3.10.0    The Apache License, Version 2.0
    org.reactivestreams    reactive-streams    1.0.2    CC0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Initialization

In the code examples below we use Mongo's support for automatic codec derivation for POJOs. For Scala, we use a case class and macro-based codec derivation; for Java, a codec for the POJO is derived using reflection.

Scala
case class Number(_id: Int)
Java
public final class Number {
  private Integer _id;

  public Number() {}

  public Number(Integer _id) {
    this._id = _id;
  }

  public void setId(Integer _id) {
    this._id = _id;
  }

  public Integer getId() {
    return _id;
  }

}

For codec support, you first need to set up a `CodecRegistry`.

Scala
import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.bson.codecs.Macros._

val codecRegistry = fromRegistries(fromProviders(classOf[Number]), DEFAULT_CODEC_REGISTRY)
Java
PojoCodecProvider codecProvider = PojoCodecProvider.builder().register(Number.class).build();
CodecRegistry codecRegistry =
    CodecRegistries.fromProviders(codecProvider, new ValueCodecProvider());

Sources provided by this connector need a prepared collection to communicate with the MongoDB server. To get a reference to a collection, let’s initialize a MongoDB connection and access the database.

Scala
private val client = MongoClients.create("mongodb://localhost:27017")
private val db = client.getDatabase("MongoSourceSpec")
private val numbersColl = db
  .getCollection("numbers", classOf[Number])
  .withCodecRegistry(codecRegistry)
Java
client = MongoClients.create("mongodb://localhost:27017");
db = client.getDatabase("MongoSourceTest");
numbersColl = db.getCollection("numbers", Number.class).withCodecRegistry(codecRegistry);

We will also need an `ActorSystem` and an `ActorMaterializer`.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
system = ActorSystem.create();
mat = ActorMaterializer.create(system);

Source

Let’s create a source from a Reactive Streams Publisher.

Scala
val source: Source[Number, NotUsed] =
  MongoSource(numbersColl.find(classOf[Number]))
Java
final Source<Number, NotUsed> source = MongoSource.create(numbersColl.find(Number.class));

And then run it.

Scala
val rows: Future[Seq[Number]] = source.runWith(Sink.seq)
Java
final CompletionStage<List<Number>> rows = source.runWith(Sink.seq(), mat);

Here we used a basic sink to complete the stream by collecting all of the stream elements into a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
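As a sketch of such a pipeline (reusing the `numbersColl` collection and the implicit materializer from the earlier snippets; not verified against a running MongoDB instance), we might transform and filter documents as they stream from the database:

```scala
// Sketch: stream documents, project to ids, keep even ones, collect.
val evenIds: Future[Seq[Int]] =
  MongoSource(numbersColl.find(classOf[Number]))
    .map(_._id)           // project each Number to its id
    .filter(_ % 2 == 0)   // keep only even ids
    .runWith(Sink.seq)
```

Each stage pulls only as fast as the downstream sink consumes, so backpressure propagates all the way to the driver and the stream never requests more documents than it can process.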

Flow and Sink

Each of the sink factory methods below has a corresponding factory in `MongoFlow` which emits the written document or the result of the operation downstream.
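For example (a sketch reusing `numbersColl` and `testRange` from the snippets above, not verified against a running MongoDB instance), `MongoFlow.insertOne` emits each document after it has been written, so processing can continue downstream:

```scala
// Sketch: insert documents and keep working with the emitted elements.
val insertedIds: Future[Seq[Int]] =
  Source(testRange.map(Number(_)))
    .via(MongoFlow.insertOne(numbersColl)) // emits each inserted Number
    .map(_._id)                            // continue with the written documents
    .runWith(Sink.seq)
```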

Insert

We can use a Source of documents to save them to a Mongo collection using `MongoSink.insertOne` or `MongoSink.insertMany`.

Scala
val testRangeObjects = testRange.map(Number(_))
val source = Source(testRangeObjects)
source.runWith(MongoSink.insertOne(numbersColl)).futureValue
Java
List<Number> testRangeObjects =
    testRange.stream().map(Number::new).collect(Collectors.toList());
final CompletionStage<Done> completion =
    Source.from(testRangeObjects).runWith(MongoSink.insertOne(numbersColl), mat);

Insert Many

`MongoSink.insertMany` can be used if you have a collection of documents to insert at once.

Scala
val objects = testRange.map(Number(_))
val source = Source(objects)
val completion = source.grouped(2).runWith(MongoSink.insertMany[Number](numbersColl))
Java
final List<Number> testRangeObjects =
    testRange.stream().map(Number::new).collect(Collectors.toList());
final CompletionStage<Done> completion =
    Source.from(testRangeObjects).grouped(2).runWith(MongoSink.insertMany(numbersColl), mat);

Update

We can update documents with a Source of `DocumentUpdate`, which combines a filter and an update definition. Use either `MongoSink.updateOne` or `MongoSink.updateMany`, depending on whether the filter should target one or many documents.

Scala
val source = Source(testRange).map(
  i => DocumentUpdate(filter = Filters.eq("value", i), update = Updates.set("updateValue", i * -1))
)
val completion = source.runWith(MongoSink.updateOne(numbersDocumentColl))
Java
final Source<DocumentUpdate, NotUsed> source =
    Source.from(testRange)
        .map(
            i ->
                DocumentUpdate.create(
                    Filters.eq("value", i), Updates.set("updateValue", i * -1)));
final CompletionStage<Done> completion =
    source.runWith(MongoSink.updateOne(numbersDocumentColl), mat);

Delete

We can delete documents with a Source of filters. Use either `MongoSink.deleteOne` or `MongoSink.deleteMany`, depending on whether the filter should target one or many documents.

Scala
val source = Source(testRange).map(i => Filters.eq("value", i))
val completion = source.runWith(MongoSink.deleteOne(numbersDocumentColl))
Java
final Source<Bson, NotUsed> source = Source.from(testRange).map(i -> Filters.eq("value", i));
final CompletionStage<Done> completion =
    source.runWith(MongoSink.deleteOne(numbersDocumentColl), mat);