sourceimport akka.actor.typed.ActorRefimport akka.actor.typed.Behaviorimport akka.actor.typed.SupervisorStrategyimport akka.actor.typed.scaladsl.ActorContextimport akka.actor.typed.scaladsl.BehaviorsobjectWordCountProcessor{
trait CommandfinalcaseclassHandle(envelope:WordEnvelope, replyTo:ActorRef[Try[Done]])extendsCommandprivatefinalcaseclassInitialState(state:Map[Word,Count])extendsCommandprivatefinalcaseclassSaveCompleted(word:Word, saveResult:Try[Done])extendsCommanddef apply(projectionId:ProjectionId, repository:WordCountRepository):Behavior[Command]=Behaviors.supervise[Command]{Behaviors.setup { context =>newWordCountProcessor(context, projectionId, repository).init()}}.onFailure(SupervisorStrategy.restartWithBackoff(1.second,10.seconds,0.1))}classWordCountProcessor(
context:ActorContext[WordCountProcessor.Command],
projectionId:ProjectionId,
repository:WordCountRepository){importWordCountProcessor._
// loading initial state from dbdef init():Behavior[Command]={Behaviors.withStash(10){ buffer =>
context.pipeToSelf(repository.loadAll(projectionId.id)){caseSuccess(value)=>InitialState(value)caseFailure(exc)=>throw exc
}Behaviors.receiveMessage {caseInitialState(state)=>
context.log.debug("Initial state [{}]", state)
buffer.unstashAll(idle(state))case other =>
context.log.debug("Stashed [{}]", other)
buffer.stash(other)Behaviors.same
}}}// waiting for next envelopeprivatedef idle(state:Map[Word,Count]):Behavior[Command]=Behaviors.receiveMessagePartial {caseHandle(envelope, replyTo)=>
val word = envelope.word
context.pipeToSelf(repository.save(projectionId.id, word, state.getOrElse(word,0)+1)){ saveResult =>SaveCompleted(word, saveResult)}
saving(state, replyTo)// will reply from SaveCompletedcase _:InitialState=>Behaviors.unhandled
}// saving the new count for a word in dbprivatedef saving(state:Map[Word,Count], replyTo:ActorRef[Try[Done]]):Behavior[Command]=Behaviors.receiveMessagePartial {caseSaveCompleted(word, saveResult)=>
replyTo ! saveResult
saveResult match {caseSuccess(_)=> idle(state.updated(word, state.getOrElse(word,0)+1))caseFailure(exc)=>throw exc // restart, reload state from db}}}
The Behavior given to the ActorHandler is spawned automatically by the Projection and each envelope is sent to the actor with ask. The actor is supposed to send a response message to the replyTo when it has completed the processing of the envelope. The TryOptional<Throwable> error indicates if the processing was successful or failed.
The lifecycle of the actor is managed by the Projection. The actor is automatically stopped when the Projection is stopped.
Another implementation that is loading the current count for a word on demand, and thereafter caches it in the in-memory state:
sourceobject WordCountProcessor{
trait CommandfinalcaseclassHandle(envelope:WordEnvelope, replyTo:ActorRef[Try[Done]])extendsCommandprivatefinalcaseclassLoadCompleted(word:Word, loadResult:Try[Count])extendsCommandprivatefinalcaseclassSaveCompleted(word:Word, saveResult:Try[Done])extendsCommanddef apply(projectionId:ProjectionId, repository:WordCountRepository):Behavior[Command]=Behaviors.supervise[Command]{Behaviors.setup[Command]{ context =>newWordCountProcessor(context, projectionId, repository).idle(Map.empty)}}.onFailure(SupervisorStrategy.restartWithBackoff(1.second,10.seconds,0.1))}classWordCountProcessor(
context:ActorContext[WordCountProcessor.Command],
projectionId:ProjectionId,
repository:WordCountRepository){importWordCountProcessor._
// waiting for next envelopeprivatedef idle(state:Map[Word,Count]):Behavior[Command]=Behaviors.receiveMessagePartial {caseHandle(envelope, replyTo)=>
val word = envelope.word
state.get(word) match {caseNone=>
load(word)
loading(state, replyTo)// will continue from LoadCompletedcaseSome(count)=>
save(word, count +1)
saving(state, replyTo)// will reply from SaveCompleted}}privatedef load(word:String):Unit={
context.pipeToSelf(repository.load(projectionId.id, word)){ loadResult =>LoadCompleted(word, loadResult)}}// loading the count for a word from dbprivatedef loading(state:Map[Word,Count], replyTo:ActorRef[Try[Done]]):Behavior[Command]=Behaviors.receiveMessagePartial {caseLoadCompleted(word, loadResult)=>
loadResult match {caseSuccess(count)=>
save(word, count +1)
saving(state, replyTo)// will reply from SaveCompletedcaseFailure(exc)=>
replyTo !Failure(exc)
idle(state)}}privatedef save(word:String, count:Count):Unit={
context.pipeToSelf(repository.save(projectionId.id, word, count)){ saveResult =>SaveCompleted(word, saveResult)}}// saving the new count for a word in dbprivatedef saving(state:Map[Word,Count], replyTo:ActorRef[Try[Done]]):Behavior[Command]=Behaviors.receiveMessagePartial {caseSaveCompleted(word, saveResult)=>
replyTo ! saveResult
saveResult match {caseSuccess(_)=>
idle(state.updated(word, state.getOrElse(word,0)+1))caseFailure(_)=>// remove the word from the state if the save failed, because it could have been a timeout// so that it was actually saved, best to reload
idle(state - word)}}}
sourcepublicclassWordCountProcessorextendsAbstractBehavior<WordCountProcessor.Command>{publicinterfaceCommand{}publicstaticclassHandleimplementsCommand{publicfinalWordEnvelope envelope;publicfinalActorRef<Result> replyTo;publicHandle(WordEnvelope envelope,ActorRef<Result> replyTo){this.envelope = envelope;this.replyTo = replyTo;}}publicstaticclassResult{publicfinalOptional<Throwable> error;publicResult(Optional<Throwable> error){this.error = error;}}privatestaticclassLoadCompletedimplementsCommand{finalString word;finalOptional<Throwable> error;finalActorRef<Result> replyTo;privateLoadCompleted(String word,Optional<Throwable> error,ActorRef<Result> replyTo){this.word = word;this.error = error;this.replyTo = replyTo;}}privatestaticclassSaveCompletedimplementsCommand{finalString word;finalOptional<Throwable> error;finalActorRef<Result> replyTo;privateSaveCompleted(String word,Optional<Throwable> error,ActorRef<Result> replyTo){this.word = word;this.error = error;this.replyTo = replyTo;}}publicstaticBehavior<Command> create(ProjectionId projectionId,WordCountRepository repository){returnBehaviors.supervise(Behaviors.setup((ActorContext<Command> context)->newWordCountProcessor(context, projectionId, repository))).onFailure(SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(1),Duration.ofSeconds(10),0.1));}privatefinalProjectionId projectionId;privatefinalWordCountRepository repository;privatefinalMap<String,Integer> state =newHashMap<>();privateWordCountProcessor(ActorContext<Command> context,ProjectionId projectionId,WordCountRepository repository){super(context);this.projectionId = projectionId;this.repository = repository;}@OverridepublicReceive<Command> createReceive(){return newReceiveBuilder().onMessage(Handle.class,this::onHandle).onMessage(LoadCompleted.class,this::onLoadCompleted).onMessage(SaveCompleted.class,this::onSaveCompleted).build();}privateBehavior<Command> onHandle(Handle command){String word = command.envelope.word;if(state.containsKey(word)){int newCount = state.get(word)+1;
getContext().pipeToSelf(
repository.save(projectionId.id(), word, newCount),(done, exc)->// will reply from SaveCompletednewSaveCompleted(word,Optional.ofNullable(exc), command.replyTo));}else{
getContext().pipeToSelf(
repository.load(projectionId.id(), word),(loadResult, exc)->// will reply from LoadCompletednewLoadCompleted(word,Optional.ofNullable(exc), command.replyTo));}returnthis;}privateBehavior<Command> onLoadCompleted(LoadCompleted completed){if(completed.error.isPresent()){
completed.replyTo.tell(newResult(completed.error));}else{String word = completed.word;int newCount = state.getOrDefault(word,0)+1;
getContext().pipeToSelf(
repository.save(projectionId.id(), word, newCount),(done, exc)->// will reply from SaveCompletednewSaveCompleted(word,Optional.ofNullable(exc), completed.replyTo));}returnthis;}privateBehavior<Command> onSaveCompleted(SaveCompleted completed){
completed.replyTo.tell(newResult(completed.error));if(completed.error.isPresent()){// remove the word from the state if the save failed, because it could have been a timeout// so that it was actually saved, best to reload
state.remove(completed.word);}else{String word = completed.word;int newCount = state.getOrDefault(word,0)+1;
state.put(word, newCount);}returnthis;}}}