This is a simulation of fake Event Sourced shopping carts. The details of this implementation are not important for understanding Projections; it is only needed to run the example.
package docs.guide

import java.time.Instant

import scala.concurrent.duration._
import scala.util.Random

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory

/**
 * Generate a shopping cart every 1 second and check it out. Each cart will contain a variety of `ItemAdded`,
 * `ItemQuantityAdjusted` and `ItemRemoved` events preceding the cart `CheckedOut` event.
 *
 * Pass `cluster` as a program argument to spread the carts over `ShoppingCartTags.Tags`;
 * otherwise every cart is tagged with `ShoppingCartTags.Single`.
 */
object EventGeneratorApp extends App {
  import ShoppingCartEvents._

  val Products = List("cat t-shirt", "akka t-shirt", "skis", "bowling shoes")

  val MaxQuantity = 5
  val MaxItems = 3
  val MaxItemsAdjusted = 3

  val EntityKey: EntityTypeKey[Event] = EntityTypeKey[Event]("shopping-cart-event")

  val config = ConfigFactory
    .parseString("akka.actor.provider = cluster")
    .withFallback(ConfigFactory.load("guide-shopping-cart-app.conf"))

  ActorSystem(
    Behaviors.setup[String] { ctx =>
      implicit val system: ActorSystem[_] = ctx.system

      // form a single-node cluster by joining ourselves
      val cluster = Cluster(system)
      cluster.manager ! Join(cluster.selfMember.address)

      val sharding = ClusterSharding(system)
      val _ = sharding.init(Entity(EntityKey) { entityCtx =>
        cartBehavior(entityCtx.entityId, tagFactory(entityCtx.entityId))
      })

      Source
        .tick(1.second, 1.second, "checkout")
        .mapConcat { _ =>
          val cartId = java.util.UUID.randomUUID().toString.take(5)
          val items = randomItems()

          // `0 to items` is inclusive, so `items + 1` products are put in the cart
          val itemEvents = (0 to items).flatMap { _ =>
            val itemId = Products(Random.nextInt(Products.size))

            // add the item
            val quantity = randomQuantity()
            val itemAdded = ItemAdded(cartId, itemId, quantity)

            // make up to `MaxItemsAdjusted` adjustments to quantity of item
            // (the inclusive range guarantees at least one adjustment)
            val adjustments = Random.nextInt(MaxItemsAdjusted)
            val itemQuantityAdjusted =
              (0 to adjustments).foldLeft(Seq[ItemQuantityAdjusted]()) {
                case (events, _) =>
                  val newQuantity = randomQuantity()
                  // the first adjustment starts from the quantity that was originally added
                  val oldQuantity = events.lastOption.fold(itemAdded.quantity)(_.newQuantity)
                  events :+ ItemQuantityAdjusted(cartId, itemId, newQuantity, oldQuantity)
              }

            // flip a coin to decide whether or not to remove the item
            val itemRemoved =
              if (Random.nextBoolean()) {
                val lastQuantity =
                  itemQuantityAdjusted.lastOption.fold(itemAdded.quantity)(_.newQuantity)
                List(ItemRemoved(cartId, itemId, lastQuantity))
              } else Nil

            List(itemAdded) ++ itemQuantityAdjusted ++ itemRemoved
          }

          // checkout the cart and all its preceding item events
          itemEvents :+ CheckedOut(cartId, Instant.now())
        }
        // send each event to the sharded entity represented by the event's cartId
        .runWith(Sink.foreach(event => sharding.entityRefFor(EntityKey, event.cartId).ref.tell(event)))

      Behaviors.empty
    },
    "EventGeneratorApp",
    config)

  /**
   * Random non-zero based quantity for `ItemAdded` and `ItemQuantityAdjusted` events.
   *
   * @return a value in the range `1` to `MaxQuantity - 1` (inclusive)
   */
  def randomQuantity(): Int = Random.nextInt(MaxQuantity - 1) + 1

  /**
   * Random non-zero based count for how many `ItemAdded` events to generate.
   *
   * @return a value in the range `1` to `MaxItems - 1` (inclusive)
   */
  def randomItems(): Int = Random.nextInt(MaxItems - 1) + 1

  /**
   * Choose a tag from `ShoppingCartTags` based on the entity id (cart id).
   *
   * @param entityId the cart id; its hash code selects the tag when running with the `cluster` argument
   * @return one of `ShoppingCartTags.Tags` in cluster mode, otherwise `ShoppingCartTags.Single`
   */
  def tagFactory(entityId: String): String =
    if (args.contains("cluster")) {
      // take the remainder first so that `abs` can never be handed `Int.MinValue`
      val n = math.abs(entityId.hashCode % ShoppingCartTags.Tags.size)
      val selectedTag = ShoppingCartTags.Tags(n)
      selectedTag
    } else ShoppingCartTags.Single

