Couchbase

Couchbase is an open-source, distributed (shared-nothing architecture) multi-model NoSQL document-oriented database software package that is optimized for interactive applications. These applications may serve many concurrent users by creating, storing, retrieving, aggregating, manipulating and presenting data. In support of these kinds of application needs, Couchbase Server is designed to provide easy-to-scale key-value or JSON document access with low latency and high sustained throughput. It is designed to be clustered from a single machine to very large-scale deployments spanning many machines.

Couchbase provides client protocol compatibility with memcached, but adds disk persistence, data replication, live cluster reconfiguration, rebalancing and multitenancy with data partitioning.

Wikipedia

Alpakka Couchbase allows you to read from and write to Couchbase. You can query a bucket through CouchbaseSource using N1QL (SQL++) queries, or read documents by document ID. The Couchbase connector uses Couchbase Java SDK version 3.9.1 behind the scenes.

The Couchbase connector supports all document formats supported by the SDK. All of these formats are exposed through the CouchbaseDocument[T] interface, which is the level of abstraction this connector works at.

Project Info: Alpakka Couchbase
Artifact: com.lightbend.akka : akka-stream-alpakka-couchbase : 10.0.1
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.17
JPMS module name: akka.stream.alpakka.couchbase
License
Readiness level: End-of-Life, it is not recommended to use this project any more. Since 9.0.0-M1, 2024-10-14
Home page: https://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

Note

The Akka dependencies are available from Akka’s secure library repository. To access them you need to use a secure, tokenized URL as specified at https://account.akka.io/token.

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.11"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-couchbase" % "10.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.10.11</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-couchbase_${scala.binary.version}</artifactId>
    <version>10.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.11",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-couchbase_${versions.ScalaBinary}:10.0.1"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows the direct dependencies of this module, and the dependency tree after it shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.couchbase.client    java-client    3.9.1
com.typesafe.akka    akka-pki_2.13    2.10.11
com.typesafe.akka    akka-stream_2.13    2.10.11
io.reactivex    rxjava-reactive-streams    1.2.1
org.scala-lang    scala-library    2.13.17
Dependency tree
com.couchbase.client    java-client    3.9.1    The Apache Software License, Version 2.0
    com.couchbase.client    core-io    3.9.1    The Apache Software License, Version 2.0
        io.projectreactor    reactor-core    3.6.9    Apache License, Version 2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.slf4j    slf4j-api    2.0.17    MIT
com.typesafe.akka    akka-pki_2.13    2.10.11    BUSL-1.1
    com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
        com.typesafe    config    1.4.5    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    org.scala-lang    scala-library    2.13.17    Apache-2.0
    org.slf4j    slf4j-api    2.0.17    MIT
com.typesafe.akka    akka-stream_2.13    2.10.11    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
        com.typesafe    config    1.4.5    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.11    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.17    Apache-2.0
io.reactivex    rxjava-reactive-streams    1.2.1    The Apache Software License, Version 2.0
    io.reactivex    rxjava    1.2.2    The Apache Software License, Version 2.0
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
org.scala-lang    scala-library    2.13.17    Apache-2.0

Overview

Alpakka Couchbase offers both Akka Streams APIs (CouchbaseSource, CouchbaseFlow and CouchbaseSink) and a more direct API (CouchbaseSession) to access Couchbase.

Configuration

All operations use the CouchbaseSession internally. A session is configured with CouchbaseSessionSettings and a Couchbase bucket name. The Akka Stream factory methods create and access the corresponding session instance behind the scenes.

By default, the CouchbaseSessionSettings are read from the alpakka.couchbase.session section of the configuration, e.g. in your application.conf.

Settings
alpakka.couchbase {
  session {
    nodes = ["localhost"]
    username = "Administrator"
    password = "password"
  }
}

Using Akka Discovery

To delegate the configuration of Couchbase nodes to any of Akka Discovery’s lookup mechanisms, specify a service name and lookup timeout in the Couchbase section, pass the DiscoverySupport nodes lookup to enrichAsync, and configure Akka Discovery accordingly.

The Akka Discovery dependency has to be added explicitly.

Discovery settings (Config discovery)
alpakka.couchbase {
  session {
    service {
      name = couchbase-service
      lookup-timeout = 1 s
    }
    username = "Administrator"
    password = "password"
  }
}

akka.discovery.method = config
akka.discovery.config.services = {
  couchbase-service = {
    endpoints = [
      { host = "akka.io" }
    ]
  }
}

To enable Akka Discovery on the CouchbaseSessionSettings, use DiscoverySupport.nodes() as the enrichment function.

Scala
import akka.stream.alpakka.couchbase.scaladsl.{CouchbaseSession, DiscoverySupport}
import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings}

import scala.concurrent.Future

val registry = CouchbaseSessionRegistry(actorSystem)

// Node addresses are resolved through Akka Discovery when the session is created
val sessionSettings = CouchbaseSessionSettings(actorSystem)
  .withEnrichAsync(DiscoverySupport.nodes())
val sessionFuture: Future[CouchbaseSession] = registry.sessionFor(sessionSettings, bucketName)
Java
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
import akka.stream.alpakka.couchbase.javadsl.DiscoverySupport;

CouchbaseSessionRegistry registry = CouchbaseSessionRegistry.get(actorSystem);

CouchbaseSessionSettings sessionSettings =
    CouchbaseSessionSettings.create(actorSystem)
        .withEnrichAsyncCs(DiscoverySupport.getNodes(actorSystem));
CompletionStage<CouchbaseSession> session = registry.getSessionFor(sessionSettings, bucketName);
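
The enrichment idea can be illustrated without the connector: an enrichment function takes settings and asynchronously returns a completed copy of them. The sketch below is plain Java under stated assumptions. `Settings`, `lookup` and `nodesFrom` are hypothetical stand-ins invented for illustration, not the connector's or Akka Discovery's API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class EnrichmentSketch {

  // Hypothetical, immutable stand-in for session settings (not CouchbaseSessionSettings).
  static final class Settings {
    final List<String> nodes;

    Settings(List<String> nodes) {
      this.nodes = Collections.unmodifiableList(nodes);
    }

    // Returns a copy with the given nodes; the original instance is left untouched.
    Settings withNodes(List<String> newNodes) {
      return new Settings(newNodes);
    }
  }

  // Hypothetical lookup that resolves a service name to host names asynchronously.
  // A real lookup would call a discovery mechanism; this one returns a fixed answer.
  static CompletionStage<List<String>> lookup(String serviceName) {
    return CompletableFuture.completedFuture(Arrays.asList("akka.io"));
  }

  // Builds an enrichment function: settings in, asynchronously completed settings out.
  static Function<Settings, CompletionStage<Settings>> nodesFrom(String serviceName) {
    return settings -> lookup(serviceName).thenApply(settings::withNodes);
  }

  public static void main(String[] args) {
    Settings base = new Settings(Collections.emptyList());
    Settings enriched =
        nodesFrom("couchbase-service").apply(base).toCompletableFuture().join();
    System.out.println(enriched.nodes);
  }
}
```

Because the function returns a new settings value instead of mutating its input, the same base settings can be reused safely for several sessions.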

Reading from Couchbase in Akka Streams

Using SQL++ queries

To query a Couchbase bucket with SQL++ queries, use CouchbaseSource.fromQuery.

Scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSource
import akka.stream.scaladsl.Sink
import com.couchbase.client.java.json.JsonValue

import scala.concurrent.Future

val resultAsFuture: Future[Seq[JsonValue]] =
  CouchbaseSource
    .fromQuery(sessionSettings,
               bucketName,
               "SELECT * FROM `" + bucketName + "`.`" + scopeName + "`.`" + collectionName + "` LIMIT 10")
    .runWith(Sink.seq)
Java
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSource;
import akka.stream.javadsl.Sink;
import com.couchbase.client.java.json.JsonObject;

// The keyspace is addressed as `bucket`.`scope`.`collection`
CompletionStage<List<JsonObject>> resultCompletionStage =
    CouchbaseSource.fromQuery(
            sessionSettings,
            bucketName,
            "SELECT * FROM `" + support.bucketName() + "`.`" + support.scopeName() + "`.`"
                + support.collectionName() + "` LIMIT 10")
        .runWith(Sink.seq(), actorSystem);
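
Both snippets above assemble the backtick-quoted keyspace path by string concatenation. A minimal sketch of a helper that centralizes this is shown below. `KeyspaceQuery` and its methods are hypothetical names introduced for illustration and are not part of Alpakka or the Couchbase SDK. The helper rejects identifiers that contain a backtick rather than trying to escape them.

```java
// Hypothetical helper for illustration only; not part of Alpakka or the Couchbase SDK.
final class KeyspaceQuery {

  // Wraps an identifier in backticks. Names that are empty or already contain
  // a backtick are rejected so they cannot break out of the quoted identifier.
  static String quote(String identifier) {
    if (identifier == null || identifier.isEmpty() || identifier.indexOf('`') >= 0) {
      throw new IllegalArgumentException("Invalid identifier: " + identifier);
    }
    return "`" + identifier + "`";
  }

  // Builds the `bucket`.`scope`.`collection` path used in the FROM clause.
  static String keyspace(String bucket, String scope, String collection) {
    return quote(bucket) + "." + quote(scope) + "." + quote(collection);
  }

  // Builds the same kind of statement as the examples above.
  static String selectAll(String bucket, String scope, String collection, int limit) {
    if (limit < 0) {
      throw new IllegalArgumentException("limit must not be negative");
    }
    return "SELECT * FROM " + keyspace(bucket, scope, collection) + " LIMIT " + limit;
  }

  public static void main(String[] args) {
    // The scope and collection names here are made up for the example.
    System.out.println(selectAll("akka", "alpakka", "docs", 10));
  }
}
```

The resulting string can be passed as the statement argument of CouchbaseSource.fromQuery in place of the inline concatenation.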

Get by ID

The CouchbaseFlow.fromId and CouchbaseFlow.bytesFromId methods read documents by document ID within an Akka Stream.

Scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}
import com.couchbase.client.java.json.JsonObject

import scala.collection.immutable

val ids = immutable.Seq("First", "Second", "Third", "Fourth")

val futureResult =
  Source(ids)
    .via(
      CouchbaseFlow.fromId[JsonObject](
        sessionSettings,
        bucketName,
        scopeName,
        collectionName
      )
    )
    .runWith(Sink.seq)

val resultsAsFuture =
  Source(sampleSequence.map(_.getId))
    .via(
      CouchbaseFlow.bytesFromId(
        sessionSettings,
        bucketName,
        scopeName,
        collectionName
      )
    )
    .runWith(Sink.seq)
Java
import akka.stream.alpakka.couchbase.CouchbaseDocument;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.couchbase.client.java.json.JsonValue;

// Read raw bytes by document ID
List<String> ids = Arrays.asList("First", "Second", "Third", "Fourth");

CompletionStage<List<CouchbaseDocument<byte[]>>> result =
    Source.from(ids)
        .via(CouchbaseFlow.bytesFromId(sessionSettings, queryBucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.seq(), actorSystem);

// Read JSON documents by document ID
List<String> idsJson = Arrays.asList("FirstJson", "SecondJson", "ThirdJson", "FourthJson");
CompletionStage<List<CouchbaseDocument<JsonValue>>> jsonResult =
    Source.from(idsJson)
        .via(CouchbaseFlow.fromId(sessionSettings, queryBucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.seq(), actorSystem);

Writing to Couchbase in Akka Streams

Access Parallelism

Parallelism in accessing Couchbase can be configured using CouchbaseSessionSettings, and is set to 1 by default.

Note

The default durability and parallelism values are not recommended for production use.

Operation Options

All mutation operations provided include overloaded methods that accept corresponding operation options (InsertOptions, UpsertOptions, etc…) as a parameter. These options include:

  • Couchbase Replication Factor (default ReplicateTo.NONE)
  • Couchbase Persistence Level for Write Operation (default PersistTo.NONE)
  • Optional mutation expiration value
  • Optional transcoder for serializing document values persisted on the cluster

Read more about durability settings in the Couchbase documentation.

All mutation operations are designed to choose transcoders for stored documents based on the T type parameter of CouchbaseDocument[T] interface:

  • a RawBinaryTranscoder will be selected for Array[Byte] documents
  • a RawStringTranscoder will be selected for String documents
  • a default session transcoder (usually JsonTranscoder) will be chosen otherwise

This behavior can be overridden by providing a specific transcoder in mutation operation options or in session environment settings.
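
The selection rules above amount to a simple dispatch on the document's value type. The sketch below restates them in plain Java. It is not the connector's implementation: `TranscoderChoice` is a hypothetical name, and it returns the transcoder names as strings, so it runs without the Couchbase SDK on the classpath.

```java
// Hypothetical illustration of the rules listed above; not the connector's code.
final class TranscoderChoice {

  // Returns the name of the transcoder the rules above would pick for a value type.
  // "default session transcoder" stands for whatever the session is configured with.
  static String forValueType(Class<?> valueType) {
    if (valueType == byte[].class) {
      return "RawBinaryTranscoder";
    }
    if (valueType == String.class) {
      return "RawStringTranscoder";
    }
    return "default session transcoder";
  }

  public static void main(String[] args) {
    System.out.println(forValueType(byte[].class));
    System.out.println(forValueType(String.class));
    System.out.println(forValueType(java.util.Map.class));
  }
}
```

A transcoder passed explicitly in the operation options takes the place of this type-based choice.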

Upsert

The CouchbaseFlow and CouchbaseSink offer factories for upserting documents (insert or update) in Couchbase.

The upsert operators fail the stream on any error when writing to Couchbase. To handle failures in-stream use upsertWithResult shown below.

Scala
import akka.Done
import akka.stream.alpakka.couchbase.CouchbaseDocument
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

val result: Future[Done] =
  Source
    .single(sampleData)
    .map(data => new CouchbaseDocument(data.getId, data.getDocument.getBytes()))
    .via(
      CouchbaseFlow.upsert(
        sessionSettings,
        bucketName,
        scopeName,
        collectionName
      )
    )
    .runWith(Sink.ignore)
result.futureValue
Java
import akka.Done;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// Sink.ignore() completes with Done once the upsert has passed through the flow
CompletionStage<Done> jsonDocumentUpsert =
    Source.single(obj)
        .via(CouchbaseFlow.upsert(sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.ignore(), actorSystem);
Note

For single document modifications you may consider using the CouchbaseSession methods directly. They offer a Future-based (Scala) or CompletionStage-based (Java) API, which in many cases is simpler than using Akka Streams with just one element (see below).

Couchbase writes may fail temporarily for a particular node. If you want to handle such failures without restarting the whole stream, the upsertWithResult operator captures failures from Couchbase and emits CouchbaseWriteResult sub-classes CouchbaseWriteSuccess and CouchbaseWriteFailure downstream.

Scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.alpakka.couchbase.{CouchbaseDocument, CouchbaseWriteFailure, CouchbaseWriteResult}
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.Future

val result: Future[immutable.Seq[CouchbaseWriteResult]] =
  Source(sampleSequence)
    .map(doc => new CouchbaseDocument(doc.getId, doc.getDocument.getBytes))
    .via(
      CouchbaseFlow.upsertWithResult(
        sessionSettings,
        bucketName,
        scopeName,
        collectionName
      )
    )
    .runWith(Sink.seq)

val failedDocs: immutable.Seq[CouchbaseWriteFailure] = result.futureValue.collect {
  case res: CouchbaseWriteFailure => res
}

// The same upsert with explicit UpsertOptions (transcoder, durability and timeout)
import com.couchbase.client.core.msg.kv.DurabilityLevel
import com.couchbase.client.java.codec.RawBinaryTranscoder
import com.couchbase.client.java.kv.UpsertOptions

val resultWithOptions: Future[immutable.Seq[CouchbaseWriteResult]] = Source(sampleSequence)
  .map(doc => new CouchbaseDocument(doc.getId, doc.getDocument.getBytes))
  .via(
    CouchbaseFlow.upsertWithResult(
      sessionSettings,
      UpsertOptions
        .upsertOptions()
        .transcoder(RawBinaryTranscoder.INSTANCE)
        .durability(DurabilityLevel.MAJORITY_AND_PERSIST_TO_ACTIVE)
        .timeout(java.time.Duration.ofSeconds(1)),
      bucketName,
      scopeName,
      collectionName
    )
  )
  .runWith(Sink.seq)
Java
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure;
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

CompletionStage<List<CouchbaseWriteResult>> upsertResults =
    Source.from(sampleSequence)
        .via(CouchbaseFlow.upsertWithResult(sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.seq(), actorSystem);

List<CouchbaseWriteResult> writeResults =
    upsertResults.toCompletableFuture().get(3, TimeUnit.SECONDS);
List<CouchbaseWriteFailure> failedDocs =
    writeResults.stream()
        .filter(CouchbaseWriteResult::isFailure)
        .map(CouchbaseWriteFailure.class::cast)
        .collect(Collectors.toUnmodifiableList());
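
Once failures arrive as ordinary stream elements, a common next step is to extract the affected document IDs for logging or a retry. The sketch below shows that step in plain Java. `WriteResult`, `WriteSuccess` and `WriteFailure` are hypothetical stand-ins that mirror the success/failure split described above. They are not the connector's classes, and the accessor names are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class WriteResultTriage {

  // Stand-in types for this sketch only; they are NOT the connector's classes.
  interface WriteResult {
    String id();

    boolean isFailure();
  }

  static final class WriteSuccess implements WriteResult {
    private final String id;

    WriteSuccess(String id) {
      this.id = id;
    }

    public String id() {
      return id;
    }

    public boolean isFailure() {
      return false;
    }
  }

  static final class WriteFailure implements WriteResult {
    private final String id;
    private final Throwable cause;

    WriteFailure(String id, Throwable cause) {
      this.id = id;
      this.cause = cause;
    }

    public String id() {
      return id;
    }

    public boolean isFailure() {
      return true;
    }

    Throwable cause() {
      return cause;
    }
  }

  // Collects the IDs of documents whose write failed, preserving the input order,
  // so they can be logged or fed into a retry stream.
  static List<String> failedIds(List<WriteResult> results) {
    return results.stream()
        .filter(WriteResult::isFailure)
        .map(WriteResult::id)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<WriteResult> results =
        Arrays.asList(
            new WriteSuccess("First"),
            new WriteFailure("Second", new RuntimeException("temporary node failure")),
            new WriteSuccess("Third"));
    System.out.println(failedIds(results));
  }
}
```

Running the example prints `[Second]`, the only ID whose write was marked as failed.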

Replace

The CouchbaseFlow and CouchbaseSink offer factories for replacing documents in Couchbase.

The replace operators fail the stream on any error when writing to Couchbase. To handle failures in-stream use replaceWithResult shown below.

A replace action fails with a DocumentDoesNotExistException if the original document can’t be found in Couchbase.

Scala
import akka.Done
import akka.stream.alpakka.couchbase.CouchbaseDocument
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}
import com.couchbase.client.java.json.JsonObject

import scala.concurrent.Future

val replaceFuture: Future[Done] =
  Source
    .single(obj)
    .map(doc => new CouchbaseDocument(doc.getId, JsonObject.create().put("value", doc.getDocument)))
    .via(
      CouchbaseFlow.replace(
        sessionSettings,
        bucketName,
        scopeName,
        collectionName
      )
    )
    .runWith(Sink.ignore)
Java
import akka.Done;
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// Capture the outcome of the replace as a stream element
CompletionStage<CouchbaseWriteResult> jsonDocumentReplaceResult =
    Source.single(obj)
        .via(CouchbaseFlow.replaceWithResult(sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.head(), actorSystem);
// Fail the stream on any error; Sink.ignore() completes with Done
CompletionStage<Done> jsonDocumentReplace =
    Source.single(obj)
        .via(CouchbaseFlow.replace(sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.ignore(), actorSystem);
Note

For single document modifications you may consider using the CouchbaseSession methods directly. They offer a Future-based (Scala) or CompletionStage-based (Java) API, which in many cases is simpler than using Akka Streams with just one element (see below).

Couchbase writes may fail temporarily for a particular node. If you want to handle such failures without restarting the whole stream, the replaceWithResult operator captures failures from Couchbase and emits CouchbaseWriteResult sub-classes CouchbaseWriteSuccess and CouchbaseWriteFailure downstream.

Scala
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}

val result =
  Source(sampleSequence)
    .via(
      CouchbaseFlow.replaceWithResult(
        sessionSettings,
        bucketName,
        scopeName,
        collectionName
      )
    )
    .runWith(Sink.seq)

val failedDocs = result.futureValue.collect {
  case res: CouchbaseWriteFailure => res
}
Java
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure;
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

CompletionStage<List<CouchbaseWriteResult>> replaceResults =
    Source.from(list)
        .via(CouchbaseFlow.replaceWithResult(sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.seq(), actorSystem);

List<CouchbaseWriteResult> writeResults =
    replaceResults.toCompletableFuture().get(3, TimeUnit.SECONDS);
List<CouchbaseWriteFailure> failedDocs =
    writeResults.stream()
        .filter(CouchbaseWriteResult::isFailure)
        .map(CouchbaseWriteFailure.class::cast)
        .collect(Collectors.toUnmodifiableList());

Delete

The CouchbaseFlow and CouchbaseSink offer factories to delete documents in Couchbase by ID.

Scala
import akka.Done
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

val deleteFuture: Future[Done] =
  Source
    .single(sampleData.getId)
    .via(
      CouchbaseFlow.delete(
        sessionSettings,
        bucketName,
        scopeName,
        collectionName
      )
    )
    .runWith(Sink.ignore)
Java
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

CompletionStage<String> result =
    Source.single(sampleData.getId())
        .via(CouchbaseFlow.delete(sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.head(), actorSystem);

To handle delete failures in-stream, such as attempts to delete non-existent documents, use the deleteWithResult operator. It captures failures from Couchbase and emits CouchbaseDeleteResult elements.

Scala
import akka.stream.alpakka.couchbase.CouchbaseDeleteResult
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseFlow
import akka.stream.scaladsl.{Sink, Source}
import com.couchbase.client.core.msg.kv.DurabilityLevel
import com.couchbase.client.java.kv.RemoveOptions

import scala.concurrent.Future

val deleteResult: Future[CouchbaseDeleteResult] =
  Source
    .single("non-existent")
    .via(
      CouchbaseFlow.deleteWithResult(
        sessionSettings,
        RemoveOptions
          .removeOptions()
          .durability(DurabilityLevel.MAJORITY_AND_PERSIST_TO_ACTIVE)
          .timeout(java.time.Duration.ofSeconds(1)),
        bucketName,
        scopeName,
        collectionName
      )
    )
    .runWith(Sink.head)
Java
import akka.stream.alpakka.couchbase.CouchbaseDeleteResult;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseFlow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

CompletionStage<CouchbaseDeleteResult> result =
    Source.single("non-existent")
        .via(CouchbaseFlow.deleteWithResult(sessionSettings, bucketName, support.scopeName(), support.collectionName()))
        .runWith(Sink.head(), actorSystem);
CouchbaseDeleteResult deleteResult = result.toCompletableFuture().get(3, TimeUnit.SECONDS);

Using CouchbaseSession directly

Access via registry

The CouchbaseSessionRegistry is an Akka extension that manages the life-cycle of Couchbase sessions. All underlying instances are closed upon actor system termination.

When accessing more than one Couchbase cluster, the ClusterEnvironment should be shared by setting a single instance on the different CouchbaseSessionSettings.

Scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings}
import com.couchbase.client.java.env.ClusterEnvironment

import scala.concurrent.Future

// Akka extension (singleton per actor system)
val registry = CouchbaseSessionRegistry(actorSystem)

// If connecting to more than one Couchbase cluster, the environment should be shared
val environment: ClusterEnvironment = ClusterEnvironment.create()
actorSystem.registerOnTermination {
  environment.shutdown()
}

val sessionSettings = CouchbaseSessionSettings(actorSystem)
  .withEnvironment(environment)
val sessionFuture: Future[CouchbaseSession] = registry.sessionFor(sessionSettings, bucketName)
Java
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
import com.couchbase.client.java.env.ClusterEnvironment;


CouchbaseSessionRegistry registry = CouchbaseSessionRegistry.get(actorSystem);

// If connecting to more than one Couchbase cluster, the environment should be shared
ClusterEnvironment environment = ClusterEnvironment.create();
actorSystem.registerOnTermination(() -> environment.shutdown());

CouchbaseSessionSettings sessionSettings =
    CouchbaseSessionSettings.create(actorSystem).withEnvironment(environment);
CompletionStage<CouchbaseSession> sessionCompletionStage =
    registry.getSessionFor(sessionSettings, bucketName);

Manage session life-cycle

Use CouchbaseSessionSettings to get an instance of CouchbaseSession. These settings may be specified in application.conf and complemented in code. In addition, a session requires the bucket name, and because it is created asynchronously it also needs an ExecutionContext.

Scala
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession

import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext = actorSystem.dispatcher
val sessionSettings = CouchbaseSessionSettings(actorSystem)
val sessionFuture: Future[CouchbaseSession] = CouchbaseSession(sessionSettings, bucketName)
actorSystem.registerOnTermination(sessionFuture.flatMap(_.close()))

val documentFuture = sessionFuture.flatMap { session =>
  val id = "myId"
  session.collection(scopeName, collectionName).getBytes(id)
}
Java
import akka.stream.alpakka.couchbase.CouchbaseDocument;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;


Executor executor = Executors.newSingleThreadExecutor();
CouchbaseSessionSettings sessionSettings = CouchbaseSessionSettings.create(actorSystem);
CompletionStage<CouchbaseSession> sessionCompletionStage =
    CouchbaseSession.create(sessionSettings, bucketName, executor);
actorSystem.registerOnTermination(
    () -> sessionCompletionStage.thenAccept(CouchbaseSession::close));

sessionCompletionStage.thenAccept(
    session -> {
      String id = "myId";
      CompletionStage<CouchbaseDocument<byte[]>> documentCompletionStage = session.collection(support.scopeName(), support.collectionName()).getBytes(id);
      documentCompletionStage.exceptionally(ex -> {
        ex.printStackTrace();
        return null;
      }).thenAccept(doc -> {
        if (doc != null) {
          System.out.println(doc);
        }
      });
    });
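
The Java snippet above maps a failed lookup to null with exceptionally and then null-checks the result. An alternative is to fold success, a missing document and failure into one value with CompletionStage.handle, sketched below using only the JDK. `LookupOutcome.describe` is a hypothetical helper, and treating a null result as "not found" is an assumption made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class LookupOutcome {

  // Folds the three possible outcomes of an asynchronous lookup into one message.
  // Assumption for this sketch: a null value means the document was not found.
  static CompletionStage<String> describe(String id, CompletionStage<byte[]> lookup) {
    return lookup.handle(
        (bytes, failure) -> {
          if (failure != null) {
            return "Lookup of " + id + " failed: " + failure.getMessage();
          }
          if (bytes == null) {
            return "Document " + id + " wasn't found";
          }
          return "Document " + id + " has " + bytes.length + " bytes";
        });
  }

  public static void main(String[] args) {
    CompletionStage<byte[]> found = CompletableFuture.completedFuture(new byte[] {1, 2, 3});
    CompletableFuture<byte[]> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("cluster unreachable"));

    System.out.println(describe("myId", found).toCompletableFuture().join());
    System.out.println(describe("myId", failed).toCompletableFuture().join());
  }
}
```

With handle, the failure case stays distinguishable from the not-found case instead of both collapsing into null.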

Manage Cluster life-cycle

For full control a CouchbaseSession may be created from a Couchbase Cluster. See Connection Lifecycle in the Couchbase documentation for details.

Scala
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import com.couchbase.client.java.{Cluster, ClusterOptions}

val cluster: Cluster = Cluster.connect("localhost",
                                       ClusterOptions.clusterOptions(
                                         "Administrator",
                                         "password"
                                       ))
val session: CouchbaseSession = CouchbaseSession(cluster.async(), "akka").futureValue
actorSystem.registerOnTermination {
  session.close()
}

val id = "myId"
val documentFuture = session.collection(scopeName, collectionName).getBytes(id)
Java
import akka.stream.alpakka.couchbase.CouchbaseDocument;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
import com.couchbase.client.java.Cluster;

Cluster cluster = Cluster.connect("localhost", "Administrator", "password");
CouchbaseSession.create(cluster.async(), bucketName).thenAccept(session -> {
  actorSystem.registerOnTermination(
          () -> {
            session.close();
            cluster.close();
          });

  String id = "First";
  CompletionStage<CouchbaseDocument<byte[]>> documentCompletionStage = session.collection(support.scopeName(), support.collectionName()).getBytes(id);
  documentCompletionStage.thenAccept(
          opt -> {
            if (opt != null) {
              System.out.println(opt.getDocument());
            } else {
              System.out.println("Document " + id + " wasn't found");
            }
          });
});

To learn about the full range of operations on CouchbaseSession, read the CouchbaseSession API documentation.
