Management of a Projection
Offset management
With the ProjectionManagement API you can manage the offset of a projection.
This management API is only usable with a running projection within the same system.
To retrieve the latest stored offset:
Scala:

    import scala.concurrent.Future

    import akka.persistence.query.Offset
    import akka.projection.ProjectionId
    import akka.projection.scaladsl.ProjectionManagement

    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val currentOffset: Future[Option[Offset]] =
      ProjectionManagement(system).getOffset[Offset](projectionId)
The offset can be cleared if the projection should be completely rebuilt, starting over again from the first offset. The operation will automatically restart the projection.
Scala:

    import akka.Done

    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val done: Future[Done] = ProjectionManagement(system).clearOffset(projectionId)
The offset can also be updated, which can be useful if the projection is stuck with errors on a specific offset and should skip that offset and continue with the next one. The operation will automatically restart the projection.
Scala:

    import akka.persistence.query.Sequence

    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val currentOffset: Future[Option[Sequence]] =
      ProjectionManagement(system).getOffset[Sequence](projectionId)

    // foreach on a Future requires an implicit ExecutionContext in scope
    currentOffset.foreach {
      case Some(s) =>
        // advance the stored offset by one so that processing continues after the stuck envelope
        ProjectionManagement(system).updateOffset[Sequence](projectionId, Sequence(s.value + 1))
      case None => // already removed
    }
Pause and resume
With the ProjectionManagement API you can pause and resume processing of a projection. For example, this can be useful when performing a data migration during which projection processing must not run.
Scala:

    import akka.Done
    import akka.projection.ProjectionId
    import akka.projection.scaladsl.ProjectionManagement

    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val mgmt = ProjectionManagement(system)

    // the for comprehension requires an implicit ExecutionContext in scope
    val done = for {
      _ <- mgmt.pause(projectionId)
      _ <- someDataMigration()
      _ <- mgmt.resume(projectionId)
    } yield Done
The paused/resumed state is stored, and it is read when the Projections are started, for example in case of rebalance or system restart.
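Because the paused state is stored, a projection that is paused and never resumed (for example because the migration step failed) would presumably remain paused until someone resumes it. The following is a minimal sketch of one way to guard against that, using only the Scala standard library. The helper name withPaused and its function parameters are hypothetical and not part of the ProjectionManagement API; in practice the pause and resume arguments would wrap mgmt.pause(projectionId) and mgmt.resume(projectionId).

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper (not part of Akka Projection): runs `work` between
// `pause` and `resume`, and calls `resume` even when `work` fails.
def withPaused[A](pause: () => Future[Any], resume: () => Future[Any])(work: () => Future[A])(
    implicit ec: ExecutionContext): Future[A] =
  pause().flatMap { _ =>
    Future.unit
      // Wrapping in flatMap turns an exception thrown by `work()` into a failed Future.
      .flatMap(_ => work())
      .transformWith { outcome =>
        // Resume regardless of the outcome, then surface the original result or failure.
        // If `resume` itself fails, that failure is what the caller sees.
        resume().flatMap(_ => Future.fromTry(outcome))
      }
  }

// Assumed usage with the names from the example above:
// withPaused(() => mgmt.pause(projectionId), () => mgmt.resume(projectionId))(() => someDataMigration())
```

If pausing fails, neither the work nor the resume step runs, which seems the safer default since the projection was never paused in the first place.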
To retrieve the paused state:
Scala:

    val projectionId = ProjectionId("shopping-carts", "carts-1")
    val paused: Future[Boolean] = ProjectionManagement(system).isPaused(projectionId)
Status tracking
The status of a Projection can be tracked by implementing a StatusObserver and enabling it with withStatusObserver before running the Projection.
The StatusObserver is called when errors occur and envelopes are retried, or when the projection has failed (and is restarted). It also has callbacks for processing progress and projection lifecycle.
The intention is that the implementation of the StatusObserver would maintain a view that can be accessed from an administrative UI to give an overview of the current status of the projections.
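As an illustration of what such a view might look like, here is a minimal sketch that uses only the Scala standard library. All names in it (ProjectionStatusView, Snapshot, the status types and the method names) are hypothetical. It deliberately does not reproduce the StatusObserver callback signatures; the assumption is that a StatusObserver implementation would delegate to methods like these from its lifecycle, progress and error callbacks, keyed by a string form of the projection id.

```scala
// Hypothetical status model for an administrative overview (not part of Akka Projection).
sealed trait Status
case object Running extends Status
case object Stopped extends Status
final case class Failed(reason: String) extends Status

final case class Snapshot(status: Status, processed: Long, errors: Long)

// Thread-safe in-memory view; observer callbacks may arrive from different threads.
final class ProjectionStatusView {
  private var state = Map.empty[String, Snapshot]

  // Applies `f` to the current snapshot of `id`, starting from an empty snapshot if none exists.
  private def modify(id: String)(f: Snapshot => Snapshot): Unit = synchronized {
    state = state.updated(id, f(state.getOrElse(id, Snapshot(Stopped, 0L, 0L))))
  }

  // Lifecycle callbacks
  def started(id: String): Unit = modify(id)(_.copy(status = Running))
  def stopped(id: String): Unit = modify(id)(_.copy(status = Stopped))
  def failed(id: String, cause: Throwable): Unit =
    modify(id)(_.copy(status = Failed(String.valueOf(cause.getMessage))))

  // Progress and error callbacks
  def processed(id: String): Unit = modify(id)(s => s.copy(processed = s.processed + 1))
  def error(id: String): Unit = modify(id)(s => s.copy(errors = s.errors + 1))

  // Immutable snapshot of all projections, suitable for rendering in a UI.
  def overview: Map[String, Snapshot] = synchronized(state)
}
```

Keeping the view as an immutable map behind a lock keeps reads cheap and consistent; a real implementation might instead publish the state to a metrics system or a persistent store.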