Couchbase
Couchbase is an open-source, distributed (shared-nothing architecture) multi-model NoSQL document-oriented database software package that is optimized for interactive applications. These applications may serve many concurrent users by creating, storing, retrieving, aggregating, manipulating and presenting data. In support of these kinds of application needs, Couchbase Server is designed to provide easy-to-scale key-value or JSON document access with low latency and high sustained throughput. It is designed to be clustered from a single machine to very large-scale deployments spanning many machines.
Couchbase provides client protocol compatibility with memcached, but adds disk persistence, data replication, live cluster reconfiguration, rebalancing and multitenancy with data partitioning.
Alpakka Couchbase allows you to read from and write to Couchbase. You can query a bucket with CouchbaseSource using SQL++ (formerly N1QL) queries, or read documents by their ID. The Couchbase connector uses Couchbase Java SDK version 3.9.1 behind the scenes.
The Couchbase connector supports all document formats supported by the SDK. All of them are represented through the CouchbaseDocument[T] interface, which is the level of abstraction this connector works with.
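As a quick illustration, here is a minimal sketch of wrapping values in a CouchbaseDocument. The two-argument (id, content) constructor and the getId/getDocument accessors are the ones used in the examples further down; the document IDs and payloads are made up for this example.

```scala
import akka.stream.alpakka.couchbase.CouchbaseDocument
import com.couchbase.client.java.json.JsonObject

// A JSON document: the JsonObject type parameter makes the connector pick a
// JSON transcoder (see "Operation Options" below).
val jsonDoc: CouchbaseDocument[JsonObject] =
  new CouchbaseDocument("user::42", JsonObject.create().put("name", "Alice"))

// A binary document: Array[Byte] selects the raw binary transcoder.
val binaryDoc: CouchbaseDocument[Array[Byte]] =
  new CouchbaseDocument("raw::42", "some raw payload".getBytes("UTF-8"))
```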
| Project Info: Alpakka Couchbase | |
|---|---|
| Artifact | com.lightbend.akka : akka-stream-alpakka-couchbase : 10.0.1 |
| JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 |
| Scala versions | 2.13.17 |
| JPMS module name | akka.stream.alpakka.couchbase |
| License | |
| Readiness level | End-of-Life, it is not recommended to use this project any more. Since 9.0.0-M1, 2024-10-14 |
| Home page | https://doc.akka.io/libraries/alpakka/current |
| API documentation | |
| Forums | |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s secure library repository. To access them you need to use a secure, tokenized URL as specified at https://account.akka.io/token.
Additionally, add the dependencies as below.
- sbt

```scala
val AkkaVersion = "2.10.11"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-couchbase" % "10.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
```

- Maven

```xml
<properties>
  <akka.version>2.10.11</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-couchbase_${scala.binary.version}</artifactId>
    <version>10.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
```

- Gradle

```gradle
def versions = [
  AkkaVersion: "2.10.11",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-couchbase_${versions.ScalaBinary}:10.0.1"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}
```
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies

| Organization | Artifact | Version |
|---|---|---|
| com.couchbase.client | java-client | 3.9.1 |
| com.typesafe.akka | akka-pki_2.13 | 2.10.11 |
| com.typesafe.akka | akka-stream_2.13 | 2.10.11 |
| io.reactivex | rxjava-reactive-streams | 1.2.1 |
| org.scala-lang | scala-library | 2.13.17 |

- Dependency tree
```
com.couchbase.client  java-client  3.9.1  The Apache Software License, Version 2.0
    com.couchbase.client  core-io  3.9.1  The Apache Software License, Version 2.0
        io.projectreactor  reactor-core  3.6.9  Apache License, Version 2.0
            org.reactivestreams  reactive-streams  1.0.4  MIT-0
        org.jspecify  jspecify  1.0.0  The Apache License, Version 2.0
        org.reactivestreams  reactive-streams  1.0.4  MIT-0
        org.slf4j  slf4j-api  2.0.17  MIT
com.typesafe.akka  akka-pki_2.13  2.10.11  BUSL-1.1
    com.hierynomus  asn-one  0.6.0  The Apache License, Version 2.0
    com.typesafe.akka  akka-actor_2.13  2.10.11  BUSL-1.1
        com.typesafe  config  1.4.5  Apache-2.0
        org.scala-lang  scala-library  2.13.17  Apache-2.0
    org.scala-lang  scala-library  2.13.17  Apache-2.0
    org.slf4j  slf4j-api  2.0.17  MIT
com.typesafe.akka  akka-stream_2.13  2.10.11  BUSL-1.1
    com.typesafe.akka  akka-actor_2.13  2.10.11  BUSL-1.1
        com.typesafe  config  1.4.5  Apache-2.0
        org.scala-lang  scala-library  2.13.17  Apache-2.0
    com.typesafe.akka  akka-protobuf-v3_2.13  2.10.11  BUSL-1.1
    org.reactivestreams  reactive-streams  1.0.4  MIT-0
    org.scala-lang  scala-library  2.13.17  Apache-2.0
io.reactivex  rxjava-reactive-streams  1.2.1  The Apache Software License, Version 2.0
    io.reactivex  rxjava  1.2.2  The Apache Software License, Version 2.0
    org.reactivestreams  reactive-streams  1.0.4  MIT-0
org.scala-lang  scala-library  2.13.17  Apache-2.0
```
Overview
Alpakka Couchbase offers both Akka Streams APIs and a more direct API to access Couchbase:
- CouchbaseSession offers a direct API for one-off operations via CouchbaseCollectionSessions
- CouchbaseSessionRegistry is an Akka extension to keep track of and share CouchbaseSessions within an ActorSystem
- CouchbaseSource, CouchbaseFlow, and CouchbaseSink offer factory methods to create Akka Stream operators
Configuration
All operations use the CouchbaseSession internally. A session is configured with CouchbaseSessionSettings and a Couchbase bucket name. The Akka Stream factory methods create and access the corresponding session instance behind the scenes.
By default the CouchbaseSessionSettings are read from the alpakka.couchbase.session section of the configuration, e.g. in your application.conf.
- Settings

```hocon
alpakka.couchbase {
  session {
    nodes = ["localhost"]
    username = "Administrator"
    password = "password"
  }
}
```
Using Akka Discovery
To delegate the configuration of Couchbase nodes to any of Akka Discovery’s lookup mechanisms, specify a service name and lookup timeout in the Couchbase section, pass the DiscoverySupport nodes lookup to enrichAsync, and configure Akka Discovery accordingly.
The Akka Discovery dependency has to be added explicitly.
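With sbt, for example, adding it could look like the following sketch (AkkaVersion as defined in the Artifacts section above):

```scala
// Akka Discovery is not pulled in transitively; add it with the same Akka version.
libraryDependencies += "com.typesafe.akka" %% "akka-discovery" % AkkaVersion
```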
- Discovery settings (Config discovery)

```hocon
alpakka.couchbase {
  session {
    service {
      name = couchbase-service
      lookup-timeout = 1 s
    }
    username = "Administrator"
    password = "password"
  }
}

akka.discovery.method = config
akka.discovery.config.services = {
  couchbase-service = {
    endpoints = [
      { host = "akka.io" }
    ]
  }
}
```
To enable Akka Discovery on the CouchbaseSessionSettings, use DiscoverySupport.nodes() as the enrichment function.
- Scala

```scala
import akka.stream.alpakka.couchbase.scaladsl.{CouchbaseSession, DiscoverySupport}
import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings}

val registry = CouchbaseSessionRegistry(actorSystem)

val sessionSettings = CouchbaseSessionSettings(actorSystem)
  .withEnrichAsync(DiscoverySupport.nodes())
val sessionFuture: Future[CouchbaseSession] =
  registry.sessionFor(sessionSettings, support.bucketName)
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
import akka.stream.alpakka.couchbase.javadsl.DiscoverySupport;

CouchbaseSessionRegistry registry = CouchbaseSessionRegistry.get(actorSystem);

CouchbaseSessionSettings sessionSettings =
    CouchbaseSessionSettings.create(actorSystem)
        .withEnrichAsyncCs(DiscoverySupport.getNodes(actorSystem));
CompletionStage<CouchbaseSession> session = registry.getSessionFor(sessionSettings, bucketName);
```
Reading from Couchbase in Akka Streams
Using SQL++ queries
To query a Couchbase bucket with SQL++ queries, use CouchbaseSource.fromQuery.
- Scala

```scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSource
import akka.stream.scaladsl.Sink
import com.couchbase.client.java.json.JsonValue

import scala.concurrent.Future

val resultAsFuture: Future[Seq[JsonValue]] = CouchbaseSource
  .fromQuery(sessionSettings,
             bucketName,
             "SELECT * FROM `" + bucketName + "`.`" + scopeName + "`.`" + collectionName + "` LIMIT 10")
  .runWith(Sink.seq)
```

- Java

```java
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSource;
import com.couchbase.client.java.json.JsonObject;

CompletionStage<List<JsonObject>> resultCompletionStage =
    CouchbaseSource.fromQuery(
            sessionSettings,
            bucketName,
            "SELECT * FROM `"
                + support.bucketName()
                + "`.`"
                + support.scopeName()
                + "`.`"
                + support.collectionName()
                + "` LIMIT 10")
        .runWith(Sink.seq(), actorSystem);
```
Get by ID
The CouchbaseFlow.fromId and CouchbaseFlow.bytesFromId methods allow reading documents by their document ID within an Akka Stream.
- Scala

```scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}
import com.couchbase.client.java.json.JsonObject

import scala.collection.immutable

val ids = immutable.Seq("First", "Second", "Third", "Fourth")

val futureResult = Source(ids)
  .via(
    CouchbaseFlow.fromId[JsonObject](
      sessionSettings,
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.seq)

val resultsAsFuture = Source(sampleSequence.map(_.getId))
  .via(
    CouchbaseFlow.bytesFromId(
      sessionSettings,
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.seq)
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseDocument;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.couchbase.client.java.json.JsonValue;

List<String> ids = Arrays.asList("First", "Second", "Third", "Fourth");

CompletionStage<List<CouchbaseDocument<byte[]>>> result =
    Source.from(ids)
        .via(
            CouchbaseFlow.bytesFromId(
                sessionSettings, queryBucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.seq(), actorSystem);

List<String> idsJson = Arrays.asList("FirstJson", "SecondJson", "ThirdJson", "FourthJson");

CompletionStage<List<CouchbaseDocument<JsonValue>>> jsonResult =
    Source.from(idsJson)
        .via(
            CouchbaseFlow.fromId(
                sessionSettings, queryBucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.seq(), actorSystem);
```
Writing to Couchbase in Akka Streams
Access Parallelism
Parallelism of Couchbase access can be configured via CouchbaseSessionSettings; by default it is set to 1 (see the sketch below the note).
The default durability and parallelism values are not recommended for production use.
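A minimal sketch, assuming CouchbaseSessionSettings follows its usual with... builder convention for this value (the withParallelism name is an assumption here, not confirmed by this page; verify it against the CouchbaseSessionSettings API documentation):

```scala
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings

// Assumption: a withParallelism setter, in line with the other with... methods.
// The value 4 is illustrative; the default is 1.
val parallelSessionSettings = CouchbaseSessionSettings(actorSystem)
  .withParallelism(4)
```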
Operation Options
All mutation operations provided include overloaded methods that accept corresponding operation options (InsertOptions, UpsertOptions, etc…) as a parameter. These options include:
- Couchbase Replication Factor (default ReplicateTo.NONE)
- Couchbase Persistence Level for Write Operation (default PersistTo.NONE)
- Optional mutation expiration value
- Optional transcoder for serializing document values persisted on the cluster
Read more about durability settings in the Couchbase documentation.
All mutation operations are designed to choose transcoders for stored documents based on the T type parameter of the CouchbaseDocument[T] interface:
- a RawBinaryTranscoder will be selected for Array[Byte] documents
- a RawStringTranscoder will be selected for String documents
- a default session transcoder (usually JsonTranscoder) will be chosen otherwise
This behavior can be overridden by providing a specific transcoder in mutation operation options or in session environment settings.
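For illustration, a minimal sketch of building such an options value with the Couchbase Java SDK; the durability, expiry and transcoder values are arbitrary, and the examples below show how an options object like this is passed to the overloaded flow factories:

```scala
import com.couchbase.client.java.codec.RawStringTranscoder
import com.couchbase.client.java.kv.{PersistTo, ReplicateTo, UpsertOptions}

// Persist on the active node, replicate to one replica, expire after an hour,
// and force the raw-string transcoder instead of the type-based default.
val upsertOptions: UpsertOptions = UpsertOptions
  .upsertOptions()
  .durability(PersistTo.ACTIVE, ReplicateTo.ONE)
  .expiry(java.time.Duration.ofHours(1))
  .transcoder(RawStringTranscoder.INSTANCE)
```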
Upsert
The CouchbaseFlow and CouchbaseSink offer factories for upserting documents (insert or update) in Couchbase.
The upsert operators fail the stream on any error when writing to Couchbase. To handle failures in-stream use upsertWithResult shown below.
- Scala

```scala
import akka.Done
import akka.stream.alpakka.couchbase.CouchbaseDocument
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

val result: Future[Done] = Source
  .single(sampleData)
  .map(data => new CouchbaseDocument(data.getId, data.getDocument.getBytes()))
  .via(
    CouchbaseFlow.upsert(
      sessionSettings,
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.ignore)
result.futureValue
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

CompletionStage<Done> jsonDocumentUpsert =
    Source.single(obj)
        .via(
            CouchbaseFlow.upsert(
                sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.head(), actorSystem);
```
For single document modifications you may consider using the CouchbaseSession methods directly: they offer a Future-based (Scala) or CompletionStage-based (Java) API which in many cases might be simpler than running an Akka Stream for just one element (see below).
Couchbase writes may fail temporarily for a particular node. If you want to handle such failures without restarting the whole stream, the upsertWithResult operator captures failures from Couchbase and emits CouchbaseWriteResult sub-classes CouchbaseWriteSuccess and CouchbaseWriteFailure downstream.
- Scala

```scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.alpakka.couchbase.{CouchbaseDocument, CouchbaseWriteFailure, CouchbaseWriteResult}
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Future

val result: Future[immutable.Seq[CouchbaseWriteResult]] = Source(sampleSequence)
  .map(doc => new CouchbaseDocument(doc.getId, doc.getDocument.getBytes))
  .via(
    CouchbaseFlow.upsertWithResult(
      sessionSettings,
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.seq)

val failedDocs: immutable.Seq[CouchbaseWriteFailure] = result.futureValue.collect {
  case res: CouchbaseWriteFailure => res
}
```

```scala
// upsertWithResult with explicit UpsertOptions (transcoder, durability, timeout)
import akka.stream.alpakka.couchbase.{CouchbaseWriteFailure, CouchbaseWriteResult}
import com.couchbase.client.core.msg.kv.DurabilityLevel
import com.couchbase.client.java.codec.RawBinaryTranscoder
import com.couchbase.client.java.kv.UpsertOptions

val result: Future[immutable.Seq[CouchbaseWriteResult]] = Source(sampleSequence)
  .map(doc => new CouchbaseDocument(doc.getId, doc.getDocument.getBytes))
  .via(
    CouchbaseFlow.upsertWithResult(
      sessionSettings,
      UpsertOptions
        .upsertOptions()
        .transcoder(RawBinaryTranscoder.INSTANCE)
        .durability(DurabilityLevel.MAJORITY_AND_PERSIST_TO_ACTIVE)
        .timeout(java.time.Duration.ofSeconds(1)),
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.seq)
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure;
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

CompletionStage<List<CouchbaseWriteResult>> upsertResults =
    Source.from(sampleSequence)
        .via(
            CouchbaseFlow.upsertWithResult(
                sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.seq(), actorSystem);

List<CouchbaseWriteResult> writeResults =
    upsertResults.toCompletableFuture().get(3, TimeUnit.SECONDS);
List<CouchbaseWriteFailure> failedDocs =
    writeResults.stream()
        .filter(CouchbaseWriteResult::isFailure)
        .map(CouchbaseWriteFailure.class::cast)
        .collect(Collectors.toUnmodifiableList());
```
Replace
The CouchbaseFlow and CouchbaseSink offer factories for replacing documents in Couchbase.
The replace operators fail the stream on any error when writing to Couchbase. To handle failures in-stream use replaceWithResult shown below.
A replace action will fail with a DocumentDoesNotExistException if the original document can’t be found in Couchbase.
- Scala

```scala
import akka.Done
import akka.stream.alpakka.couchbase.CouchbaseDocument
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}
import com.couchbase.client.java.json.JsonObject

import scala.concurrent.Future

val replaceFuture: Future[Done] = Source
  .single(obj)
  .map(doc => new CouchbaseDocument(doc.getId, JsonObject.create().put("value", doc.getDocument)))
  .via(
    CouchbaseFlow.replace(
      sessionSettings,
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.ignore)
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// replaceWithResult for a single document
CompletionStage<CouchbaseWriteResult> jsonDocumentReplaceResult =
    Source.single(obj)
        .via(
            CouchbaseFlow.replaceWithResult(
                sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.head(), actorSystem);

// plain replace, failing the stream on any write error
CompletionStage<Done> jsonDocumentReplace =
    Source.single(obj)
        .via(
            CouchbaseFlow.replace(
                sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.head(), actorSystem);
```
For single document modifications you may consider using the CouchbaseSession methods directly: they offer a Future-based (Scala) or CompletionStage-based (Java) API which in many cases might be simpler than running an Akka Stream for just one element (see below).
Couchbase writes may fail temporarily for a particular node. If you want to handle such failures without restarting the whole stream, the replaceWithResult operator captures failures from Couchbase and emits CouchbaseWriteResult sub-classes CouchbaseWriteSuccess and CouchbaseWriteFailure downstream.
- Scala

```scala
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}

val result = Source(sampleSequence)
  .via(
    CouchbaseFlow.replaceWithResult(
      sessionSettings,
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.seq)

val failedDocs = result.futureValue.collect {
  case res: CouchbaseWriteFailure => res
}
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure;
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

CompletionStage<List<CouchbaseWriteResult>> replaceResults =
    Source.from(list)
        .via(
            CouchbaseFlow.replaceWithResult(
                sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.seq(), actorSystem);

List<CouchbaseWriteResult> writeResults =
    replaceResults.toCompletableFuture().get(3, TimeUnit.SECONDS);
List<CouchbaseWriteFailure> failedDocs =
    writeResults.stream()
        .filter(CouchbaseWriteResult::isFailure)
        .map(CouchbaseWriteFailure.class::cast)
        .collect(Collectors.toUnmodifiableList());
```
Delete
The CouchbaseFlow and CouchbaseSink offer factories to delete documents in Couchbase by ID.
- Scala

```scala
import akka.Done
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

val deleteFuture: Future[Done] = Source
  .single(sampleData.getId)
  .via(
    CouchbaseFlow.delete(
      sessionSettings,
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.ignore)
```

- Java

```java
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

CompletionStage<String> result =
    Source.single(sampleData.getId())
        .via(
            CouchbaseFlow.delete(
                sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.head(), actorSystem);
```
To handle any delete failures, such as non-existent documents, in-stream, use the deleteWithResult operator. It captures failures from Couchbase and emits CouchbaseDeleteResults.
- Scala

```scala
import akka.stream.alpakka.couchbase.CouchbaseDeleteResult
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}
import com.couchbase.client.core.msg.kv.DurabilityLevel
import com.couchbase.client.java.kv.RemoveOptions

import scala.concurrent.Future

val deleteResult: Future[CouchbaseDeleteResult] = Source
  .single("non-existent")
  .via(
    CouchbaseFlow.deleteWithResult(
      sessionSettings,
      RemoveOptions
        .removeOptions()
        .durability(DurabilityLevel.MAJORITY_AND_PERSIST_TO_ACTIVE)
        .timeout(java.time.Duration.ofSeconds(1)),
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.head)
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseDeleteResult;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

CompletionStage<CouchbaseDeleteResult> result =
    Source.single("non-existent")
        .via(
            CouchbaseFlow.deleteWithResult(
                sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.head(), actorSystem);
CouchbaseDeleteResult deleteResult = result.toCompletableFuture().get(3, TimeUnit.SECONDS);
```
Using CouchbaseSession directly
Access via registry
The CouchbaseSessionRegistry is an Akka extension to manage the life-cycle of Couchbase sessions. All underlying instances are closed upon actor system termination.
When accessing more than one Couchbase cluster, the ClusterEnvironment should be shared by setting a single instance on the different CouchbaseSessionSettings.
- Scala

```scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings}
import com.couchbase.client.java.env.ClusterEnvironment

// Akka extension (singleton per actor system)
val registry = CouchbaseSessionRegistry(actorSystem)

// If connecting to more than one Couchbase cluster, the environment should be shared
val environment: ClusterEnvironment = ClusterEnvironment.create()
actorSystem.registerOnTermination {
  environment.shutdown()
}

val sessionSettings = CouchbaseSessionSettings(actorSystem)
  .withEnvironment(environment)
val sessionFuture: Future[CouchbaseSession] = registry.sessionFor(sessionSettings, bucketName)
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
import com.couchbase.client.java.env.ClusterEnvironment;

CouchbaseSessionRegistry registry = CouchbaseSessionRegistry.get(actorSystem);

// If connecting to more than one Couchbase cluster, the environment should be shared
ClusterEnvironment environment = ClusterEnvironment.create();
actorSystem.registerOnTermination(() -> environment.shutdown());

CouchbaseSessionSettings sessionSettings =
    CouchbaseSessionSettings.create(actorSystem).withEnvironment(environment);
CompletionStage<CouchbaseSession> sessionCompletionStage =
    registry.getSessionFor(sessionSettings, bucketName);
```
Manage session life-cycle
Use CouchbaseSessionSettings to get an instance of CouchbaseSession. These settings may be specified in application.conf and complemented in code. Furthermore, a session requires the bucket name and an ExecutionContext (Scala) or Executor (Java), as session creation is asynchronous.
- Scala

```scala
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession

import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext = actorSystem.dispatcher
val sessionSettings = CouchbaseSessionSettings(actorSystem)
val sessionFuture: Future[CouchbaseSession] = CouchbaseSession(sessionSettings, bucketName)
actorSystem.registerOnTermination(sessionFuture.flatMap(_.close()))

val documentFuture = sessionFuture.flatMap { session =>
  val id = "myId"
  session.collection(scopeName, collectionName).getBytes(id)
}
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseDocument;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;

Executor executor = Executors.newSingleThreadExecutor();
CouchbaseSessionSettings sessionSettings = CouchbaseSessionSettings.create(actorSystem);
CompletionStage<CouchbaseSession> sessionCompletionStage =
    CouchbaseSession.create(sessionSettings, bucketName, executor);
actorSystem.registerOnTermination(
    () -> sessionCompletionStage.thenAccept(CouchbaseSession::close));

sessionCompletionStage.thenAccept(
    session -> {
      String id = "myId";
      CompletionStage<CouchbaseDocument<byte[]>> documentCompletionStage =
          session.collection(support.scopeName(), support.collectionName()).getBytes(id);
      documentCompletionStage
          .exceptionally(
              ex -> {
                ex.printStackTrace();
                return null;
              })
          .thenAccept(
              doc -> {
                if (doc != null) {
                  System.out.println(doc);
                }
              });
    });
```
Manage Cluster life-cycle
For full control a CouchbaseSession may be created from a Couchbase Cluster. See Connection Lifecycle in the Couchbase documentation for details.
- Scala

```scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import com.couchbase.client.java.{Cluster, ClusterOptions}

val cluster: Cluster = Cluster.connect(
  "localhost",
  ClusterOptions.clusterOptions("Administrator", "password")
)
val session: CouchbaseSession = CouchbaseSession(cluster.async(), "akka").futureValue
actorSystem.registerOnTermination {
  session.close()
}

val id = "myId"
val documentFuture = session.collection(scopeName, collectionName).getBytes(id)
```

- Java

```java
import akka.stream.alpakka.couchbase.CouchbaseDocument;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
import com.couchbase.client.java.Cluster;

Cluster cluster = Cluster.connect("localhost", "Administrator", "password");
CouchbaseSession.create(cluster.async(), bucketName)
    .thenAccept(
        session -> {
          actorSystem.registerOnTermination(
              () -> {
                session.close();
                cluster.close();
              });

          String id = "First";
          CompletionStage<CouchbaseDocument<byte[]>> documentCompletionStage =
              session.collection(support.scopeName(), support.collectionName()).getBytes(id);
          documentCompletionStage.thenAccept(
              opt -> {
                if (opt != null) {
                  System.out.println(opt.getDocument());
                } else {
                  System.out.println("Document " + id + " wasn't found");
                }
              });
        });
```
To learn about the full range of operations on CouchbaseSession, read the CouchbaseSession API documentation.