MongoDB
The MongoDB connector allows you to read and save documents. You can query a stream of documents from MongoSourceMongoSource or update documents in a collection with MongoSinkMongoSink.
This connector is based on the MongoDB Java Driver, which is compatible with MongoDB versions 2.6 through 4.4.
Another MongoDB connector is available - ReactiveMongo. It is a Scala driver that provides fully non-blocking and asynchronous I/O operations. Please read more about it in the ReactiveMongo documentation.
| Project Info: Alpakka MongoDB | |
|---|---|
| Artifact | com.lightbend.akka akka-stream-alpakka-mongodb 9.0.2 | 
| JDK versions | Eclipse Temurin JDK 11 Eclipse Temurin JDK 17 | 
| Scala versions | 2.13.15 | 
| JPMS module name | akka.stream.alpakka.mongodb | 
| License | |
| Readiness level | Since 0.15, 2017-12-06 | 
| Home page | https://doc.akka.io/libraries/alpakka/current | 
| API documentation | |
| Forums | |
| Release notes | GitHub releases | 
| Issues | Github issues | 
| Sources | https://github.com/akka/alpakka | 
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
- resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
- <project> ... <repositories> <repository> <id>akka-repository</id> <name>Akka library repository</name> <url>https://repo.akka.io/maven</url> </repository> </repositories> </project>
- Gradle
- repositories { mavenCentral() maven { url "https://repo.akka.io/maven" } }
Additionally, add the dependencies as below.
- sbt
- val AkkaVersion = "2.10.5" libraryDependencies ++= Seq( "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "9.0.2", "com.typesafe.akka" %% "akka-stream" % AkkaVersion )
- Maven
- <properties> <akka.version>2.10.5</akka.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-mongodb_${scala.binary.version}</artifactId> <version>9.0.2</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_${scala.binary.version}</artifactId> <version>${akka.version}</version> </dependency> </dependencies>
- Gradle
- def versions = [ AkkaVersion: "2.10.5", ScalaBinary: "2.13" ] dependencies { implementation "com.lightbend.akka:akka-stream-alpakka-mongodb_${versions.ScalaBinary}:9.0.2" implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
- Organization - Artifact - Version - com.typesafe.akka - akka-stream_2.13 - 2.10.5 - org.mongodb.scala - mongo-scala-driver_2.13 - 4.11.5 - org.scala-lang - scala-library - 2.13.15 
- Dependency tree
- com.typesafe.akka akka-stream_2.13 2.10.5 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.10.5 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang scala-library 2.13.15 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.10.5 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.15 Apache-2.0 org.mongodb.scala mongo-scala-driver_2.13 4.11.5 The Apache License, Version 2.0 org.mongodb.scala mongo-scala-bson_2.13 4.11.5 The Apache License, Version 2.0 org.mongodb bson 4.11.5 The Apache License, Version 2.0 org.scala-lang scala-library 2.13.15 Apache-2.0 org.scala-lang scala-reflect 2.13.15 Apache-2.0 org.scala-lang scala-library 2.13.15 Apache-2.0 org.mongodb mongodb-driver-reactivestreams 4.11.5 The Apache License, Version 2.0 io.projectreactor reactor-core 3.5.0 Apache License, Version 2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.mongodb bson 4.11.5 The Apache License, Version 2.0 org.mongodb mongodb-driver-core 4.11.5 The Apache License, Version 2.0 org.mongodb bson-record-codec 4.11.5 The Apache License, Version 2.0 org.mongodb bson 4.11.5 The Apache License, Version 2.0 org.mongodb bson 4.11.5 The Apache License, Version 2.0 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.15 Apache-2.0 org.scala-lang scala-reflect 2.13.15 Apache-2.0 org.scala-lang scala-library 2.13.15 Apache-2.0 org.scala-lang scala-library 2.13.15 Apache-2.0 
Initialization
In the code examples below we will be using Mongo’s support for automatic codec derivation for POJOs. For Scala we will be using a case class and a macro based codec derivation. For Java a codec for POJO is derived using reflection.
- Scala
- 
  source case class Number(_id: Int)
- Java
- 
  source public final class Number { private Integer _id; public Number() {} public Number(Integer _id) { this._id = _id; } public void setId(Integer _id) { this._id = _id; } public Integer getId() { return _id; } }
For codec support, you first need to setup a CodecRegistry.
- Scala
- 
  source import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries} import org.mongodb.scala.MongoClient.DEFAULT_CODEC_REGISTRY import org.mongodb.scala.bson.codecs.Macros._ val codecRegistry = fromRegistries(fromProviders(classOf[Number]), DEFAULT_CODEC_REGISTRY)
- Java
- 
  source PojoCodecProvider codecProvider = PojoCodecProvider.builder().register(Number.class).build(); CodecRegistry codecRegistry = CodecRegistries.fromProviders(codecProvider, new ValueCodecProvider());
Sources provided by this connector need a prepared collection to communicate with the MongoDB server. To get a reference to a collection, let’s initialize a MongoDB connection and access the database.
- Scala
- 
  source private val client = MongoClients.create("mongodb://localhost:27017") private val db = client.getDatabase("MongoSourceSpec") private val numbersColl = db .getCollection("numbers", classOf[Number]) .withCodecRegistry(codecRegistry)
- Java
- 
  source client = MongoClients.create("mongodb://localhost:27017"); db = client.getDatabase("MongoSourceTest"); numbersColl = db.getCollection("numbers", Number.class).withCodecRegistry(codecRegistry);
We will also need an ActorSystemActorSystem.
- Scala
- 
  source implicit val system: ActorSystem = ActorSystem()
- Java
- 
  source system = ActorSystem.create();
Source
Let’s create a source from a Reactive Streams Publisher.
- Scala
- 
  source val source: Source[Number, NotUsed] = MongoSource(numbersColl.find(classOf[Number]))
- Java
- 
  source final Source<Number, NotUsed> source = MongoSource.create(numbersColl.find(Number.class));
And then run it.
- Scala
- 
  source val rows: Future[Seq[Number]] = source.runWith(Sink.seq)
- Java
- 
  source final CompletionStage<List<Number>> rows = source.runWith(Sink.seq(), system);
Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
Flow and Sink
Each of these sink factory methods have a corresponding factory in MongoFlowMongoFlow which will emit the written document or result of the operation downstream.
Insert
We can use a Source of documents to save them to a mongo collection using MongoSink.insertOneMongoSink.insertOne or MongoSink.insertManyMongoSink.insertMany.
- Scala
- 
  source val testRangeObjects = testRange.map(Number) val source = Source(testRangeObjects) source.runWith(MongoSink.insertOne(numbersColl)).futureValue
- Java
- 
  source List<Number> testRangeObjects = testRange.stream().map(Number::new).collect(toList()); final CompletionStage<Done> completion = Source.from(testRangeObjects).runWith(MongoSink.insertOne(numbersColl), system);
Insert Many
Insert many can be used if you have a collection of documents to insert at once.
- Scala
- 
  source val objects = testRange.map(Number) val source = Source(objects) val completion = source.grouped(2).runWith(MongoSink.insertMany[Number](numbersColl))
- Java
- 
  source final List<Number> testRangeObjects = testRange.stream().map(Number::new).collect(toList()); final CompletionStage<Done> completion = Source.from(testRangeObjects).grouped(2).runWith(MongoSink.insertMany(numbersColl), system);
Update
We can update documents with a Source of DocumentUpdate which is a filter and an update definition. Use either MongoSink.updateOneMongoSink.updateOne or MongoSink.updateManyMongoSink.updateMany if the filter should target one or many documents.
- Scala
- 
  source val source = Source(testRange).map( i => DocumentUpdate(filter = Filters.eq("value", i), update = Updates.set("updateValue", i * -1)) ) val completion = source.runWith(MongoSink.updateOne(numbersDocumentColl))
- Java
- 
  source final Source<DocumentUpdate, NotUsed> source = Source.from(testRange) .map( i -> DocumentUpdate.create( Filters.eq("value", i), Updates.set("updateValue", i * -1))); final CompletionStage<Done> completion = source.runWith(MongoSink.updateOne(numbersDocumentColl), system);
Delete
We can delete documents with a Source of filters. Use either MongoSink.deleteOneMongoSink.deleteOne or MongoSink.deleteManyMongoSink.deleteMany if the filter should target one or many documents.
- Scala
- 
  source val source = Source(testRange).map(i => Filters.eq("value", i)) val completion = source.runWith(MongoSink.deleteOne(numbersDocumentColl))
- Java
- 
  source final Source<Bson, NotUsed> source = Source.from(testRange).map(i -> Filters.eq("value", i)); final CompletionStage<Done> completion = source.runWith(MongoSink.deleteOne(numbersDocumentColl), system);