Couchbase

Couchbase is an open-source, distributed (shared-nothing architecture) multi-model NoSQL document-oriented database software package that is optimized for interactive applications. These applications may serve many concurrent users by creating, storing, retrieving, aggregating, manipulating and presenting data. In support of these kinds of application needs, Couchbase Server is designed to provide easy-to-scale key-value or JSON document access with low latency and high sustained throughput. It is designed to be clustered from a single machine to very large-scale deployments spanning many machines.

Couchbase provides client protocol compatibility with memcached, but adds disk persistence, data replication, live cluster reconfiguration, rebalancing and multitenancy with data partitioning.

Wikipedia

Alpakka Couchbase allows you to read from and write to Couchbase. You can query a bucket from CouchbaseSource using N1QL queries, or read documents by their ID. The connector uses the Couchbase Java SDK version 2.7.2 behind the scenes.

The Couchbase connector supports all document formats supported by the SDK. All of these formats implement the Document[T] (Scala) or Document<T> (Java) interface, which is the level of abstraction this connector works at.

Project Info: Alpakka Couchbase
Artifact: com.lightbend.akka : akka-stream-alpakka-couchbase : 1.1.2
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12, 2.13.0
JPMS module name: akka.stream.alpakka.couchbase
Readiness level: since 1.0.0, 2019-04-04
Home page: https://doc.akka.io/docs/alpakka/current
Release notes: in the documentation
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-couchbase" % "1.1.2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-couchbase_2.12</artifactId>
  <version>1.1.2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-couchbase_2.12', version: '1.1.2'
}

The table below shows the direct dependencies of this module; the dependency tree that follows shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.couchbase.client    java-client    2.7.2    The Apache Software License, Version 2.0
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
io.reactivex    rxjava-reactive-streams    1.2.1    The Apache Software License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.couchbase.client    java-client    2.7.2    The Apache Software License, Version 2.0
    com.couchbase.client    core-io    1.7.2    The Apache Software License, Version 2.0
        io.opentracing    opentracing-api    0.31.0    The Apache Software License, Version 2.0
        io.reactivex    rxjava    1.3.8    The Apache Software License, Version 2.0
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.23    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.23    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.7    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
io.reactivex    rxjava-reactive-streams    1.2.1    The Apache Software License, Version 2.0
    io.reactivex    rxjava    1.3.8    The Apache Software License, Version 2.0
    org.reactivestreams    reactive-streams    1.0.2    CC0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Overview

Alpakka Couchbase offers both Akka Streams APIs and a more direct API to access Couchbase:

  • CouchbaseSession offers a direct API for one-off operations
  • CouchbaseSessionRegistry is an Akka extension to keep track of and share CouchbaseSessions within an ActorSystem
  • CouchbaseSource, CouchbaseFlow, and CouchbaseSink offer factory methods to create Akka Stream operators

Configuration

All operations use the CouchbaseSession internally. A session is configured with CouchbaseSessionSettings and a Couchbase bucket name. The Akka Stream factory methods create and access the corresponding session instance behind the scenes.

By default, the CouchbaseSessionSettings are read from the alpakka.couchbase.session section of the configuration, e.g. in your application.conf.

Settings
alpakka.couchbase {
  session {
    nodes = ["localhost"]
    username = "Administrator"
    password = "password"
  }
}

Using Akka Discovery

To delegate the configuration of Couchbase nodes to any of Akka Discovery’s lookup mechanisms, specify a service name and lookup timeout in the Couchbase section, pass the DiscoverySupport nodes lookup to enrichAsync, and configure Akka Discovery accordingly.

The Akka Discovery dependency has to be added explicitly.

Discovery settings (Config discovery)
alpakka.couchbase {
  session {
    service {
      name = couchbase-service
      lookup-timeout = 1 s
    }
    username = "anotherUser"
    password = "differentPassword"
  }
}

akka.discovery.method = config
akka.discovery.config.services = {
  couchbase-service = {
    endpoints = [
      { host = "akka.io" }
    ]
  }
}

To enable Akka Discovery on the CouchbaseSessionSettings, use DiscoverySupport.nodes() (Scala) or DiscoverySupport.getNodes(actorSystem) (Java) as the enrichment function.

Scala
import akka.stream.alpakka.couchbase.scaladsl.{CouchbaseSession, DiscoverySupport}
import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings}

val registry = CouchbaseSessionRegistry(actorSystem)

val sessionSettings = CouchbaseSessionSettings(actorSystem)
  .withEnrichAsync(DiscoverySupport.nodes())
val sessionFuture: Future[CouchbaseSession] = registry.sessionFor(sessionSettings, bucketName)
Java
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.DiscoverySupport;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;

CouchbaseSessionRegistry registry = CouchbaseSessionRegistry.get(actorSystem);

CouchbaseSessionSettings sessionSettings =
    CouchbaseSessionSettings.create(actorSystem)
        .withEnrichAsyncCs(DiscoverySupport.getNodes(actorSystem));
CompletionStage<CouchbaseSession> session = registry.getSessionFor(sessionSettings, bucketName);

Reading from Couchbase in Akka Streams

Using statements

To query Couchbase using the statement DSL, use CouchbaseSource.fromStatement.

Scala
import com.couchbase.client.java.query.Select.select
import com.couchbase.client.java.query.dsl.Expression._

val resultAsFuture: Future[Seq[JsonObject]] =
  CouchbaseSource
    .fromStatement(sessionSettings, select("*").from(i(queryBucketName)).limit(10), bucketName)
    .runWith(Sink.seq)
Java
import static com.couchbase.client.java.query.Select.select;
import static com.couchbase.client.java.query.dsl.Expression.*;

CompletionStage<List<JsonObject>> resultCompletionStage =
    CouchbaseSource.fromStatement(
            sessionSettings, select("*").from(i(queryBucketName)).limit(10), bucketName)
        .runWith(Sink.seq(), materializer);

Using N1QL queries

To query Couchbase using N1QL queries, use CouchbaseSource.fromN1qlQuery.

Scala
import com.couchbase.client.java.query.{N1qlParams, N1qlQuery}

val params = N1qlParams.build.adhoc(false)
val query = N1qlQuery.simple(s"select count(*) from $queryBucketName", params)

val resultAsFuture: Future[Seq[JsonObject]] =
  CouchbaseSource
    .fromN1qlQuery(sessionSettings, query, bucketName)
    .runWith(Sink.seq)
Java
import com.couchbase.client.java.query.N1qlParams;
import com.couchbase.client.java.query.N1qlQuery;

N1qlParams params = N1qlParams.build().adhoc(false);
SimpleN1qlQuery query = N1qlQuery.simple("select count(*) from " + queryBucketName, params);

CompletionStage<JsonObject> resultCompletionStage =
    CouchbaseSource.fromN1qlQuery(sessionSettings, query, bucketName)
        .runWith(Sink.head(), materializer);

Get by ID

The CouchbaseFlow.fromId methods read the documents identified by the document IDs flowing through the Akka Stream.

Scala
val ids = immutable.Seq("First", "Second", "Third", "Fourth")

val futureResult: Future[immutable.Seq[JsonDocument]] =
  Source(ids)
    .via(
      CouchbaseFlow.fromId(
        sessionSettings,
        bucketName
      )
    )
    .runWith(Sink.seq)
Java
List<String> ids = Arrays.asList("First", "Second", "Third", "Fourth");

CompletionStage<List<JsonDocument>> result =
    Source.from(ids)
        .via(CouchbaseFlow.fromId(sessionSettings, queryBucketName))
        .runWith(Sink.seq(), materializer);

Writing to Couchbase in Akka Streams

For each mutation operation, create a CouchbaseWriteSettings instance, which consists of the following parameters:

  • Parallelism in access to Couchbase (default 1)
  • Couchbase Replication Factor (default ReplicateTo.NONE)
  • Couchbase Persistence Level for Write Operation (default PersistTo.NONE)
  • Operation timeout (default 2 seconds)

These default values are not recommended for production use, as writes are not persisted to disk on any node.

Scala
import akka.stream.alpakka.couchbase.CouchbaseWriteSettings
import com.couchbase.client.java.{PersistTo, ReplicateTo}
import scala.concurrent.duration._ // provides the 5.seconds syntax

val writeSettings = CouchbaseWriteSettings()
  .withParallelism(3)
  .withPersistTo(PersistTo.FOUR)
  .withReplicateTo(ReplicateTo.THREE)
  .withTimeout(5.seconds)
Java
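import akka.stream.alpakka.couchbase.CouchbaseWriteSettings;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import java.time.Duration;

CouchbaseWriteSettings writeSettings =
    CouchbaseWriteSettings.create()
        .withParallelism(3)
        .withPersistTo(PersistTo.FOUR)
        .withReplicateTo(ReplicateTo.THREE)
        .withTimeout(Duration.ofSeconds(5));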

Read more about durability settings in the Couchbase documentation.

Upsert

The CouchbaseFlow and CouchbaseSink offer factories for upserting documents (insert or update) in Couchbase. upsert is used for the most commonly used JsonDocument, and upsertDoc takes a type parameter to support any variant of Document[T] (Scala) or Document<T> (Java), such as RawJsonDocument, StringDocument or BinaryDocument.

The upsert and upsertDoc operators fail the stream on any error when writing to Couchbase. To handle failures in-stream, use upsertDocWithResult as shown below.

Scala
val obj = TestObject(id = "First", "First")

val writeSettings = CouchbaseWriteSettings()

val jsonDocumentUpsert: Future[Done] =
  Source
    .single(obj)
    .map(toJsonDocument)
    .via(
      CouchbaseFlow.upsert(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.ignore)

val stringDocumentUpsert: Future[Done] =
  Source
    .single(sampleData)
    .map(toStringDocument)
    .via(
      CouchbaseFlow.upsertDoc(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.ignore)
Java
CompletionStage<JsonDocument> jsonDocumentUpsert =
    Source.single(obj)
        .map(support::toJsonDocument)
        .via(CouchbaseFlow.upsert(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.head(), materializer);
Note

For single document modifications, consider using the CouchbaseSession methods directly. They offer a Future-based (Scala) or CompletionStage-based (Java) API, which in many cases is simpler than using Akka Streams with just one element (see below).

Couchbase writes may fail temporarily for a particular node. If you want to handle such failures without restarting the whole stream, the upsertDocWithResult operator captures failures from Couchbase and emits CouchbaseWriteResult sub-classes CouchbaseWriteSuccess and CouchbaseWriteFailure downstream.

Scala
import akka.stream.alpakka.couchbase.{CouchbaseWriteFailure, CouchbaseWriteResult}

val result: Future[immutable.Seq[CouchbaseWriteResult[RawJsonDocument]]] =
  Source(sampleSequence)
    .map(toRawJsonDocument)
    .via(
      CouchbaseFlow.upsertDocWithResult(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.seq)

val failedDocs: immutable.Seq[CouchbaseWriteFailure[RawJsonDocument]] = result.futureValue.collect {
  case res: CouchbaseWriteFailure[RawJsonDocument] => res
}
Java
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure;
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;
CompletionStage<List<CouchbaseWriteResult<StringDocument>>> upsertResults =
    Source.from(sampleSequence)
        .map(support::toStringDocument)
        .via(CouchbaseFlow.upsertDocWithResult(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.seq(), materializer);

List<CouchbaseWriteResult<StringDocument>> writeResults =
    upsertResults.toCompletableFuture().get(3, TimeUnit.SECONDS);
List<CouchbaseWriteFailure<StringDocument>> failedDocs =
    writeResults.stream()
        .filter(CouchbaseWriteResult::isFailure)
        .map(res -> (CouchbaseWriteFailure<StringDocument>) res)
        .collect(Collectors.toList());
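
Once the failed writes have been collected, one option is to submit only those documents again. The following is a minimal sketch of that idea using plain JDK types, not the connector's API: WriteResult is a hypothetical stand-in for CouchbaseWriteResult, and writeBatch is a hypothetical function standing in for running the documents through upsertDocWithResult. It assumes the failures are transient and that a bounded number of retry rounds is acceptable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch only: plain JDK stand-ins, not the Alpakka Couchbase classes.
final class RetryFailedWritesSketch {

  // Hypothetical stand-in for CouchbaseWriteResult: a document ID plus the failure, if any.
  static final class WriteResult {
    final String id;
    final Throwable failure; // null when the write succeeded

    WriteResult(String id, Throwable failure) {
      this.id = id;
      this.failure = failure;
    }

    boolean isFailure() {
      return failure != null;
    }
  }

  // Submits the IDs, then re-submits only the failed ones, for at most maxAttempts rounds.
  // Returns the IDs that still failed in the last round (empty if everything was written).
  static List<String> writeWithRetry(
      List<String> ids, Function<List<String>, List<WriteResult>> writeBatch, int maxAttempts) {
    List<String> pending = new ArrayList<>(ids);
    for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
      List<String> failed = new ArrayList<>();
      for (WriteResult result : writeBatch.apply(pending)) {
        if (result.isFailure()) {
          failed.add(result.id);
        }
      }
      pending = failed;
    }
    return pending;
  }
}
```

In a real stream, each retry round would presumably rebuild the documents for the failed IDs and run them through the operator again; adding a delay between rounds is a common refinement that this sketch leaves out.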

Replace

The CouchbaseFlow and CouchbaseSink offer factories for replacing documents in Couchbase. replace is used for the most commonly used JsonDocument, and replaceDoc takes a type parameter to support any variant of Document[T] (Scala) or Document<T> (Java), such as RawJsonDocument, StringDocument or BinaryDocument.

The replace and replaceDoc operators fail the stream on any error when writing to Couchbase. To handle failures in-stream, use replaceDocWithResult as shown below.

A replace action fails with a DocumentDoesNotExistException if the original document can’t be found in Couchbase.

Scala
val replaceFuture: Future[Done] =
  Source
    .single(obj)
    .map(toJsonDocument)
    .via(
      CouchbaseFlow.replace(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.ignore)
Java
import com.couchbase.client.java.error.DocumentDoesNotExistException;
CompletionStage<JsonDocument> jsonDocumentReplace =
    Source.single(obj)
        .map(support::toJsonDocument)
        .via(CouchbaseFlow.replace(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.head(), materializer);
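
If a missing document is an expected outcome rather than an error, the failed CompletionStage can be mapped to an empty result. The sketch below shows the pattern with plain JDK types only: MissingDocumentException is a hypothetical stand-in for DocumentDoesNotExistException, and replace is a hypothetical stand-in for running a replace stream with Sink.head(). It assumes the materialized CompletionStage fails with that exception, possibly wrapped in a CompletionException.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

// Sketch only: plain JDK stand-ins, not the Alpakka Couchbase classes.
final class ReplaceRecoverySketch {

  // Hypothetical stand-in for com.couchbase.client.java.error.DocumentDoesNotExistException.
  static final class MissingDocumentException extends RuntimeException {
    MissingDocumentException(String id) {
      super("document " + id + " does not exist");
    }
  }

  // Hypothetical stand-in for a replace stream: completes with the ID, or fails if the
  // document does not exist.
  static CompletionStage<String> replace(String id, boolean exists) {
    CompletableFuture<String> result = new CompletableFuture<>();
    if (exists) {
      result.complete(id);
    } else {
      result.completeExceptionally(new MissingDocumentException(id));
    }
    return result;
  }

  // Maps "document missing" to Optional.empty() and passes every other failure on.
  static CompletionStage<Optional<String>> replaceIfPresent(String id, boolean exists) {
    return replace(id, exists)
        .handle(
            (replaced, failure) -> {
              if (failure == null) {
                return Optional.of(replaced);
              }
              // Dependent stages may see the cause wrapped in a CompletionException.
              Throwable cause =
                  failure instanceof CompletionException && failure.getCause() != null
                      ? failure.getCause()
                      : failure;
              if (cause instanceof MissingDocumentException) {
                return Optional.<String>empty();
              }
              throw new CompletionException(cause);
            });
  }
}
```

Whether swallowing the exception is appropriate depends on the application; when failures need to be inspected per element inside the stream, the replaceDocWithResult operator is the more direct tool.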
Note

For single document modifications, consider using the CouchbaseSession methods directly. They offer a Future-based (Scala) or CompletionStage-based (Java) API, which in many cases is simpler than using Akka Streams with just one element (see below).

Couchbase writes may fail temporarily for a particular node. If you want to handle such failures without restarting the whole stream, the replaceDocWithResult operator captures failures from Couchbase and emits CouchbaseWriteResult sub-classes CouchbaseWriteSuccess and CouchbaseWriteFailure downstream.

Scala
import akka.stream.alpakka.couchbase.{CouchbaseWriteFailure, CouchbaseWriteResult}

val result: Future[immutable.Seq[CouchbaseWriteResult[RawJsonDocument]]] =
  Source(sampleSequence)
    .map(toRawJsonDocument)
    .via(
      CouchbaseFlow.replaceDocWithResult(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.seq)

val failedDocs: immutable.Seq[CouchbaseWriteFailure[RawJsonDocument]] = result.futureValue.collect {
  case res: CouchbaseWriteFailure[RawJsonDocument] => res
}
Java
import akka.stream.alpakka.couchbase.CouchbaseWriteFailure;
import akka.stream.alpakka.couchbase.CouchbaseWriteResult;
CompletionStage<List<CouchbaseWriteResult<StringDocument>>> replaceResults =
    Source.from(sampleSequence)
        .map(support::toStringDocument)
        .via(CouchbaseFlow.replaceDocWithResult(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.seq(), materializer);

List<CouchbaseWriteResult<StringDocument>> writeResults =
    replaceResults.toCompletableFuture().get(3, TimeUnit.SECONDS);
List<CouchbaseWriteFailure<StringDocument>> failedDocs =
    writeResults.stream()
        .filter(CouchbaseWriteResult::isFailure)
        .map(res -> (CouchbaseWriteFailure<StringDocument>) res)
        .collect(Collectors.toList());

Delete

The CouchbaseFlow and CouchbaseSink offer factories to delete documents in Couchbase by ID.

Scala
val deleteFuture: Future[Done] =
  Source
    .single(sampleData.id)
    .via(
      CouchbaseFlow.delete(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.ignore)
Java
CompletionStage<String> result =
    Source.single(sampleData.id())
        .via(CouchbaseFlow.delete(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.head(), materializer);

To handle delete failures in-stream, such as non-existent documents, use the deleteWithResult operator. It captures failures from Couchbase and emits CouchbaseDeleteResult elements.

Scala
val deleteResult: Future[CouchbaseDeleteResult] =
  Source
    .single("non-existent")
    .via(
      CouchbaseFlow.deleteWithResult(
        sessionSettings,
        writeSettings,
        bucketName
      )
    )
    .runWith(Sink.head)
Java
import akka.stream.alpakka.couchbase.CouchbaseDeleteResult;
CompletionStage<CouchbaseDeleteResult> result =
    Source.single("non-existent")
        .via(CouchbaseFlow.deleteWithResult(sessionSettings, writeSettings, bucketName))
        .runWith(Sink.head(), materializer);

Using CouchbaseSession directly

Access via registry

The CouchbaseSessionRegistry is an Akka extension to manage the life-cycle of Couchbase sessions. All underlying instances are closed upon actor system termination.

When accessing more than one Couchbase cluster, the CouchbaseEnvironment should be shared by setting a single instance for the different CouchbaseSessionSettings.

Scala
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import com.couchbase.client.java.env.{CouchbaseEnvironment, DefaultCouchbaseEnvironment}

// Akka extension (singleton per actor system)
val registry = CouchbaseSessionRegistry(actorSystem)

// If connecting to more than one Couchbase cluster, the environment should be shared
val environment: CouchbaseEnvironment = DefaultCouchbaseEnvironment.create()
actorSystem.registerOnTermination {
  environment.shutdown()
}

val sessionSettings = CouchbaseSessionSettings(actorSystem)
  .withEnvironment(environment)
val sessionFuture: Future[CouchbaseSession] = registry.sessionFor(sessionSettings, bucketName)
Java
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import akka.stream.alpakka.couchbase.CouchbaseSessionRegistry;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;

CouchbaseSessionRegistry registry = CouchbaseSessionRegistry.get(actorSystem);

// If connecting to more than one Couchbase cluster, the environment should be shared
CouchbaseEnvironment environment = DefaultCouchbaseEnvironment.create();
actorSystem.registerOnTermination(() -> environment.shutdown());

CouchbaseSessionSettings sessionSettings =
    CouchbaseSessionSettings.create(actorSystem).withEnvironment(environment);
CompletionStage<CouchbaseSession> sessionCompletionStage =
    registry.getSessionFor(sessionSettings, bucketName);

Manage session life-cycle

Use CouchbaseSessionSettings to get an instance of CouchbaseSession. These settings may be specified in application.conf and complemented in code. Furthermore, a session requires the bucket name and, as its creation is asynchronous, an ExecutionContext (Scala) or Executor (Java).

Scala
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession

implicit val ec: ExecutionContext = actorSystem.dispatcher
val sessionSettings = CouchbaseSessionSettings(actorSystem)
val sessionFuture: Future[CouchbaseSession] = CouchbaseSession(sessionSettings, bucketName)
actorSystem.registerOnTermination(sessionFuture.flatMap(_.close()))

val documentFuture = sessionFuture.flatMap { session =>
  val id = "myId"
  val documentFuture: Future[Option[JsonDocument]] = session.get(id)
  documentFuture.flatMap {
    case Some(jsonDocument) =>
      Future.successful(jsonDocument)
    case None =>
      Future.failed(new RuntimeException(s"document $id wasn't found"))
  }
}
Java
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;

Executor executor = Executors.newSingleThreadExecutor();
CouchbaseSessionSettings sessionSettings = CouchbaseSessionSettings.create(actorSystem);
CompletionStage<CouchbaseSession> sessionCompletionStage =
    CouchbaseSession.create(sessionSettings, bucketName, executor);
actorSystem.registerOnTermination(
    () -> sessionCompletionStage.thenAccept(CouchbaseSession::close));

sessionCompletionStage.thenAccept(
    session -> {
      String id = "myId";
      CompletionStage<Optional<JsonDocument>> documentCompletionStage = session.get(id);
      documentCompletionStage.thenAccept(
          opt -> {
            if (opt.isPresent()) {
              System.out.println(opt.get());
            } else {
              System.out.println("Document " + id + " wasn't found");
            }
          });
    });
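
The Java example above only prints the outcome, whereas the Scala example turns a missing document into a failed Future. A similar effect can be sketched in Java with a small helper built on the JDK's CompletionStage API. The class and method names here are hypothetical and not part of the connector; the helper only assumes a lookup that yields CompletionStage<Optional<T>>, as session.get(id) does in the example above.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Sketch only: a hypothetical helper, not part of Alpakka Couchbase.
final class RequireDocumentSketch {

  // Turns an empty lookup result into a failed stage and unwraps a present one.
  static <T> CompletionStage<T> require(CompletionStage<Optional<T>> lookup, String id) {
    return lookup.thenCompose(
        opt -> {
          CompletableFuture<T> result = new CompletableFuture<>();
          if (opt.isPresent()) {
            result.complete(opt.get());
          } else {
            result.completeExceptionally(
                new RuntimeException("document " + id + " wasn't found"));
          }
          return result;
        });
  }
}
```

With such a helper, a call like require(session.get(id), id) would yield a CompletionStage<JsonDocument> that downstream code can compose without checking the Optional itself.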

Manage bucket life-cycle

For full control a CouchbaseSession may be created from a Couchbase Bucket. See Scalability and Concurrency in the Couchbase documentation for details.

Scala
import com.couchbase.client.java.auth.PasswordAuthenticator
import com.couchbase.client.java.{Bucket, CouchbaseCluster}

val cluster: CouchbaseCluster = CouchbaseCluster.create("localhost")
cluster.authenticate(new PasswordAuthenticator("Administrator", "password"))
val bucket: Bucket = cluster.openBucket("akka")
val session: CouchbaseSession = CouchbaseSession(bucket)
actorSystem.registerOnTermination {
  session.close()
}

val id = "myId"
val documentFuture = session.get(id).flatMap {
  case Some(jsonDocument) =>
    Future.successful(jsonDocument)
  case None =>
    Future.failed(new RuntimeException(s"document $id wasn't found"))
}
Java
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.auth.PasswordAuthenticator;

CouchbaseCluster cluster = CouchbaseCluster.create("localhost");
cluster.authenticate(new PasswordAuthenticator("Administrator", "password"));
Bucket bucket = cluster.openBucket("akka");
CouchbaseSession session = CouchbaseSession.create(bucket);
actorSystem.registerOnTermination(
    () -> {
      session.close();
      bucket.close();
    });

String id = "First";
CompletionStage<Optional<JsonDocument>> documentCompletionStage = session.get(id);
documentCompletionStage.thenAccept(
    opt -> {
      if (opt.isPresent()) {
        System.out.println(opt.get());
      } else {
        System.out.println("Document " + id + " wasn't found");
      }
    });

To learn about the full range of operations on CouchbaseSession, read the API docs.
