Apache Geode
Apache Geode is a distributed data grid (formerly called “GemFire”, which is now Pivotal’s packaging of Geode).
Alpakka Geode provides flows and sinks to put elements into Geode, and a source to retrieve elements from it. Geode stores key-value pairs. Keys and values must be serialized using Geode’s own serialization support.
Project Info: Alpakka Geode | |
---|---|
Artifact | com.lightbend.akka akka-stream-alpakka-geode 1.0.2 |
JDK versions | OpenJDK 8 |
Scala versions | 2.12.7, 2.11.12, 2.13.0-M5 |
JPMS module name | akka.stream.alpakka.geode |
License | |
Readiness level | Since 0.10, 2017-06-30 |
Home page | https://doc.akka.io/docs/alpakka/current/ |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
    libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-geode" % "1.0.2"
- Maven
    <dependency>
      <groupId>com.lightbend.akka</groupId>
      <artifactId>akka-stream-alpakka-geode_2.12</artifactId>
      <version>1.0.2</version>
    </dependency>
- Gradle
    dependencies {
      compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-geode_2.12', version: '1.0.2'
    }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version | License |
---|---|---|---|
com.chuusai | shapeless_2.12 | 2.3.3 | Apache 2 |
com.typesafe.akka | akka-stream_2.12 | 2.5.22 | Apache License, Version 2.0 |
org.apache.geode | geode-core | 1.8.0 | The Apache Software License, Version 2.0 |
org.apache.geode | geode-cq | 1.8.0 | The Apache Software License, Version 2.0 |
org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |
- Dependency tree
All transitive dependencies, each listed once in order of first appearance:
Organization | Artifact | Version | License |
---|---|---|---|
com.chuusai | shapeless_2.12 | 2.3.3 | Apache 2 |
org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |
org.typelevel | macro-compat_2.12 | 1.1.1 | Apache 2 |
com.typesafe.akka | akka-stream_2.12 | 2.5.22 | Apache License, Version 2.0 |
com.typesafe.akka | akka-actor_2.12 | 2.5.22 | Apache License, Version 2.0 |
com.typesafe | config | 1.3.3 | Apache License, Version 2.0 |
org.scala-lang.modules | scala-java8-compat_2.12 | 0.8.0 | BSD 3-clause |
com.typesafe.akka | akka-protobuf_2.12 | 2.5.22 | Apache License, Version 2.0 |
com.typesafe | ssl-config-core_2.12 | 0.3.7 | Apache-2.0 |
org.scala-lang.modules | scala-parser-combinators_2.12 | 1.1.1 | BSD 3-clause |
org.reactivestreams | reactive-streams | 1.0.2 | CC0 |
org.apache.geode | geode-core | 1.8.0 | The Apache Software License, Version 2.0 |
antlr | antlr | 2.7.7 | BSD License |
com.fasterxml.jackson.core | jackson-annotations | 2.9.6 | The Apache Software License, Version 2.0 |
com.fasterxml.jackson.core | jackson-databind | 2.9.6 | The Apache Software License, Version 2.0 |
com.fasterxml.jackson.core | jackson-core | 2.9.6 | The Apache Software License, Version 2.0 |
com.github.stephenc.findbugs | findbugs-annotations | 1.3.9-1 | Apache License, Version 2.0 |
com.healthmarketscience.rmiio | rmiio | 2.1.2 | Apache License, Version 2.0 |
commons-logging | commons-logging | 1.2 | The Apache Software License, Version 2.0 |
com.sun.xml.bind | jaxb-core | 2.2.11 | CDDL+GPL License |
com.sun.xml.bind | jaxb-impl | 2.2.11 | CDDL+GPL License |
commons-digester | commons-digester | 2.1 | The Apache Software License, Version 2.0 |
commons-beanutils | commons-beanutils | 1.9.2 | The Apache Software License, Version 2.0 |
commons-collections | commons-collections | 3.2.2 | Apache License, Version 2.0 |
commons-io | commons-io | 2.6 | Apache License, Version 2.0 |
commons-lang | commons-lang | 2.6 | The Apache Software License, Version 2.0 |
commons-validator | commons-validator | 1.6 | Apache License, Version 2.0 |
io.github.classgraph | classgraph | 4.0.6 | The MIT License (MIT) |
it.unimi.dsi | fastutil | 8.2.1 | Apache License, Version 2.0 |
javax.activation | activation | 1.1.1 | COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0 |
javax.resource | javax.resource-api | 1.7 | CDDL + GPLv2 with classpath exception |
javax.transaction | javax.transaction-api | 1.2 | CDDL + GPLv2 with classpath exception |
javax.xml.bind | jaxb-api | 2.2.11 | CDDL 1.1 |
net.java.dev.jna | jna | 4.1.0 | LGPL, version 2.1 |
net.sf.jopt-simple | jopt-simple | 5.0.4 | The MIT License |
org.apache.geode | geode-common | 1.8.0 | The Apache Software License, Version 2.0 |
org.apache.geode | geode-json | 1.8.0 | The Apache Software License, Version 2.0 |
org.apache.logging.log4j | log4j-api | 2.11.0 | Apache License, Version 2.0 |
org.apache.logging.log4j | log4j-core | 2.11.0 | Apache License, Version 2.0 |
org.apache.shiro | shiro-core | 1.4.0 | Apache License, Version 2.0 |
org.apache.shiro | shiro-cache | 1.4.0 | Apache License, Version 2.0 |
org.apache.shiro | shiro-lang | 1.4.0 | Apache License, Version 2.0 |
org.slf4j | slf4j-api | 1.7.25 | MIT License |
org.apache.shiro | shiro-config-core | 1.4.0 | Apache License, Version 2.0 |
org.apache.shiro | shiro-crypto-cipher | 1.4.0 | Apache License, Version 2.0 |
org.apache.shiro | shiro-crypto-core | 1.4.0 | Apache License, Version 2.0 |
org.apache.shiro | shiro-crypto-hash | 1.4.0 | Apache License, Version 2.0 |
org.apache.shiro | shiro-event | 1.4.0 | Apache License, Version 2.0 |
org.jgroups | jgroups | 3.6.14.Final | Apache License 2.0 |
org.apache.geode | geode-cq | 1.8.0 | The Apache Software License, Version 2.0 |
Setup
Connection
The connection to Geode is handled by a ClientCache. A single ClientCache per application is enough. The ClientCache also holds a single PdxSerializer.
The Geode client should be closed after use; closing it when the actor system terminates is recommended.
- Scala
    val geodeSettings = GeodeSettings(hostname, port = 10334)
      .withConfiguration(c => c.setPoolIdleTimeout(10))
    val geode = new Geode(geodeSettings)
    system.registerOnTermination(geode.close())
- Java
    GeodeSettings settings =
        GeodeSettings.create(hostname, 10334).withConfiguration(c -> c.setPoolIdleTimeout(10));
    Geode geode = new Geode(settings);
    system.registerOnTermination(() -> geode.close());
Apache Geode supports continuous queries. Continuous queries rely on server events, so Alpakka Geode needs to listen to those events. Because this behaviour consumes more resources, it is isolated in a Scala trait and in a specialized Java class.
- Scala
    val geode = new Geode(geodeSettings) with PoolSubscription
    system.registerOnTermination(geode.close())
- Java
    GeodeWithPoolSubscription geode = new GeodeWithPoolSubscription(settings);
Region
Define region settings that name the region to access and provide the function that extracts the key from each element.
- Scala
    val personsRegionSettings: RegionSettings[Int, Person] =
      RegionSettings("persons", (p: Person) => p.id)
    val animalsRegionSettings: RegionSettings[Int, Animal] =
      RegionSettings("animals", (a: Animal) => a.id)
    val complexesRegionSettings: RegionSettings[UUID, Complex] =
      RegionSettings("complexes", (a: Complex) => a.id)
- Java
    protected final RegionSettings<Integer, Person> personRegionSettings =
        RegionSettings.create("persons", Person::getId);
    protected final RegionSettings<Integer, Animal> animalRegionSettings =
        RegionSettings.create("animals", Animal::getId);
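To illustrate what the key extraction function contributes, the sketch below models a region as a plain in-memory map and derives each entry's key from the value itself. It uses only the Java standard library: `KeyExtractionSketch`, its nested `Person`, and `putAll` are hypothetical names for this illustration and are not part of Alpakka Geode or Geode.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustration only: a plain map stands in for a Geode region.
final class KeyExtractionSketch {

  // Hypothetical value type whose id serves as the region key.
  static final class Person {
    final int id;
    final String name;

    Person(int id, String name) {
      this.id = id;
      this.name = name;
    }

    int getId() {
      return id;
    }
  }

  // Stores every element under the key computed by keyExtractor.
  static <K, V> Map<K, V> putAll(List<V> elements, Function<V, K> keyExtractor) {
    Map<K, V> region = new LinkedHashMap<>();
    for (V element : elements) {
      region.put(keyExtractor.apply(element), element);
    }
    return region;
  }

  public static void main(String[] args) {
    List<Person> persons = Arrays.asList(new Person(1, "Ada"), new Person(2, "Alan"));
    Map<Integer, Person> region = putAll(persons, Person::getId);
    System.out.println(region.keySet()); // prints [1, 2]
  }
}
```

Because the key is computed from the element, two elements with the same id end up in the same entry, and the later one replaces the earlier one in this sketch.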
Serialization
Objects must be serialized to be stored in or retrieved from Geode. Only the PDX format is available with Alpakka Geode. PDXEncoders support many options, as described in Geode PDX Serialization. A PdxSerializer must be provided to Geode when reading from or writing to a region.
- Scala
    object PersonPdxSerializer extends AkkaPdxSerializer[Person] {
      override def clazz: Class[Person] = classOf[Person]

      override def toData(o: scala.Any, out: PdxWriter): Boolean =
        if (o.isInstanceOf[Person]) {
          val p = o.asInstanceOf[Person]
          out.writeInt("id", p.id)
          out.writeString("name", p.name)
          out.writeDate("birthDate", p.birthDate)
          true
        } else false

      override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
        val id: Int = in.readInt("id")
        val name: String = in.readString("name")
        val birthDate: Date = in.readDate("birthDate")
        Person(id, name, birthDate)
      }
    }
- Java
    public class PersonPdxSerializer implements AkkaPdxSerializer<Person> {

      @Override
      public Class<Person> clazz() {
        return Person.class;
      }

      @Override
      public boolean toData(Object o, PdxWriter out) {
        if (o instanceof Person) {
          Person p = (Person) o;
          out.writeInt("id", p.getId());
          out.writeString("name", p.getName());
          out.writeDate("birthDate", p.getBirthDate());
          return true;
        }
        return false;
      }

      @Override
      public Object fromData(Class<?> clazz, PdxReader in) {
        int id = in.readInt("id");
        String name = in.readString("name");
        Date birthDate = in.readDate("birthDate");
        return new Person(id, name, birthDate);
      }
    }
Alpakka Geode provides a generic solution for Scala users, based on Shapeless, that can generate serializers for case classes at compile time.
Java users need to implement custom serializers manually, or use runtime reflection as described in Using Automatic Reflection-Based PDX Serialization.
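The hand-written serializers above follow a write-by-name, read-by-name pattern. The following sketch reproduces that round trip with only the Java standard library, assuming a map of named fields as a stand-in for PdxWriter and PdxReader. `NamedFieldRoundTrip` and its members are hypothetical names; nothing here calls the Geode API or produces real PDX bytes.

```java
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a map of named fields stands in for PdxWriter/PdxReader.
final class NamedFieldRoundTrip {

  // Hypothetical value type mirroring the Person used in the serializers above.
  static final class Person {
    final int id;
    final String name;
    final Date birthDate;

    Person(int id, String name, Date birthDate) {
      this.id = id;
      this.name = name;
      this.birthDate = birthDate;
    }
  }

  // Mirrors toData: writes each field under a name and reports whether the object was handled.
  static boolean toData(Object o, Map<String, Object> out) {
    if (!(o instanceof Person)) {
      return false;
    }
    Person p = (Person) o;
    out.put("id", p.id);
    out.put("name", p.name);
    out.put("birthDate", p.birthDate);
    return true;
  }

  // Mirrors fromData: reads the same field names back and rebuilds the object.
  static Person fromData(Map<String, Object> in) {
    return new Person((Integer) in.get("id"), (String) in.get("name"), (Date) in.get("birthDate"));
  }

  public static void main(String[] args) {
    Map<String, Object> fields = new LinkedHashMap<>();
    boolean handled = toData(new Person(7, "Grace", new Date(0L)), fields);
    Person copy = fromData(fields);
    System.out.println(handled + " " + copy.id + " " + copy.name);
  }
}
```

The field names used when writing must match the names used when reading; a mismatch in this sketch yields a null field, which is why both methods share the same three names.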
Writing to Geode
This example stores data in Geode within a flow.
- Scala
    val flow: Flow[Person, Person, NotUsed] = geode.flow(personsRegionSettings)

    val fut = source.via(flow).runWith(Sink.ignore)
- Java
    Flow<Person, Person, NotUsed> flow =
        geode.flow(personRegionSettings, new PersonPdxSerializer());

    CompletionStage<List<Person>> run =
        source.via(flow).toMat(Sink.seq(), Keep.right()).run(materializer);
This example stores data in Geode by using a sink.
- Scala
    val animalsRegionSettings: RegionSettings[Int, Animal] =
      RegionSettings("animals", (a: Animal) => a.id)

    val sink: Sink[Animal, Future[Done]] = geode.sink(animalsRegionSettings)

    val fut: Future[Done] = source.runWith(sink)
- Java
    Sink<Animal, CompletionStage<Done>> sink =
        geode.sink(animalRegionSettings, new AnimalPdxSerializer());

    RunnableGraph<CompletionStage<Done>> runnableGraph = source.toMat(sink, Keep.right());
Reading from Geode
Simple query
Apache Geode supports simple queries written in OQL, its query language. The matching elements are emitted as a stream.
- Scala
    val geode = new Geode(geodeSettings)
    system.registerOnTermination(geode.close())

    val source = geode
      .query[Person](s"select * from /persons order by id")
      .runWith(Sink.foreach(e => log.debug(s"$e")))
- Java
    CompletionStage<Done> personsDone =
        geode
            .query("select * from /persons", new PersonPdxSerializer())
            .runForeach(
                p -> {
                  LOGGER.debug(p.toString());
                },
                materializer);
Continuous query
Continuous queries need to be closed explicitly. To tie creation and closing together, pass the same unique identifier to both continuousQuery and closeContinuousQuery.
- Scala
    val source = geode
      .continuousQuery[Person]('test, s"select * from /persons")
      .runWith(Sink.fold(0) { (c, p) =>
        log.debug(s"$p $c")
        if (c == 19) {
          geode.closeContinuousQuery('test).foreach { _ =>
            log.debug("test cQuery is closed")
          }
        }
        c + 1
      })
- Java
    CompletionStage<Done> fut =
        geode
            .continuousQuery("test", "select * from /persons", new PersonPdxSerializer())
            .runForeach(
                p -> {
                  LOGGER.debug(p.toString());
                  if (p.getId() == 120) {
                    geode.closeContinuousQuery("test");
                  }
                },
                materializer);
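One way to picture why the same identifier has to be passed when creating and when closing a continuous query is a registry keyed by that identifier. The sketch below is a standard-library illustration under that assumption. `QueryRegistrySketch`, `register`, and `close` are hypothetical names, and this is not a description of how Alpakka Geode or Geode track queries internally.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: running queries are tracked by the identifier they were created with.
final class QueryRegistrySketch {

  private final Map<String, Runnable> closers = new ConcurrentHashMap<>();

  // Registers a running query under a unique name; returns false if the name is already taken.
  boolean register(String name, Runnable closer) {
    return closers.putIfAbsent(name, closer) == null;
  }

  // Closes the query registered under name; returns false if no such query is registered.
  boolean close(String name) {
    Runnable closer = closers.remove(name);
    if (closer == null) {
      return false;
    }
    closer.run();
    return true;
  }

  public static void main(String[] args) {
    QueryRegistrySketch registry = new QueryRegistrySketch();
    registry.register("test", () -> System.out.println("query 'test' closed"));
    registry.close("test"); // runs the closer registered under "test"
    registry.close("other"); // does nothing: no query was registered under "other"
  }
}
```

In this model, closing with a different identifier than the one used at creation leaves the original query running, which matches the requirement stated above.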
Geode basic commands
Assuming Apache Geode is installed:
gfsh
From the Geode shell:
start locator --name=locator
configure pdx --read-serialized=true
start server --name=server
create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2
create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2