Couchbase
The Couchbase connector has not been updated for too long and is now considered End of Life. It will be removed with the next release of Alpakka.
Couchbase is an open-source, distributed (shared-nothing architecture) multi-model NoSQL document-oriented database software package that is optimized for interactive applications. These applications may serve many concurrent users by creating, storing, retrieving, aggregating, manipulating and presenting data. In support of these kinds of application needs, Couchbase Server is designed to provide easy-to-scale key-value or JSON document access with low latency and high sustained throughput. It is designed to be clustered from a single machine to very large-scale deployments spanning many machines.
Couchbase provides client protocol compatibility with memcached, but adds disk persistence, data replication, live cluster reconfiguration, rebalancing and multitenancy with data partitioning.
Alpakka Couchbase allows you to read from and write to Couchbase. You can query a bucket from CouchbaseSource using N1QL queries, or read documents by document ID. The Couchbase connector uses Couchbase Java SDK version 2.7.23 behind the scenes.
The Couchbase connector supports all document formats supported by the SDK. All those formats use the Document[T] (Scala) or Document<T> (Java) interface, and this is the level of abstraction the connector works with.
Project Info: Alpakka Couchbase | |
---|---|
Artifact | com.lightbend.akka akka-stream-alpakka-couchbase 9.0.0 |
JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 |
Scala versions | 2.13.12 |
JPMS module name | akka.stream.alpakka.couchbase |
License | |
Readiness level | End-of-Life, it is not recommended to use this project any more. Since 9.0.0-M1, 2024-10-14 |
Home page | https://doc.akka.io/libraries/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
- Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
Additionally, add the dependencies as below.
- sbt
val AkkaVersion = "2.10.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-couchbase" % "9.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
- Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-couchbase_${scala.binary.version}</artifactId>
    <version>9.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  AkkaVersion: "2.10.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-couchbase_${versions.ScalaBinary}:9.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.couchbase.client | java-client | 2.7.23 |
com.typesafe.akka | akka-pki_2.13 | 2.10.0 |
com.typesafe.akka | akka-stream_2.13 | 2.10.0 |
io.reactivex | rxjava-reactive-streams | 1.2.1 |
org.scala-lang | scala-library | 2.13.12 |
- Dependency tree
com.couchbase.client java-client 2.7.23 The Apache Software License, Version 2.0
com.couchbase.client core-io 1.7.23 The Apache Software License, Version 2.0
io.opentracing opentracing-api 0.31.0
io.reactivex rxjava 1.3.8 The Apache Software License, Version 2.0
com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
com.typesafe config 1.4.3 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.slf4j slf4j-api 2.0.16
com.typesafe.akka akka-stream_2.13 2.10.0 BUSL-1.1
com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
com.typesafe config 1.4.3 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-protobuf-v3_2.13 2.10.0 BUSL-1.1
org.reactivestreams reactive-streams 1.0.4 MIT-0
org.scala-lang scala-library 2.13.12 Apache-2.0
io.reactivex rxjava-reactive-streams 1.2.1 The Apache Software License, Version 2.0
io.reactivex rxjava 1.3.8 The Apache Software License, Version 2.0
org.reactivestreams reactive-streams 1.0.4 MIT-0
org.scala-lang scala-library 2.13.12 Apache-2.0
Overview
Alpakka Couchbase offers both Akka Streams APIs and a more direct API to access Couchbase:
- CouchbaseSession offers a direct API for one-off operations
- CouchbaseSessionRegistry is an Akka extension to keep track of and share CouchbaseSessions within an ActorSystem
- CouchbaseSource, CouchbaseFlow, and CouchbaseSink offer factory methods to create Akka Stream operators
Configuration
All operations use the CouchbaseSession internally. A session is configured with CouchbaseSessionSettings and a Couchbase bucket name. The Akka Stream factory methods create and access the corresponding session instance behind the scenes.
By default the CouchbaseSessionSettings are read from the alpakka.couchbase.session section of the configuration, e.g. in your application.conf.
- Settings
alpakka.couchbase {
  session {
    nodes = ["localhost"]
    username = "Administrator"
    password = "password"
  }
}
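Settings read from the configuration can also be adjusted in code. The following is a minimal Scala sketch, not a definitive recipe: it assumes an application.conf like the one above, and it assumes that CouchbaseSessionSettings exposes withNodes, withUsername and withPassword copy methods. The host name and the COUCHBASE_PASSWORD environment variable are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings

object SessionSettingsSketch extends App {
  val actorSystem = ActorSystem("couchbase-sketch")

  // Reads the alpakka.couchbase.session section of the loaded configuration.
  val fromConfig: CouchbaseSessionSettings = CouchbaseSessionSettings(actorSystem)

  // Assumption: these with* methods exist on CouchbaseSessionSettings.
  // The node name and environment variable are placeholders for illustration.
  val adjusted: CouchbaseSessionSettings =
    fromConfig
      .withNodes("couchbase.example.internal")
      .withUsername("Administrator")
      .withPassword(sys.env.getOrElse("COUCHBASE_PASSWORD", "password"))

  actorSystem.terminate()
}
```

Keeping credentials out of application.conf this way is a design choice, not something the connector requires.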
Using Akka Discovery
To delegate the configuration of Couchbase nodes to any of Akka Discovery’s lookup mechanisms, specify a service name and lookup timeout in the Couchbase section, pass the DiscoverySupport nodes lookup to enrichAsync, and configure Akka Discovery accordingly.
The Akka Discovery dependency has to be added explicitly.
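As a sketch of what adding the dependency might look like in an sbt build, assuming the same AkkaVersion value used in the Artifacts section and the standard akka-discovery artifact:

```scala
// build.sbt fragment: akka-discovery is not pulled in by the connector itself
val AkkaVersion = "2.10.0"
libraryDependencies += "com.typesafe.akka" %% "akka-discovery" % AkkaVersion
```

The version should match the Akka version of the other Akka modules on the classpath.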
- Discovery settings (Config discovery)
alpakka.couchbase {
  session {
    service {
      name = couchbase-service
      lookup-timeout = 1 s
    }
    username = "anotherUser"
    password = "differentPassword"
  }
}

akka.discovery.method = config
akka.discovery.config.services = {
  couchbase-service = {
    endpoints = [
      { host = "akka.io" }
    ]
  }
}
To enable Akka Discovery on the CouchbaseSessionSettings, use DiscoverySupport.nodes() as the enrichment function.
- Scala
import akka.stream.alpakka.couchbase.scaladsl.{CouchbaseSession, DiscoverySupport}
import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings}

val registry = CouchbaseSessionRegistry(actorSystem)

val sessionSettings = CouchbaseSessionSettings(actorSystem)
  .withEnrichAsync(DiscoverySupport.nodes())
val sessionFuture: Future[CouchbaseSession] = registry.sessionFor(sessionSettings, bucketName)
- Java
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.DiscoverySupport;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;

CouchbaseSessionRegistry registry = CouchbaseSessionRegistry.get(actorSystem);

CouchbaseSessionSettings sessionSettings =
    CouchbaseSessionSettings.create(actorSystem)
        .withEnrichAsyncCs(DiscoverySupport.getNodes(actorSystem));
CompletionStage<CouchbaseSession> session =
    registry.getSessionFor(sessionSettings, bucketName);
Reading from Couchbase in Akka Streams
Using statements
To query Couchbase using the statement DSL, use CouchbaseSource.fromStatement.
- Scala
import com.couchbase.client.java.query.Select.select
import com.couchbase.client.java.query.dsl.Expression._

val resultAsFuture: Future[Seq[JsonObject]] =
  CouchbaseSource
    .fromStatement(sessionSettings, select("*").from(i(queryBucketName)).limit(10), bucketName)
    .runWith(Sink.seq)
- Java
import static com.couchbase.client.java.query.Select.select;
import static com.couchbase.client.java.query.dsl.Expression.*;

CompletionStage<List<JsonObject>> resultCompletionStage =
    CouchbaseSource.fromStatement(
            sessionSettings, select("*").from(i(queryBucketName)).limit(10), bucketName)
        .runWith(Sink.seq(), actorSystem);
Using N1QL queries
To query Couchbase using N1QL queries, use CouchbaseSource.fromN1qlQuery.
- Scala
import com.couchbase.client.java.query.{N1qlParams, N1qlQuery}

val params = N1qlParams.build.adhoc(false)
val query = N1qlQuery.simple(s"select count(*) from $queryBucketName", params)

val resultAsFuture: Future[Seq[JsonObject]] =
  CouchbaseSource
    .fromN1qlQuery(sessionSettings, query, bucketName)
    .runWith(Sink.seq)
- Java
import com.couchbase.client.java.query.N1qlParams;
import com.couchbase.client.java.query.N1qlQuery;

N1qlParams params = N1qlParams.build().adhoc(false);
SimpleN1qlQuery query = N1qlQuery.simple("select count(*) from " + queryBucketName, params);

CompletionStage<JsonObject> resultCompletionStage =
    CouchbaseSource.fromN1qlQuery(sessionSettings, query, bucketName)
        .runWith(Sink.head(), actorSystem);
Get by ID
The CouchbaseFlow.fromId methods read documents by their document ID within an Akka Stream.
- Scala
val ids = immutable.Seq("First", "Second", "Third", "Fourth")

val futureResult: Future[immutable.Seq[JsonDocument]] =
  Source(ids)
    .via(
      CouchbaseFlow.fromId(
        sessionSettings,
        bucketName
      )
    )
    .runWith(Sink.seq)
- Java
List<String> ids = Arrays.asList("First", "Second", "Third", "Fourth");

CompletionStage<List<JsonDocument>> result =
    Source.from(ids)
        .via(CouchbaseFlow.fromId(sessionSettings, queryBucketName))
        .runWith(Sink.seq(), actorSystem);
Writing to Couchbase in Akka Streams
For each mutation operation we need to create a CouchbaseWriteSettings instance, which consists of the following parameters:
- Parallelism in access to Couchbase (default 1)
- Couchbase Replication Factor (default ReplicateTo.NONE)
- Couchbase Persistence Level for Write Operation (default PersistTo.NONE)
- 2 seconds operation timeout
These default values are not recommended for production use, as they do not persist to disk for any node.
- Scala
import akka.stream.alpakka.couchbase.CouchbaseWriteSettings
import com.couchbase.client.java.{PersistTo, ReplicateTo}
import scala.concurrent.duration._ // needed for 5.seconds

val writeSettings = CouchbaseWriteSettings()
  .withParallelism(3)
  .withPersistTo(PersistTo.FOUR)
  .withReplicateTo(ReplicateTo.THREE)
  .withTimeout(5.seconds)
- Java
CouchbaseWriteSettings writeSettings =
    CouchbaseWriteSettings.create()
        .withParallelism(3)
        .withPersistTo(PersistTo.FOUR)
        .withReplicateTo(ReplicateTo.THREE)
        .withTimeout(Duration.ofSeconds(5));
Read more about durability settings in the Couchbase documentation.
Upsert
The CouchbaseFlow and CouchbaseSink offer factories for upserting documents (insert or update) in Couchbase. upsert is used for the most commonly used JsonDocument, and upsertDoc has a type parameter to support any variant of Document[T] (Scala) or Document<T> (Java), which may be RawJsonDocument, StringDocument or BinaryDocument.
The upsert and upsertDoc operators fail the stream on any error when writing to Couchbase. To handle failures in-stream, use upsertDocWithResult, shown below.
- Scala
val obj = TestObject(id = "First", "First")

val writeSettings = CouchbaseWriteSettings()

val jsonDocumentUpsert: Future[Done] =
  Source
    .single(obj)
    .map(toJsonDocument)
    .via(
      CouchbaseFlow.upsert(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.ignore)

val stringDocumentUpsert: Future[Done] =
  Source
    .single(sampleData)
    .map(toStringDocument)
    .via(
      CouchbaseFlow.upsertDoc(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.ignore)
- Java
CompletionStage<JsonDocument> jsonDocumentUpsert =
    Source.single(obj)
        .map(support::toJsonDocument)
        .via(CouchbaseFlow.upsert(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.head(), actorSystem);
For single document modifications you may consider using the CouchbaseSession methods directly. They offer a Future-based (Scala) or CompletionStage-based (Java) API, which in many cases might be simpler than using Akka Streams with just one element (see below).
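A single-document upsert through the session might look like the following Scala sketch. It is an illustration under assumptions, not the connector's documented example: the bucket name "akka", the document content, and the object name are hypothetical, and the session settings are expected to come from application.conf.

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings}
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject

import scala.concurrent.Future

object SingleUpsertSketch extends App {
  implicit val actorSystem: ActorSystem = ActorSystem("couchbase-sketch")
  import actorSystem.dispatcher

  val bucketName = "akka" // assumption: an existing bucket
  val sessionSettings = CouchbaseSessionSettings(actorSystem)

  // The registry shares sessions and closes them on actor system termination.
  val sessionFuture: Future[CouchbaseSession] =
    CouchbaseSessionRegistry(actorSystem).sessionFor(sessionSettings, bucketName)

  val document: JsonDocument =
    JsonDocument.create("First", JsonObject.create().put("value", "First"))

  // One write, no stream: the Future completes with the stored document.
  val written: Future[JsonDocument] = sessionFuture.flatMap(_.upsert(document))

  written.onComplete { result =>
    println(result)
    actorSystem.terminate()
  }
}
```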
Couchbase writes may fail temporarily for a particular node. If you want to handle such failures without restarting the whole stream, the upsertDocWithResult operator captures failures from Couchbase and emits the CouchbaseWriteResult sub-classes CouchbaseWriteSuccess and CouchbaseWriteFailure downstream.
- Scala
import akka.stream.alpakka.couchbase.{CouchbaseWriteFailure, CouchbaseWriteResult}

val result: Future[immutable.Seq[CouchbaseWriteResult[RawJsonDocument]]] =
  Source(sampleSequence)
    .map(toRawJsonDocument)
    .via(
      CouchbaseFlow.upsertDocWithResult(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.seq)

val failedDocs: immutable.Seq[CouchbaseWriteFailure[RawJsonDocument]] = result.futureValue.collect {
  case res: CouchbaseWriteFailure[RawJsonDocument] => res
}
- Java
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure;
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;

CompletionStage<List<CouchbaseWriteResult<StringDocument>>> upsertResults =
    Source.from(sampleSequence)
        .map(support::toStringDocument)
        .via(CouchbaseFlow.upsertDocWithResult(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.seq(), actorSystem);

List<CouchbaseWriteResult<StringDocument>> writeResults =
    upsertResults.toCompletableFuture().get(3, TimeUnit.SECONDS);
List<CouchbaseWriteFailure<StringDocument>> failedDocs =
    writeResults.stream()
        .filter(CouchbaseWriteResult::isFailure)
        .map(res -> (CouchbaseWriteFailure<StringDocument>) res)
        .collect(Collectors.toList());
Replace
The CouchbaseFlow and CouchbaseSink offer factories for replacing documents in Couchbase. replace is used for the most commonly used JsonDocument, and replaceDoc has a type parameter to support any variant of Document[T] (Scala) or Document<T> (Java), which may be RawJsonDocument, StringDocument or BinaryDocument.
The replace and replaceDoc operators fail the stream on any error when writing to Couchbase. To handle failures in-stream, use replaceDocWithResult, shown below.
A replace action will fail with a DocumentDoesNotExistException if the original document can’t be found in Couchbase.
- Scala
val replaceFuture: Future[Done] =
  Source
    .single(obj)
    .map(toJsonDocument)
    .via(
      CouchbaseFlow.replace(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.ignore)
- Java
import com.couchbase.client.java.error.DocumentDoesNotExistException;

CompletionStage<JsonDocument> jsonDocumentReplace =
    Source.single(obj)
        .map(support::toJsonDocument)
        .via(CouchbaseFlow.replace(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.head(), actorSystem);
For single document modifications you may consider using the CouchbaseSession methods directly. They offer a Future-based (Scala) or CompletionStage-based (Java) API, which in many cases might be simpler than using Akka Streams with just one element (see below).
Couchbase writes may fail temporarily for a particular node. If you want to handle such failures without restarting the whole stream, the replaceDocWithResult operator captures failures from Couchbase and emits the CouchbaseWriteResult sub-classes CouchbaseWriteSuccess and CouchbaseWriteFailure downstream.
- Scala
import akka.stream.alpakka.couchbase.{CouchbaseWriteFailure, CouchbaseWriteResult}

val result: Future[immutable.Seq[CouchbaseWriteResult[RawJsonDocument]]] =
  Source(sampleSequence)
    .map(toRawJsonDocument)
    .via(
      CouchbaseFlow.replaceDocWithResult(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.seq)

val failedDocs: immutable.Seq[CouchbaseWriteFailure[RawJsonDocument]] = result.futureValue.collect {
  case res: CouchbaseWriteFailure[RawJsonDocument] => res
}
- Java
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure;
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;

CompletionStage<List<CouchbaseWriteResult<StringDocument>>> replaceResults =
    Source.from(sampleSequence)
        .map(support::toStringDocument)
        .via(CouchbaseFlow.replaceDocWithResult(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.seq(), actorSystem);

List<CouchbaseWriteResult<StringDocument>> writeResults =
    replaceResults.toCompletableFuture().get(3, TimeUnit.SECONDS);
List<CouchbaseWriteFailure<StringDocument>> failedDocs =
    writeResults.stream()
        .filter(CouchbaseWriteResult::isFailure)
        .map(res -> (CouchbaseWriteFailure<StringDocument>) res)
        .collect(Collectors.toList());
Delete
The CouchbaseFlow and CouchbaseSink offer factories to delete documents in Couchbase by ID.
- Scala
val deleteFuture: Future[Done] =
  Source
    .single(sampleData.id)
    .via(
      CouchbaseFlow.delete(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.ignore)
- Java
CompletionStage<String> result =
    Source.single(sampleData.id())
        .via(CouchbaseFlow.delete(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.head(), actorSystem);
To handle delete failures in-stream, such as non-existent documents, use the deleteWithResult operator. It captures failures from Couchbase and emits CouchbaseDeleteResults.
- Scala
val deleteResult: Future[CouchbaseDeleteResult] =
  Source
    .single("non-existent")
    .via(
      CouchbaseFlow.deleteWithResult(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.head)
- Java
import akka.stream.alpakka.couchbase.CouchbaseDeleteResult;

CompletionStage<CouchbaseDeleteResult> result =
    Source.single("non-existent")
        .via(CouchbaseFlow.deleteWithResult(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.head(), actorSystem);
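Once delete results are emitted downstream, they can be inspected per document. The Scala sketch below is one possible way to do that, under assumptions: the bucket name "akka" and the IDs are hypothetical, and the `id` and `failure` accessors on the result classes are assumed rather than taken from the text above.

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.alpakka.couchbase.{
  CouchbaseDeleteFailure,
  CouchbaseDeleteResult,
  CouchbaseSessionSettings,
  CouchbaseWriteSettings
}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object DeleteWithResultSketch extends App {
  implicit val actorSystem: ActorSystem = ActorSystem("couchbase-sketch")
  import actorSystem.dispatcher

  val bucketName = "akka" // assumption: an existing bucket
  val sessionSettings = CouchbaseSessionSettings(actorSystem)
  val writeSettings = CouchbaseWriteSettings()

  val results: Future[Seq[CouchbaseDeleteResult]] =
    Source(List("First", "non-existent"))
      .via(CouchbaseFlow.deleteWithResult(sessionSettings, writeSettings, bucketName))
      .runWith(Sink.seq)

  results
    .map { all =>
      all.foreach {
        // Assumption: CouchbaseDeleteFailure carries the document id and the cause.
        case failed: CouchbaseDeleteFailure =>
          println(s"delete of ${failed.id} failed: ${failed.failure}")
        case ok =>
          println(s"deleted ${ok.id}")
      }
    }
    .onComplete(_ => actorSystem.terminate())
}
```

Because failures are emitted as elements, the stream keeps running after a failed delete, which is the point of choosing deleteWithResult over delete.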
Using CouchbaseSession directly
Access via registry
The CouchbaseSessionRegistry is an Akka extension to manage the life-cycle of Couchbase sessions. All underlying instances are closed upon actor system termination.
When accessing more than one Couchbase cluster, the CouchbaseEnvironment should be shared by setting a single instance for the different CouchbaseSessionSettings.
- Scala
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import com.couchbase.client.java.env.{CouchbaseEnvironment, DefaultCouchbaseEnvironment}

// Akka extension (singleton per actor system)
val registry = CouchbaseSessionRegistry(actorSystem)

// If connecting to more than one Couchbase cluster, the environment should be shared
val environment: CouchbaseEnvironment = DefaultCouchbaseEnvironment.create()
actorSystem.registerOnTermination {
  environment.shutdown()
}

val sessionSettings = CouchbaseSessionSettings(actorSystem)
  .withEnvironment(environment)
val sessionFuture: Future[CouchbaseSession] = registry.sessionFor(sessionSettings, bucketName)
- Java
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;

CouchbaseSessionRegistry registry = CouchbaseSessionRegistry.get(actorSystem);

// If connecting to more than one Couchbase cluster, the environment should be shared
CouchbaseEnvironment environment = DefaultCouchbaseEnvironment.create();
actorSystem.registerOnTermination(() -> environment.shutdown());

CouchbaseSessionSettings sessionSettings =
    CouchbaseSessionSettings.create(actorSystem).withEnvironment(environment);
CompletionStage<CouchbaseSession> sessionCompletionStage =
    registry.getSessionFor(sessionSettings, bucketName);
Manage session life-cycle
Use CouchbaseSessionSettings to get an instance of CouchbaseSession. These settings may be specified in application.conf and complemented in code. Furthermore, a session requires the bucket name and needs an ExecutionContext, as the creation is asynchronous.
- Scala
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession

implicit val ec: ExecutionContext = actorSystem.dispatcher
val sessionSettings = CouchbaseSessionSettings(actorSystem)
val sessionFuture: Future[CouchbaseSession] = CouchbaseSession(sessionSettings, bucketName)
actorSystem.registerOnTermination(sessionFuture.flatMap(_.close()))

val documentFuture = sessionFuture.flatMap { session =>
  val id = "myId"
  val documentFuture: Future[Option[JsonDocument]] = session.get(id)
  documentFuture.flatMap {
    case Some(jsonDocument) =>
      Future.successful(jsonDocument)
    case None =>
      Future.failed(new RuntimeException(s"document $id wasn't found"))
  }
}
- Java
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;

Executor executor = Executors.newSingleThreadExecutor();
CouchbaseSessionSettings sessionSettings = CouchbaseSessionSettings.create(actorSystem);
CompletionStage<CouchbaseSession> sessionCompletionStage =
    CouchbaseSession.create(sessionSettings, bucketName, executor);
actorSystem.registerOnTermination(
    () -> sessionCompletionStage.thenAccept(CouchbaseSession::close));

sessionCompletionStage.thenAccept(
    session -> {
      String id = "myId";
      CompletionStage<Optional<JsonDocument>> documentCompletionStage = session.get(id);
      documentCompletionStage.thenAccept(
          opt -> {
            if (opt.isPresent()) {
              System.out.println(opt.get());
            } else {
              System.out.println("Document " + id + " wasn't found");
            }
          });
    });
Manage bucket life-cycle
For full control, a CouchbaseSession may be created from a Couchbase Bucket. See Scalability and Concurrency in the Couchbase documentation for details.
- Scala
import com.couchbase.client.java.auth.PasswordAuthenticator
import com.couchbase.client.java.{Bucket, CouchbaseCluster}

val cluster: CouchbaseCluster = CouchbaseCluster.create("localhost")
cluster.authenticate(new PasswordAuthenticator("Administrator", "password"))
val bucket: Bucket = cluster.openBucket("akka")
val session: CouchbaseSession = CouchbaseSession(bucket)
actorSystem.registerOnTermination {
  session.close()
}

val id = "myId"
val documentFuture = session.get(id).flatMap {
  case Some(jsonDocument) =>
    Future.successful(jsonDocument)
  case None =>
    Future.failed(new RuntimeException(s"document $id wasn't found"))
}
- Java
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.auth.PasswordAuthenticator;

CouchbaseCluster cluster = CouchbaseCluster.create("localhost");
cluster.authenticate(new PasswordAuthenticator("Administrator", "password"));
Bucket bucket = cluster.openBucket("akka");
CouchbaseSession session = CouchbaseSession.create(bucket);
actorSystem.registerOnTermination(
    () -> {
      session.close();
      bucket.close();
    });

String id = "First";
CompletionStage<Optional<JsonDocument>> documentCompletionStage = session.get(id);
documentCompletionStage.thenAccept(
    opt -> {
      if (opt.isPresent()) {
        System.out.println(opt.get());
      } else {
        System.out.println("Document " + id + " wasn't found");
      }
    });
To learn about the full range of operations on CouchbaseSession, read the CouchbaseSession API documentation.