Lagom: Read-Side support

This page is specifically about Lagom 1.5 support for Couchbase read-sides.
Before reading this, you should familiarize yourself with Lagom’s general read-side support read-side support.

Query the Read-Side Database

Let us first look at how a service implementation can retrieve data from Couchbase.

Scala
import akka.NotUsed
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import com.lightbend.lagom.scaladsl.api.{Service, ServiceCall}
import com.lightbend.lagom.scaladsl.persistence.ReadSide
import com.lightbend.lagom.scaladsl.persistence.couchbase.CouchbaseReadSide
import docs.home.persistence.CouchbaseReadSideProcessorTwo.HelloEventProcessor
import play.api.libs.json.{Format, Json}

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
class GreetingServiceImpl(couchbaseReadSide: CouchbaseReadSide, readSideRegistry: ReadSide, session: CouchbaseSession)(
    implicit ec: ExecutionContext
) extends GreetingService {
  readSideRegistry.register[HelloEvent](new HelloEventProcessor(couchbaseReadSide))
  override def userGreetings() =
    ServiceCall { request =>
      session.get("users-actual-greetings").map {
        case Some(jsonDoc) =>
          val json = jsonDoc.content()
          json.getNames().asScala.map(name => UserGreeting(name, json.getString(name))).toList
        case None => List.empty[UserGreeting]
      }
    }
}
Java
import akka.NotUsed;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
import com.couchbase.client.java.document.json.JsonObject;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.ReadSide;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.inject.Inject;


public class GreetingServiceImpl implements GreetingService {

  private final CouchbaseSession session;

  @Inject
  public GreetingServiceImpl(CouchbaseSession couchbaseSession) {
    this.session = couchbaseSession;
  }

  @Override
  public ServiceCall<NotUsed, List<UserGreeting>> userGreetings() {
    return request -> session.get("users-actual-greetings")
        .thenApply(docOpt -> {
          if (docOpt.isPresent()) {
            JsonObject content = docOpt.get().content();
            return content.getNames().stream().map(
                name -> new UserGreeting(name, content.getString(name))
            ).collect(Collectors.toList());
          } else {
            return Collections.emptyList();
          }
        });
  }
}

The CouchbaseSession is injected in the constructor. CouchbaseSession provides several methods in different flavors for executing queries. The one used in the above example returns a document. There are also methods for streaming a result set, which can be useful when the result set is big.

All methods in CouchbaseSession are non-blocking and they return a FutureCompletionStage or a Source.

Update the Read-Side

We need to transform the events generated by the Persistent Entities into a Couchbase documents that can be queried as illustrated in the previous section. For that we will implement a ReadSideProcessor with assistance from the CouchbaseReadSide support component. It will consume events produced by persistent entities and update documents in Couchbase that are optimized for queries.

This is how a ReadSideProcessor class looks like before filling in the implementation details:

Scala
import akka.Done
import akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.lightbend.lagom.scaladsl.persistence.couchbase.CouchbaseReadSide
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, EventStreamElement, ReadSideProcessor}

import scala.concurrent.{ExecutionContext, Future}

class HelloEventProcessor extends ReadSideProcessor[HelloEvent] {

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[HelloEvent] =
    // TODO build read side handler
    ???

  override def aggregateTags: Set[AggregateEventTag[HelloEvent]] =
    // TODO return the tag for the events
    ???
}
Java
import akka.Done;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor;
import com.lightbend.lagom.javadsl.persistence.couchbase.CouchbaseReadSide;
import org.pcollections.PSequence;

import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import static akka.Done.done;


public class HelloEventProcessor extends ReadSideProcessor<HelloEvent> {

  private final CouchbaseReadSide readSide;

  @Inject
  public HelloEventProcessor(CouchbaseReadSide readSide) {
    this.readSide = readSide;
  }

  @Override
  public ReadSideHandler<HelloEvent> buildHandler() {
    // TODO build read side handler
    return null;
  }

  @Override
  public PSequence<AggregateEventTag<HelloEvent>> aggregateTags() {
    // TODO return the tag for the events
    return null;
  }
}

You can see that we have injected the Couchbase read-side support, these will be needed later.

You should already have implemented tagging for your events as described in the Read-Side documentation Read-Side documentation, so first we’ll implement the aggregateTags method in our read-side processor stub, like so:

Scala
override def aggregateTags: Set[AggregateEventTag[HelloEvent]] = HelloEvent.Tag.allTags
Java
@Override
public PSequence<AggregateEventTag<HelloEvent>> aggregateTags() {
  return HelloEvent.TAG.allTags();
}

Building the read-side handler

The other method on the ReadSideProcessor is buildHandler. This is responsible for creating the ReadSideHandler that will handle events. It also gives the opportunity to run two callbacks, one is a global prepare callback, the other is a regular prepare callback.

CouchbaseReadSide has a builder method for creating a builder for these handlers, this builder will create a handler that will automatically handle readside offsets for you. It can be created like so:

Scala
val builder = readSide.builder[HelloEvent]("all-greetings")
Java
CouchbaseReadSide.ReadSideHandlerBuilder<HelloEvent> builder =
    readSide.builder("all-greetings");

The argument passed to this method is the ID of the event processor that Lagom will use when it persists offsets to its offset store. The offsets stored in a Couchbase documents, which will be created and updated for you.

