Management of a Projection

Offset management

With the ProjectionManagementProjectionManagement API you can manage the offset of a projection.

To retrieve latest stored offset:

Scala
import akka.projection.scaladsl.ProjectionManagement
import akka.persistence.query.Offset
import akka.projection.ProjectionId

val projectionId = ProjectionId("shopping-carts", "carts-1")
val currentOffset: Future[Option[Offset]] = ProjectionManagement(system).getOffset[Offset](projectionId)
Java
import akka.projection.javadsl.ProjectionManagement;


ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Optional<Offset>> currentOffset =
    ProjectionManagement.get(system).getOffset(projectionId);

The offset can be cleared if the projection should be completely rebuilt, starting over again from the first offset. The operation will automatically restart the projection.

Scala
val projectionId = ProjectionId("shopping-carts", "carts-1")
val done: Future[Done] = ProjectionManagement(system).clearOffset(projectionId)
Java

ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1"); CompletionStage<Done> done = ProjectionManagement.get(system).clearOffset(projectionId);

The offset can also be updated, which can be useful if the projection is stuck with errors on a specific offset and should skip that offset and continue with next. The operation will automatically restart the projection.

Scala
import akka.persistence.query.Sequence
val projectionId = ProjectionId("shopping-carts", "carts-1")
val currentOffset: Future[Option[Sequence]] = ProjectionManagement(system).getOffset[Sequence](projectionId)
currentOffset.foreach {
  case Some(s) => ProjectionManagement(system).updateOffset[Sequence](projectionId, Sequence(s.value + 1))
  case None    => // already removed
}
Java
import akka.persistence.query.Sequence;


ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Optional<Sequence>> currentOffset =
    ProjectionManagement.get(system).getOffset(projectionId);
currentOffset.thenAccept(
    optionalOffset -> {
      if (optionalOffset.isPresent()) {
        Sequence newOffset = new Sequence(optionalOffset.get().value());
        CompletionStage<Done> done =
            ProjectionManagement.get(system).updateOffset(projectionId, newOffset);
      }
    });

Status tracking

The status of a Projection can be tracked by implementing a StatusObserverStatusObserver and enable it with withStatusObserver before running the Projection.

The StatusObserver is called when errors occur and envelopes are retried or the projection failed (restarted). It also has callbacks for processing progress and projection lifecyle.

The intention is that the implementation of the StatusObserver would maintain a view that can be accessed from an administrative UI to have an overview of current status of the projections.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.