MongoDB
The MongoDB connector allows you to read and save documents. You can query a stream of documents from MongoSource or update documents in a collection with MongoSink.
This connector is based on the Mongo Reactive Streams Driver.
Another MongoDB connector, ReactiveMongo, is also available. It is a Scala driver that provides fully non-blocking and asynchronous I/O operations; please read more about it in the ReactiveMongo documentation.
Project Info: Alpakka MongoDB

| | |
|---|---|
| Artifact | com.lightbend.akka : akka-stream-alpakka-mongodb : 2.0.2 |
| JDK versions | Adopt OpenJDK 8, Adopt OpenJDK 11 |
| Scala versions | 2.12.11, 2.11.12, 2.13.3 |
| JPMS module name | akka.stream.alpakka.mongodb |
| Readiness level | Since 0.15, 2017-12-06 |
| Home page | https://doc.akka.io/docs/alpakka/current |
| Release notes | In the documentation |
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
- sbt

  ```scala
  val AkkaVersion = "2.5.31"
  libraryDependencies ++= Seq(
    "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "2.0.2",
    "com.typesafe.akka" %% "akka-stream" % AkkaVersion
  )
  ```
- Maven

  ```xml
  <properties>
    <akka.version>2.5.31</akka.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-mongodb_${scala.binary.version}</artifactId>
    <version>2.0.2</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  ```
- Gradle

  ```gradle
  versions += [
    AkkaVersion: "2.5.31",
    ScalaBinary: "2.12"
  ]
  dependencies {
    compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-mongodb_${versions.ScalaBinary}", version: '2.0.2'
    compile group: 'com.typesafe.akka', name: "akka-stream_${versions.ScalaBinary}", version: versions.AkkaVersion
  }
  ```
The table below shows the direct dependencies of this module, and the dependency tree lists all libraries it depends on transitively.
- Direct dependencies

  | Organization | Artifact | Version |
  |---|---|---|
  | com.typesafe.akka | akka-stream_2.12 | 2.5.31 |
  | org.mongodb | mongodb-driver-reactivestreams | 1.12.0 |
  | org.scala-lang | scala-library | 2.12.11 |

- Dependency tree

  ```
  com.typesafe.akka  akka-stream_2.12  2.5.31
    com.typesafe.akka  akka-actor_2.12  2.5.31
      com.typesafe  config  1.3.3
      org.scala-lang.modules  scala-java8-compat_2.12  0.8.0
        org.scala-lang  scala-library  2.12.11
      org.scala-lang  scala-library  2.12.11
    com.typesafe.akka  akka-protobuf_2.12  2.5.31
      org.scala-lang  scala-library  2.12.11
    com.typesafe  ssl-config-core_2.12  0.3.8
      com.typesafe  config  1.3.3
      org.scala-lang.modules  scala-parser-combinators_2.12  1.1.2
        org.scala-lang  scala-library  2.12.11
      org.scala-lang  scala-library  2.12.11
    org.reactivestreams  reactive-streams  1.0.2
    org.scala-lang  scala-library  2.12.11
  org.mongodb  mongodb-driver-reactivestreams  1.12.0
    org.mongodb  mongodb-driver-async  3.11.0
      org.mongodb  bson  3.11.0
      org.mongodb  mongodb-driver-core  3.11.0
        org.mongodb  bson  3.11.0
    org.reactivestreams  reactive-streams  1.0.2
  org.scala-lang  scala-library  2.12.11
  ```
Initialization
In the code examples below we will use Mongo’s support for automatic codec derivation for POJOs. For Scala we will use a case class and macro-based codec derivation; for Java, a codec for the POJO is derived using reflection.
- Scala

  ```scala
  case class Number(_id: Int)
  ```
- Java

  ```java
  public final class Number {
    private Integer _id;

    public Number() {}

    public Number(Integer _id) {
      this._id = _id;
    }

    public void setId(Integer _id) {
      this._id = _id;
    }

    public Integer getId() {
      return _id;
    }
  }
  ```
For codec support, you first need to set up a CodecRegistry.
- Scala

  ```scala
  import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
  import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
  import org.mongodb.scala.bson.codecs.Macros._

  val codecRegistry = fromRegistries(fromProviders(classOf[Number]), DEFAULT_CODEC_REGISTRY)
  ```
- Java

  ```java
  PojoCodecProvider codecProvider = PojoCodecProvider.builder().register(Number.class).build();
  CodecRegistry codecRegistry =
      CodecRegistries.fromProviders(codecProvider, new ValueCodecProvider());
  ```
Sources provided by this connector need a prepared collection to communicate with the MongoDB server. To get a reference to a collection, let’s initialize a MongoDB connection and access the database.
- Scala

  ```scala
  private val client = MongoClients.create("mongodb://localhost:27017")
  private val db = client.getDatabase("MongoSourceSpec")
  private val numbersColl = db
    .getCollection("numbers", classOf[Number])
    .withCodecRegistry(codecRegistry)
  ```
- Java

  ```java
  client = MongoClients.create("mongodb://localhost:27017");
  db = client.getDatabase("MongoSourceTest");
  numbersColl = db.getCollection("numbers", Number.class).withCodecRegistry(codecRegistry);
  ```
We will also need an ActorSystem and a Materializer.
- Scala

  ```scala
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  ```
- Java

  ```java
  system = ActorSystem.create();
  mat = ActorMaterializer.create(system);
  ```
Source
Let’s create a source from a Reactive Streams Publisher.
- Scala

  ```scala
  val source: Source[Number, NotUsed] = MongoSource(numbersColl.find(classOf[Number]))
  ```
- Java

  ```java
  final Source<Number, NotUsed> source = MongoSource.create(numbersColl.find(Number.class));
  ```
And then run it.
- Scala

  ```scala
  val rows: Future[Seq[Number]] = source.runWith(Sink.seq)
  ```
- Java

  ```java
  final CompletionStage<List<Number>> rows = source.runWith(Sink.seq(), mat);
  ```
Here we used a basic sink to complete the stream by collecting all of the stream elements to a collection. The power of streams comes from building larger data pipelines which leverage backpressure to ensure efficient flow control. Feel free to edit the example code and build more advanced stream topologies.
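As a sketch of a slightly larger topology, assuming the numbersColl collection, the codec setup, and the implicit materializer from above (this will not run without a live MongoDB at localhost:27017), intermediate stages can be placed between the source and the sink; demand from the sink flows upstream, so documents are only pulled from MongoDB as fast as the rest of the pipeline can process them:

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.alpakka.mongodb.scaladsl.MongoSource

import scala.concurrent.Future

// Read all documents, transform them in-stream, and collect only
// the even ids. Backpressure from Sink.seq propagates all the way
// back to the MongoDB cursor.
val evenIds: Future[Seq[Int]] =
  MongoSource(numbersColl.find(classOf[Number]))
    .map(_._id)         // extract the id from each document
    .filter(_ % 2 == 0) // keep only even ids
    .runWith(Sink.seq)
```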
Flow and Sink
Each of the MongoSink factory methods described below has a corresponding factory in MongoFlow, which emits the written document or the result of the operation downstream.
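A minimal sketch of using a flow instead of a sink, assuming the numbersColl collection from above (again requiring a live MongoDB instance): unlike MongoSink.insertOne, which materializes only a completion signal, MongoFlow.insertOne passes each written document downstream so the stream can keep working with it.

```scala
import akka.stream.alpakka.mongodb.scaladsl.MongoFlow
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

// Insert ten documents and collect the inserted documents
// emitted by the flow.
val inserted: Future[Seq[Number]] =
  Source(1 to 10)
    .map(Number(_))
    .via(MongoFlow.insertOne(numbersColl)) // writes and re-emits each document
    .runWith(Sink.seq)
```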
Insert
We can use a Source of documents to save them to a MongoDB collection using MongoSink.insertOne or MongoSink.insertMany.
- Scala

  ```scala
  val testRangeObjects = testRange.map(Number(_))
  val source = Source(testRangeObjects)
  source.runWith(MongoSink.insertOne(numbersColl)).futureValue
  ```
- Java

  ```java
  List<Number> testRangeObjects = testRange.stream().map(Number::new).collect(toList());
  final CompletionStage<Done> completion =
      Source.from(testRangeObjects).runWith(MongoSink.insertOne(numbersColl), mat);
  ```
Insert Many
MongoSink.insertMany can be used if you have batches of documents to insert at once.
- Scala

  ```scala
  val objects = testRange.map(Number(_))
  val source = Source(objects)
  val completion = source.grouped(2).runWith(MongoSink.insertMany[Number](numbersColl))
  ```
- Java

  ```java
  final List<Number> testRangeObjects = testRange.stream().map(Number::new).collect(toList());
  final CompletionStage<Done> completion =
      Source.from(testRangeObjects).grouped(2).runWith(MongoSink.insertMany(numbersColl), mat);
  ```
Update
We can update documents with a Source of DocumentUpdate, which is a filter together with an update definition. Use either MongoSink.updateOne or MongoSink.updateMany depending on whether the filter should target one or many documents.
- Scala

  ```scala
  val source = Source(testRange).map(i =>
    DocumentUpdate(filter = Filters.eq("value", i), update = Updates.set("updateValue", i * -1))
  )
  val completion = source.runWith(MongoSink.updateOne(numbersDocumentColl))
  ```
- Java

  ```java
  final Source<DocumentUpdate, NotUsed> source =
      Source.from(testRange)
          .map(i -> DocumentUpdate.create(Filters.eq("value", i), Updates.set("updateValue", i * -1)));
  final CompletionStage<Done> completion = source.runWith(MongoSink.updateOne(numbersDocumentColl), mat);
  ```
Delete
We can delete documents with a Source of filters. Use either MongoSink.deleteOne or MongoSink.deleteMany depending on whether the filter should target one or many documents.
- Scala

  ```scala
  val source = Source(testRange).map(i => Filters.eq("value", i))
  val completion = source.runWith(MongoSink.deleteOne(numbersDocumentColl))
  ```
- Java

  ```java
  final Source<Bson, NotUsed> source = Source.from(testRange).map(i -> Filters.eq("value", i));
  final CompletionStage<Done> completion =
      source.runWith(MongoSink.deleteOne(numbersDocumentColl), mat);
  ```