  /**
   * Construct an Actor that persists shopping cart events for a particular persistence id (cart id) and tag.
   * This is not how real Event Sourced actors should be implemented. Please look at
   * https://doc.akka.io/libraries/akka-core/current/typed/persistence.html for more information about `EventSourcedBehavior`.
   *
   * @param persistenceId the cart id, used as the unique persistence id
   * @param tag the tag attached to every event persisted by this actor
   */
  def cartBehavior(persistenceId: String, tag: String): Behavior[Event] =
    Behaviors.setup { ctx =>
      EventSourcedBehavior[Event, Event, List[Any]](
        persistenceId = PersistenceId.ofUniqueId(persistenceId),
        Nil,
        // the incoming "command" is already an event: log it and persist it unchanged
        (_, event) => {
          ctx.log.info("id [{}] tag [{}] event: {}", persistenceId, tag, event)
          Effect.persist(event)
        },
        // no state is kept
        (_, _) => Nil).withTagger(_ => Set(tag))
    }
}
package jdocs.guide;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
 * Generate a shopping cart every 1 second and check it out. Each cart will contain a variety of
 * `ItemAdded`, `ItemQuantityAdjusted` and `ItemRemoved` events preceding the cart `CheckedOut`
 * event.
 *
 * <p>Pass `cluster` as the first program argument to spread the carts over
 * `ShoppingCartTags.TAGS`; otherwise every cart is tagged with `ShoppingCartTags.SINGLE`.
 */
public class EventGeneratorApp {
  public static void main(String[] args) throws Exception {
    Boolean clusterMode = (args.length > 0 && args[0].equals("cluster"));
    Config config = config();
    ActorSystem<String> system =
        ActorSystem.create(Guardian.create(clusterMode), "EventGeneratorApp", config);
  }

  /** Force the cluster actor provider on top of the settings of the guide's application config. */
  private static Config config() {
    return ConfigFactory.parseString("akka.actor.provider = cluster")
        .withFallback(ConfigFactory.load("guide-shopping-cart-app.conf"));
  }
}

class Guardian {
  static final List<String> PRODUCTS =
      Arrays.asList("cat t-shirt", "akka t-shirt", "skis", "bowling shoes");

  static final int MAX_QUANTITY = 5;
  static final int MAX_ITEMS = 3;
  static final int MAX_ITEMS_ADJUSTED = 3;

  static final EntityTypeKey<ShoppingCartEvents.Event> ENTITY_KEY =
      EntityTypeKey.create(ShoppingCartEvents.Event.class, "shopping-cart-event");

  /**
   * Guardian behavior: joins a single-node cluster, initializes sharding for the cart entities and
   * starts the stream that generates one checked out cart per second.
   *
   * @param clusterMode when true the carts are spread over several tags, see {@link #tagFactory}
   */
  static Behavior<String> create(Boolean clusterMode) {
    return Behaviors.setup(
        context -> {
          ActorSystem<Void> system = context.getSystem();

          // form a single-node cluster by joining ourselves
          Cluster cluster = Cluster.get(system);
          cluster.manager().tell(new Join(cluster.selfMember().address()));

          ClusterSharding sharding = ClusterSharding.get(system);
          sharding.init(
              Entity.of(
                  ENTITY_KEY,
                  entityCtx -> {
                    PersistenceId persistenceId =
                        PersistenceId.ofUniqueId(entityCtx.getEntityId());
                    String tag = tagFactory(entityCtx.getEntityId(), clusterMode);
                    return new CartPersistentBehavior(persistenceId, tag);
                  }));

          Source.tick(Duration.ofSeconds(1L), Duration.ofSeconds(1L), "checkout")
              .mapConcat(
                  checkout -> {
                    String cartId = UUID.randomUUID().toString().substring(0, 5);
                    int items = getRandomNumber(1, MAX_ITEMS);

                    Stream<ShoppingCartEvents.ItemEvent> itemEvents =
                        IntStream.range(0, items).boxed().flatMap(i -> generateItemEvents(cartId));

                    // checkout the cart and all its preceding item events
                    return Stream.concat(
                            itemEvents,
                            Stream.of(new ShoppingCartEvents.CheckedOut(cartId, Instant.now())))
                        .collect(Collectors.toList());
                  })
              // send each event to the sharded entity represented by the event's cartId
              .runWith(
                  Sink.foreach(
                      event -> sharding.entityRefFor(ENTITY_KEY, event.getCartId()).tell(event)),
                  system);

          return Behaviors.empty();
        });
  }

