Running the Projection
This example requires a Cassandra database. If you do not have one, you can run Cassandra locally as a Docker container using docker-compose and the docker-compose.yml file found in the Projections project root. The docker-compose.yml file references the latest Cassandra Docker image.
Change to the directory containing the docker-compose.yml file and manage the Cassandra container with the following commands.
Action | Docker Command |
---|---|
Run | docker-compose --project-name getting-started up -d cassandra |
Stop | docker-compose --project-name getting-started stop |
Delete container state | docker-compose --project-name getting-started rm -f |
CQL shell (when running) | docker run -it --network getting-started_default --rm cassandra cqlsh cassandra |
To use a different Cassandra database, update the Cassandra driver’s contact-points configuration found in ./examples/src/resources/guide-shopping-cart-app.conf.
To run the Projection we must set up our Cassandra database to support the Cassandra Projection offset store, as well as the new table that the ItemPopularityProjectionHandler projects into.
Create a Cassandra keyspace.
CREATE KEYSPACE IF NOT EXISTS akka_projection WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };
Create the Cassandra Projection offset store table. The DDL can be found in the Cassandra Projection, Schema section.
Create the ItemPopularityProjectionHandler projection table with the DDL below.
CREATE TABLE IF NOT EXISTS akka_projection.item_popularity (
item_id text,
count counter,
PRIMARY KEY (item_id));
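The `count` column uses Cassandra's `counter` type, which is changed by incrementing or decrementing it (`SET count = count + ?`) rather than by writing an absolute value. The handler itself is not shown in this section, so the following is only a minimal in-memory sketch of how such a table could evolve. It assumes the handler turns each cart event into a signed delta for one item; the class `ItemPopularityModel` and its method names are hypothetical and not part of the guide.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the item_popularity counter table (illustration only). */
final class ItemPopularityModel {
  private final Map<String, Long> counts = new HashMap<>();

  // Mirrors a counter update of the form:
  // UPDATE akka_projection.item_popularity SET count = count + ? WHERE item_id = ?
  private void increment(String itemId, long delta) {
    counts.merge(itemId, delta, Long::sum);
  }

  // Assumption: adding an item raises its popularity by the quantity added.
  void itemAdded(String itemId, int quantity) {
    increment(itemId, quantity);
  }

  // Assumption: an adjustment contributes only the difference between new and old.
  void itemQuantityAdjusted(String itemId, int newQuantity, int oldQuantity) {
    increment(itemId, newQuantity - oldQuantity);
  }

  // Assumption: removing an item subtracts the quantity it last had.
  void itemRemoved(String itemId, int oldQuantity) {
    increment(itemId, -oldQuantity);
  }

  // A row that was never written reads as 0 in this model.
  long count(String itemId) {
    return counts.getOrDefault(itemId, 0L);
  }

  public static void main(String[] args) {
    ItemPopularityModel table = new ItemPopularityModel();
    table.itemAdded("skis", 3);               // skis: 3
    table.itemQuantityAdjusted("skis", 5, 3); // skis: 3 + (5 - 3) = 5
    table.itemAdded("cat t-shirt", 2);        // cat t-shirt: 2
    table.itemRemoved("skis", 5);             // skis: 5 - 5 = 0
    System.out.println("skis = " + table.count("skis"));
    System.out.println("cat t-shirt = " + table.count("cat t-shirt"));
  }
}
```

Because every update is a relative delta, the old quantity carried by the adjustment and removal events is what lets the running total stay correct without reading the current value first.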
Source events are generated with the EventGeneratorApp. This app uses Akka Persistence Cassandra and Akka Cluster Sharding to persist random ShoppingCartApp.Events to a journal. Every second it checks out a shopping cart containing random items and quantities. The app automatically creates all the Akka Persistence infrastructure tables in the akka keyspace. We won’t go into further detail about how this app works because that falls outside the scope of Akka Projections. To learn more about writing events with Akka Persistence, see the Akka documentation.
Add the Akka Cluster Sharding library to your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % "2.6.9"
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-sharding-typed_${scala.binary.version}</artifactId>
  <version>2.6.9</version>
</dependency>
- Gradle
versions += [
  ScalaBinary: "2.13"
]
dependencies {
  compile group: 'com.typesafe.akka', name: "akka-cluster-sharding-typed_${versions.ScalaBinary}", version: '2.6.9'
}
Add the EventGeneratorApp to your project:
- Scala
package docs.guide

import java.time.Instant

import scala.concurrent.duration._
import scala.util.Random

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory

/**
 * Generate a shopping cart every 1 second and check it out. Each cart will contain a variety of `ItemAdded`,
 * `ItemQuantityAdjusted` and `ItemRemoved` events preceding the cart `CheckedOut` event.
 */
object EventGeneratorApp extends App {
  import ShoppingCartEvents._

  val Products = List("cat t-shirt", "akka t-shirt", "skis", "bowling shoes")

  val MaxQuantity = 5
  val MaxItems = 3
  val MaxItemsAdjusted = 3

  val EntityKey: EntityTypeKey[Event] = EntityTypeKey[Event]("shopping-cart-event")

  val config = ConfigFactory
    .parseString("akka.actor.provider = cluster")
    .withFallback(ConfigFactory.load("guide-shopping-cart-app.conf"))

  ActorSystem(Behaviors.setup[String] { ctx =>
    implicit val system = ctx.system
    val cluster = Cluster(system)
    cluster.manager ! Join(cluster.selfMember.address)
    val sharding = ClusterSharding(system)
    val _ = sharding.init(Entity(EntityKey) { entityCtx =>
      cartBehavior(entityCtx.entityId, tagFactory(entityCtx.entityId))
    })

    Source
      .tick(1.second, 1.second, "checkout")
      .mapConcat {
        case "checkout" =>
          val cartId = java.util.UUID.randomUUID().toString.take(5)
          val items = randomItems()
          val itemEvents = (0 to items).flatMap { _ =>
            val itemId = Products(Random.nextInt(Products.size))

            // add the item
            val quantity = randomQuantity()
            val itemAdded = ItemAdded(cartId, itemId, quantity)

            // make up to `MaxItemsAdjusted` adjustments to quantity of item
            val adjustments = Random.nextInt(MaxItemsAdjusted)
            val itemQuantityAdjusted = (0 to adjustments).foldLeft(Seq[ItemQuantityAdjusted]()) {
              case (events, _) =>
                val newQuantity = randomQuantity()
                val oldQuantity =
                  if (events.isEmpty) itemAdded.quantity
                  else events.last.newQuantity
                events :+ ItemQuantityAdjusted(cartId, itemId, newQuantity, oldQuantity)
            }

            // flip a coin to decide whether or not to remove the item
            val itemRemoved =
              if (Random.nextBoolean())
                List(ItemRemoved(cartId, itemId, itemQuantityAdjusted.last.newQuantity))
              else Nil

            List(itemAdded) ++ itemQuantityAdjusted ++ itemRemoved
          }

          // checkout the cart and all its preceding item events
          itemEvents :+ CheckedOut(cartId, Instant.now())
      }
      // send each event to the sharded entity represented by the event's cartId
      .runWith(Sink.foreach(event => sharding.entityRefFor(EntityKey, event.cartId).ref.tell(event)))

    Behaviors.empty
  }, "EventGeneratorApp", config)

  /**
   * Random non-zero based quantity for `ItemAdded` and `ItemQuantityAdjusted` events
   */
  def randomQuantity(): Int = Random.nextInt(MaxQuantity - 1) + 1

  /**
   * Random non-zero based count for how many `ItemAdded` events to generate
   */
  def randomItems(): Int = Random.nextInt(MaxItems - 1) + 1

  /**
   * Choose a tag from `ShoppingCartTags` based on the entity id (cart id)
   */
  def tagFactory(entityId: String): String =
    if (args.contains("cluster")) {
      val n = math.abs(entityId.hashCode % ShoppingCartTags.Tags.size)
      val selectedTag = ShoppingCartTags.Tags(n)
      selectedTag
    } else ShoppingCartTags.Single

  /**
   * Construct an Actor that persists shopping cart events for a particular persistence id (cart id) and tag.
   */
  def cartBehavior(persistenceId: String, tag: String): Behavior[Event] =
    Behaviors.setup { ctx =>
      EventSourcedBehavior[Event, Event, List[Any]](
        persistenceId = PersistenceId.ofUniqueId(persistenceId),
        Nil,
        (_, event) => {
          ctx.log.info("id [{}] tag [{}] event: {}", persistenceId, tag, event)
          Effect.persist(event)
        },
        (_, _) => Nil).withTagger(_ => Set(tag))
    }
}
- Java
package jdocs.guide;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
 * Generate a shopping cart every 1 second and check it out. Each cart will contain a variety of
 * `ItemAdded`, `ItemQuantityAdjusted` and `ItemRemoved` events preceding the cart `CheckedOut`
 * event.
 */
public class EventGeneratorApp {
  public static void main(String[] args) throws Exception {
    Boolean clusterMode = (args.length > 0 && args[0].equals("cluster"));
    Config config = config();
    ActorSystem<String> system =
        ActorSystem.create(Guardian.create(clusterMode), "EventGeneratorApp", config);
  }

  private static Config config() {
    return ConfigFactory.parseString("akka.actor.provider = cluster")
        .withFallback(ConfigFactory.load("guide-shopping-cart-app.conf"));
  }
}

class Guardian {
  static final List<String> PRODUCTS =
      Arrays.asList("cat t-shirt", "akka t-shirt", "skis", "bowling shoes");

  static final int MAX_QUANTITY = 5;
  static final int MAX_ITEMS = 3;
  static final int MAX_ITEMS_ADJUSTED = 3;

  static final EntityTypeKey<ShoppingCartEvents.Event> ENTITY_KEY =
      EntityTypeKey.create(ShoppingCartEvents.Event.class, "shopping-cart-event");

  static Behavior<String> create(Boolean clusterMode) {
    return Behaviors.setup(
        context -> {
          ActorSystem<Void> system = context.getSystem();
          Cluster cluster = Cluster.get(system);
          cluster.manager().tell(new Join(cluster.selfMember().address()));
          ClusterSharding sharding = ClusterSharding.get(system);

          sharding.init(
              Entity.of(
                  ENTITY_KEY,
                  entityCtx -> {
                    PersistenceId persistenceId =
                        PersistenceId.ofUniqueId(entityCtx.getEntityId());
                    String tag = tagFactory(entityCtx.getEntityId(), clusterMode);
                    return new CartPersistentBehavior(persistenceId, tag);
                  }));

          Source.tick(Duration.ofSeconds(1L), Duration.ofSeconds(1L), "checkout")
              .mapConcat(
                  checkout -> {
                    String cartId = UUID.randomUUID().toString().substring(0, 5);
                    int items = getRandomNumber(1, MAX_ITEMS);
                    Stream<ShoppingCartEvents.ItemEvent> itemEvents =
                        IntStream.range(0, items)
                            // .mapToObj(i -> Integer.valueOf(i)) // Java 8?
                            .boxed()
                            .flatMap(
                                i -> {
                                  // pick a random product name as the item id
                                  String itemId =
                                      PRODUCTS.get(getRandomNumber(0, PRODUCTS.size()));
                                  ArrayList<ShoppingCartEvents.ItemEvent> events =
                                      new ArrayList<>();

                                  // add the item
                                  int quantity = getRandomNumber(1, MAX_QUANTITY);
                                  ShoppingCartEvents.ItemAdded itemAdded =
                                      new ShoppingCartEvents.ItemAdded(cartId, itemId, quantity);

                                  // make up to `MAX_ITEMS_ADJUSTED` adjustments to quantity
                                  // of item
                                  int adjustments = getRandomNumber(0, MAX_ITEMS_ADJUSTED);
                                  ArrayList<ShoppingCartEvents.ItemEvent> itemQuantityAdjusted =
                                      new ArrayList<>();
                                  for (int j = 0; j < adjustments; j++) {
                                    int newQuantity = getRandomNumber(1, MAX_QUANTITY);
                                    int oldQuantity = itemAdded.quantity;
                                    if (!itemQuantityAdjusted.isEmpty()) {
                                      oldQuantity =
                                          ((ShoppingCartEvents.ItemQuantityAdjusted)
                                                  itemQuantityAdjusted.get(
                                                      itemQuantityAdjusted.size() - 1))
                                              .newQuantity;
                                    }
                                    itemQuantityAdjusted.add(
                                        new ShoppingCartEvents.ItemQuantityAdjusted(
                                            cartId, itemId, newQuantity, oldQuantity));
                                  }

                                  // flip a coin to decide whether or not to remove the
                                  // item
                                  ArrayList<ShoppingCartEvents.ItemEvent> itemRemoved =
                                      new ArrayList<>();
                                  if (Math.random() < 0.5) {
                                    // the last known quantity is the latest adjustment, or
                                    // the added quantity when there were no adjustments
                                    int oldQuantity = itemAdded.quantity;
                                    if (!itemQuantityAdjusted.isEmpty()) {
                                      oldQuantity =
                                          ((ShoppingCartEvents.ItemQuantityAdjusted)
                                                  itemQuantityAdjusted.get(
                                                      itemQuantityAdjusted.size() - 1))
                                              .newQuantity;
                                    }
                                    itemRemoved.add(
                                        new ShoppingCartEvents.ItemRemoved(
                                            cartId, itemId, oldQuantity));
                                  }

                                  events.add(itemAdded);
                                  events.addAll(itemQuantityAdjusted);
                                  events.addAll(itemRemoved);

                                  return events.stream();
                                });

                    // checkout the cart and all its preceding item events
                    return Stream.concat(
                            itemEvents,
                            Stream.of(new ShoppingCartEvents.CheckedOut(cartId, Instant.now())))
                        .collect(Collectors.toList());
                  })
              // send each event to the sharded entity represented by the event's cartId
              .runWith(
                  Sink.foreach(
                      event -> sharding.entityRefFor(ENTITY_KEY, event.getCartId()).tell(event)),
                  system);

          return Behaviors.empty();
        });
  }

  /** Random number between `min` (inclusive) and `max` (exclusive) */
  static int getRandomNumber(int min, int max) {
    return (int) ((Math.random() * (max - min)) + min);
  }

  /** Choose a tag from `ShoppingCartTags` based on the entity id (cart id) */
  static String tagFactory(String entityId, Boolean clusterMode) {
    if (clusterMode) {
      int n = Math.abs(entityId.hashCode() % ShoppingCartTags.TAGS.length);
      String selectedTag = ShoppingCartTags.TAGS[n];
      return selectedTag;
    } else return ShoppingCartTags.SINGLE;
  }

  /**
   * An Actor that persists shopping cart events for a particular persistence id (cart id) and tag.
   */
  static class CartPersistentBehavior
      extends EventSourcedBehavior<
          ShoppingCartEvents.Event, ShoppingCartEvents.Event, List<Object>> {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private final String tag;
    private final Set<String> tags;

    public CartPersistentBehavior(PersistenceId persistenceId, String tag) {
      super(persistenceId);
      this.tag = tag;
      this.tags = new HashSet<>(Collections.singletonList(tag));
    }

    @Override
    public List<Object> emptyState() {
      return new ArrayList<Object>();
    }

    @Override
    public CommandHandler<ShoppingCartEvents.Event, ShoppingCartEvents.Event, List<Object>>
        commandHandler() {
      return (state, event) -> {
        this.log.info("id [{}] tag [{}] event: {}", this.persistenceId().id(), this.tag, event);
        return Effect().persist(event);
      };
    }

    @Override
    public EventHandler<List<Object>, ShoppingCartEvents.Event> eventHandler() {
      return (state, event) -> state;
    }

    @Override
    public Set<String> tagsFor(ShoppingCartEvents.Event event) {
      return this.tags;
    }
  }
}
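The tag selection in `tagFactory` can be tried in isolation. The sketch below reproduces the same hash-and-modulo logic using only the JDK. The tag names in `TAGS` are placeholders chosen for illustration (the guide's real values live in `ShoppingCartTags`), while `shopping-cart` is the single tag visible in the sample log output of this guide; the class `TagSelection` and method `tagFor` are hypothetical names.

```java
/** Standalone sketch of choosing a tag from an entity id (illustration only). */
final class TagSelection {
  // Placeholder tag names; the real ones are defined in ShoppingCartTags.
  static final String[] TAGS = {"carts-0", "carts-1", "carts-2"};
  static final String SINGLE = "shopping-cart";

  static String tagFor(String entityId, boolean clusterMode) {
    if (!clusterMode) {
      // Without cluster mode every entity shares one tag.
      return SINGLE;
    }
    // hashCode() may be negative, so the remainder lies in (-TAGS.length, TAGS.length);
    // Math.abs folds it into a valid index.
    int n = Math.abs(entityId.hashCode() % TAGS.length);
    return TAGS[n];
  }

  public static void main(String[] args) {
    for (String cartId : new String[] {"cb52b", "0a1f3", "ffe21"}) {
      System.out.println(cartId + " -> " + tagFor(cartId, true));
    }
    System.out.println("single mode -> " + tagFor("cb52b", false));
  }
}
```

The same cart id always maps to the same tag, so all events of one cart end up under one tag. Taking the remainder before calling `Math.abs` also sidesteps the `Math.abs(Integer.MIN_VALUE)` edge case, which would otherwise stay negative.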
Run EventGeneratorApp:
- sbt
sbt "runMain docs.guide.EventGeneratorApp"
- Maven
mvn compile exec:java -Dexec.mainClass="jdocs.guide.EventGeneratorApp"
If you don’t see any connection exceptions, you should eventually see log lines indicating that events are being written to the journal.
For example:
[2020-08-13 15:20:05,583] [INFO] [docs.guide.EventGeneratorApp$] [] [EventGenerator-akka.actor.default-dispatcher-22] - id [cb52b] tag [shopping-cart] event: ItemQuantityAdjusted(cb52b,skis,1,1) MDC: {persistencePhase=persist-evt, akkaAddress=akka://[email protected]:25520, akkaSource=akka://EventGenerator/system/sharding/shopping-cart-event/678/cb52b, sourceActorSystem=EventGenerator, persistenceId=cb52b}
Finally, we can run ShoppingCartApp in a new terminal:
- sbt
sbt "runMain docs.guide.ShoppingCartApp"
- Maven
mvn compile exec:java -Dexec.mainClass="jdocs.guide.ShoppingCartApp"
After a few seconds you should see ItemPopularityProjectionHandler log lines that display the current popularity count of each item:
[2020-08-12 12:16:34,216] [INFO] [docs.guide.ItemPopularityProjectionHandler] [] [ShoppingCartApp-akka.actor.default-dispatcher-10] - ItemPopularityProjectionHandler(shopping-cart) item popularity for 'bowling shoes': [58]
Use the CQL shell to observe the full information in the item_popularity table.
cqlsh> SELECT item_id, count FROM akka_projection.item_popularity;
item_id | count
---------------+-------
akka t-shirt | 37
cat t-shirt | 34
skis | 33
bowling shoes | 65
(4 rows)