Global prepare

The global prepare callback runs at least once across the whole cluster. It is intended for doing things like creating documents and preparing any data that needs to be available before read side processing starts. Read side processors may be sharded across many nodes, and so tasks like creating documents should usually only be done from one node.

The global prepare callback is run from an Akka cluster singleton. It may be run multiple times - every time a new node becomes the new singleton, the callback will be run. Consequently, the task must be idempotent. If it fails, it will be run again using an exponential backoff, and the read side processing of the whole cluster will not start until it has run successfully.

Of course, setting a global prepare callback is completely optional.

Below is an example method that we’ve implemented to create a document:

Scala
val DocId = "users-actual-greetings"

private def createDocument(session: CouchbaseSession): Future[Done] =
  session.get(DocId).flatMap {
    case Some(doc) => Future.successful(Done)
    case None =>
      session
        .upsert(JsonDocument.create(DocId, JsonObject.empty()))
        .map(_ => Done)
  }
Java
final String DOC_ID = "users-actual-greetings";

private CompletionStage<Done> createDocument(CouchbaseSession session) {
  return
      session.get(DOC_ID).thenComposeAsync(doc -> {
        if (doc.isPresent()) {
          return CompletableFuture.completedFuture(Done.getInstance());
        }
        return session.insert(JsonDocument.create(DOC_ID, JsonObject.empty()))
            .thenApply(ignore -> Done.getInstance());
      });
}

It can then be registered as the global prepare callback in the buildHandler method:

Scala
builder.setGlobalPrepare(createDocument _)
Java
builder.setGlobalPrepare(this::createDocument);

Prepare

In addition to the global prepare callback, there is also a prepare callback. This will be executed once per shard, when the read side processor starts up.

Again this callback is optional.

Scala
private def prepare(session: CouchbaseSession, tag: AggregateEventTag[HelloEvent]): Future[Done] =
  // TODO do something when read-side is run for each shard
  Future.successful(Done)
Java
private CompletionStage<Done> prepare(CouchbaseSession session, AggregateEventTag<HelloEvent> tag) {
  //TODO do something when read-side is run for each shard
  return CompletableFuture.completedFuture(Done.getInstance());
}

And then to register them:

Scala
builder.setPrepare(prepare _)
Java
builder.setPrepare(this::prepare);

Registering your read-side processor

Once you’ve created your read-side processor, you need to register it with Lagom. This is done using the ReadSide component:

Scala
class GreetingServiceImpl(couchbaseReadSide: CouchbaseReadSide, readSideRegistry: ReadSide, session: CouchbaseSession)(
    implicit ec: ExecutionContext
) extends GreetingService {
  readSideRegistry.register[HelloEvent](new HelloEventProcessor(couchbaseReadSide))
Java
@Inject
public GreetingServiceImpl(CouchbaseSession couchbaseSession, ReadSide readSide) {
  this.session = couchbaseSession;
  readSide.register(CouchbaseHelloEventProcessor.HelloEventProcessor.class);
}

Event handlers

The event handlers take an event, and execute updates.

Caveat

Couchbase read-side processors have at-least-once semantics and should be idempotent because it isn’t possible in Couchbase to atomically execute the offset update and the other statements.

Here’s an example callback for handling the GreetingMessageChanged event:

Scala
def processGreetingMessageChanged(session: CouchbaseSession,
                                  ese: EventStreamElement[HelloEvent.GreetingChanged]): Future[Done] =
  session
    .get(DocId)
    .flatMap { maybeDoc =>
      val json = maybeDoc match {
        case Some(doc) => doc.content()
        case None => JsonObject.create();
      }
      val evt = ese.event
      json.put(evt.name, evt.message)
      session.upsert(JsonDocument.create(DocId, json))
    }
    .map(_ => Done)
Java
private CompletionStage<Done> processGreetingMessageChanged(CouchbaseSession session, HelloEvent.GreetingMessageChanged evt) {
  return session.get(DOC_ID).thenCompose((maybeJson) -> {
    final JsonObject json;
    if (maybeJson.isPresent()) {
      json = maybeJson.get().content();
    } else {
      json = JsonObject.create();
    }
    return session.upsert(JsonDocument.create(DOC_ID, json.put(evt.name, evt.message)));
  }).thenApply(doc -> done());
}

This can then be registered with the builder using setEventHandler:

Scala
builder.setEventHandler[HelloEvent.GreetingChanged](processGreetingMessageChanged _)
Java
builder.setEventHandler(HelloEvent.GreetingMessageChanged.class, this::processGreetingMessageChanged);

Once you have finished registering all your event handlers, you can invoke the build method and return the built handler:

Scala
builder.build()
Java
return builder.build();

Underlying implementation

The CouchbaseSession is using the Couchbase Java SDK, which can be accessed directly using the CouchbaseSession.underlying() method. However the native API of the client is based on RxJava and does not provide FutureCompletionStage and Akka Stream APIs.

Each ReadSideProcessor instance is executed by an Actor that is managed by Akka Cluster Sharding. The processor consumes a stream of persistent events delivered by the eventsByTag Persistence Query implemented by akka-persistence-couchbase. The tag corresponds to the tag defined by the AggregateEventTag.

In case of sharded tags, the number of sharded tags will determine how many read-side processors will be running across all nodes of your cluster.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.