  /**
   * Generate the events of one product in a cart: one `ItemAdded`, followed by fewer than
   * `MAX_ITEMS_ADJUSTED` `ItemQuantityAdjusted` events (possibly none), optionally followed by an
   * `ItemRemoved`.
   *
   * @param cartId id of the cart that the events belong to
   * @return the events in the order they must be persisted
   */
  static Stream<ShoppingCartEvents.ItemEvent> generateItemEvents(String cartId) {
    // pick the product itself, not the textual form of its index
    String itemId = PRODUCTS.get(getRandomNumber(0, PRODUCTS.size()));
    List<ShoppingCartEvents.ItemEvent> events = new ArrayList<>();

    // add the item
    int quantity = getRandomNumber(1, MAX_QUANTITY);
    events.add(new ShoppingCartEvents.ItemAdded(cartId, itemId, quantity));

    // quantity of the item after all events generated so far
    int currentQuantity = quantity;

    // make up to `MAX_ITEMS_ADJUSTED` (exclusive) adjustments to quantity of item
    int adjustments = getRandomNumber(0, MAX_ITEMS_ADJUSTED);
    for (int j = 0; j < adjustments; j++) {
      int newQuantity = getRandomNumber(1, MAX_QUANTITY);
      events.add(
          new ShoppingCartEvents.ItemQuantityAdjusted(
              cartId, itemId, newQuantity, currentQuantity));
      currentQuantity = newQuantity;
    }

    // flip a coin to decide whether or not to remove the item; `Math.random()` is in [0, 1), so
    // it has to be compared with 0.5 (`% 2 == 0` would practically never be true)
    if (Math.random() < 0.5) {
      // currentQuantity is still the added quantity when no adjustment was generated
      events.add(new ShoppingCartEvents.ItemRemoved(cartId, itemId, currentQuantity));
    }

    return events.stream();
  }

  /**
   * Random number between `min` (inclusive) and `max` (exclusive).
   *
   * @param min lower bound, inclusive
   * @param max upper bound, exclusive
   */
  static int getRandomNumber(int min, int max) {
    return (int) ((Math.random() * (max - min)) + min);
  }

  /**
   * Choose a tag from `ShoppingCartTags` based on the entity id (cart id).
   *
   * @param entityId the cart id; its hash code selects the tag in cluster mode
   * @param clusterMode when false `ShoppingCartTags.SINGLE` is always returned
   */
  static String tagFactory(String entityId, Boolean clusterMode) {
    if (clusterMode) {
      // take the remainder first so that `abs` can never be handed `Integer.MIN_VALUE`
      int n = Math.abs(entityId.hashCode() % ShoppingCartTags.TAGS.length);
      String selectedTag = ShoppingCartTags.TAGS[n];
      return selectedTag;
    } else return ShoppingCartTags.SINGLE;
  }

  /**
   * An Actor that persists shopping cart events for a particular persistence id (cart id) and tag.
   * This is not how real Event Sourced actors should be implemented. Please look at
   * https://doc.akka.io/libraries/akka-core/current/typed/persistence.html for more information about
   * `EventSourcedBehavior`.
   */
  static class CartPersistentBehavior
      extends EventSourcedBehavior<
          ShoppingCartEvents.Event, ShoppingCartEvents.Event, List<Object>> {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private final String tag;
    private final Set<String> tags;

    public CartPersistentBehavior(PersistenceId persistenceId, String tag) {
      super(persistenceId);
      this.tag = tag;
      this.tags = Collections.singleton(tag);
    }

    @Override
    public List<Object> emptyState() {
      return new ArrayList<Object>();
    }

    /** The incoming "command" is already an event: log it and persist it unchanged. */
    @Override
    public CommandHandler<ShoppingCartEvents.Event, ShoppingCartEvents.Event, List<Object>>
        commandHandler() {
      return (state, event) -> {
        this.log.info(
            "id [{}] tag [{}] event: {}", this.persistenceId().id(), this.tag, event);
        return Effect().persist(event);
      };
    }

    /** No state is derived from the events; the state is returned untouched. */
    @Override
    public EventHandler<List<Object>, ShoppingCartEvents.Event> eventHandler() {
      return (state, event) -> state;
    }

    @Override
    public Set<String> tagsFor(ShoppingCartEvents.Event event) {
      return this.tags;
    }
  }
}
Found an error in this documentation? The source code for this page can be found here.
Please feel free to edit and contribute a pull request.