Management of a Projection
Offset management
With the ProjectionManagement
ProjectionManagement
API you can manage the offset of a projection.
To retrieve latest stored offset:
- Scala
-
import akka.projection.scaladsl.ProjectionManagement import akka.persistence.query.Offset import akka.projection.ProjectionId val projectionId = ProjectionId("shopping-carts", "carts-1") val currentOffset: Future[Option[Offset]] = ProjectionManagement(system).getOffset[Offset](projectionId)
- Java
-
import akka.projection.javadsl.ProjectionManagement; ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1"); CompletionStage<Optional<Offset>> currentOffset = ProjectionManagement.get(system).getOffset(projectionId);
The offset can be cleared if the projection should be completely rebuilt, starting over again from the first offset. The operation will automatically restart the projection.
- Scala
-
val projectionId = ProjectionId("shopping-carts", "carts-1") val done: Future[Done] = ProjectionManagement(system).clearOffset(projectionId)
- Java
-
ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1"); CompletionStage<Done> done = ProjectionManagement.get(system).clearOffset(projectionId);
The offset can also be updated, which can be useful if the projection is stuck with errors on a specific offset and should skip that offset and continue with next. The operation will automatically restart the projection.
- Scala
-
import akka.persistence.query.Sequence val projectionId = ProjectionId("shopping-carts", "carts-1") val currentOffset: Future[Option[Sequence]] = ProjectionManagement(system).getOffset[Sequence](projectionId) currentOffset.foreach { case Some(s) => ProjectionManagement(system).updateOffset[Sequence](projectionId, Sequence(s.value + 1)) case None => // already removed }
- Java
-
import akka.persistence.query.Sequence; ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1"); CompletionStage<Optional<Sequence>> currentOffset = ProjectionManagement.get(system).getOffset(projectionId); currentOffset.thenAccept( optionalOffset -> { if (optionalOffset.isPresent()) { Sequence newOffset = new Sequence(optionalOffset.get().value()); CompletionStage<Done> done = ProjectionManagement.get(system).updateOffset(projectionId, newOffset); } });
Status tracking
The status of a Projection
can be tracked by implementing a StatusObserver
StatusObserver
and enable it with withStatusObserver
before running the Projection
.
The StatusObserver
is called when errors occur and envelopes are retried or the projection failed (restarted). It also has callbacks for processing progress and projection lifecyle.
The intention is that the implementation of the StatusObserver
would maintain a view that can be accessed from an administrative UI to have an overview of current status of the projections.