Apache Geode

Apache Geode is a distributed datagrid (formerly called “Gemfire” which used to be Pivotal’s packaging of Geode and now is VMware Tanzu).

Alpakka Geode provides flows and sinks to put elements into Geode, and a source to retrieve elements from it. It stores key-value-pairs. Keys and values must be serialized with Geode’s support for it.

Project Info: Alpakka Geode
Artifact
com.lightbend.akka
akka-stream-alpakka-geode
2.0.1
JDK versions
Adopt OpenJDK 8
Adopt OpenJDK 11
Scala versions2.12.10, 2.11.12, 2.13.1
JPMS module nameakka.stream.alpakka.geode
License
Readiness level
Since 0.10, 2017-06-30
Home pagehttps://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
val AkkaVersion = "2.5.31"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-geode" % "2.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.5.31</akka.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-geode_${scala.binary.version}</artifactId>
  <version>2.0.1</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_${scala.binary.version}</artifactId>
  <version>${akka.version}</version>
</dependency>
Gradle
versions += [
  AkkaVersion: "2.5.31",
  ScalaBinary: "2.12"
]
dependencies {
  compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-geode_${versions.ScalaBinary}", version: '2.0.1',
  compile group: 'com.typesafe.akka', name: "akka-stream_${versions.ScalaBinary}", version: versions.AkkaVersion
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.chuusaishapeless_2.122.3.3
com.fasterxml.jackson.corejackson-core2.10.4
com.fasterxml.jackson.corejackson-databind2.10.4
com.typesafe.akkaakka-stream_2.122.5.31
org.apache.geodegeode-core1.12.0
org.apache.geodegeode-cq1.12.0
org.scala-langscala-library2.12.10
Dependency tree
com.chuusai    shapeless_2.12    2.3.3
    org.typelevel    macro-compat_2.12    1.1.1
com.fasterxml.jackson.core    jackson-core    2.10.4
com.fasterxml.jackson.core    jackson-databind    2.10.4
    com.fasterxml.jackson.core    jackson-annotations    2.10.4
com.typesafe.akka    akka-stream_2.12    2.5.31
    com.typesafe.akka    akka-actor_2.12    2.5.31
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0
    com.typesafe.akka    akka-protobuf_2.12    2.5.31
    com.typesafe    ssl-config-core_2.12    0.3.8
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2
    org.reactivestreams    reactive-streams    1.0.2
org.apache.geode    geode-core    1.12.0
    antlr    antlr    2.7.7
    com.fasterxml.jackson.core    jackson-annotations    2.10.4
    com.healthmarketscience.rmiio    rmiio    2.1.2
        commons-logging    commons-logging    1.2
    com.sun.activation    javax.activation    1.2.0
    com.sun.istack    istack-commons-runtime    3.0.9
    com.sun.xml.bind    jaxb-impl    2.3.2
    commons-io    commons-io    2.6
    commons-validator    commons-validator    1.6
        commons-beanutils    commons-beanutils    1.9.3
            commons-collections    commons-collections    3.2.2
            commons-logging    commons-logging    1.2
        commons-collections    commons-collections    3.2.2
        commons-digester    commons-digester    1.8.1
        commons-logging    commons-logging    1.2
    io.github.classgraph    classgraph    4.8.52
    io.micrometer    micrometer-core    1.2.1
        org.hdrhistogram    HdrHistogram    2.1.11
        org.latencyutils    LatencyUtils    2.0.3
    it.unimi.dsi    fastutil    8.3.0
    javax.resource    javax.resource-api    1.7.1
        javax.transaction    javax.transaction-api    1.3
    javax.xml.bind    jaxb-api    2.3.1
        javax.activation    javax.activation-api    1.2.0
    net.java.dev.jna    jna-platform    5.5.0
        net.java.dev.jna    jna    5.5.0
    net.java.dev.jna    jna    5.5.0
    net.sf.jopt-simple    jopt-simple    5.0.4
    org.apache.commons    commons-lang3    3.9
    org.apache.geode    geode-common    1.12.0
    org.apache.geode    geode-logging    1.12.0
        org.apache.geode    geode-common    1.12.0
        org.apache.logging.log4j    log4j-api    2.12.1
    org.apache.geode    geode-management    1.12.0
        com.fasterxml.jackson.core    jackson-annotations    2.10.4
        org.apache.commons    commons-lang3    3.9
        org.apache.httpcomponents    httpclient    4.5.10
            commons-codec    commons-codec    1.11
            commons-logging    commons-logging    1.2
            org.apache.httpcomponents    httpcore    4.4.12
        org.springframework    spring-web    5.2.1.RELEASE
            org.springframework    spring-beans    5.2.1.RELEASE
                org.springframework    spring-core    5.2.1.RELEASE
                    org.springframework    spring-jcl    5.2.1.RELEASE
            org.springframework    spring-core    5.2.1.RELEASE
                org.springframework    spring-jcl    5.2.1.RELEASE
    org.apache.geode    geode-membership    1.12.0
        com.github.stephenc.findbugs    findbugs-annotations    1.3.9-1
        commons-validator    commons-validator    1.6
            commons-beanutils    commons-beanutils    1.9.3
                commons-collections    commons-collections    3.2.2
                commons-logging    commons-logging    1.2
            commons-collections    commons-collections    3.2.2
            commons-digester    commons-digester    1.8.1
            commons-logging    commons-logging    1.2
        it.unimi.dsi    fastutil    8.3.0
        org.apache.commons    commons-lang3    3.9
        org.apache.geode    geode-common    1.12.0
        org.apache.geode    geode-logging    1.12.0
            org.apache.geode    geode-common    1.12.0
            org.apache.logging.log4j    log4j-api    2.12.1
        org.apache.geode    geode-serialization    1.12.0
            it.unimi.dsi    fastutil    8.3.0
            org.apache.geode    geode-common    1.12.0
            org.apache.logging.log4j    log4j-api    2.12.1
        org.apache.geode    geode-tcp-server    1.12.0
            commons-validator    commons-validator    1.6
                commons-beanutils    commons-beanutils    1.9.3
                    commons-collections    commons-collections    3.2.2
                    commons-logging    commons-logging    1.2
                commons-collections    commons-collections    3.2.2
                commons-digester    commons-digester    1.8.1
                commons-logging    commons-logging    1.2
            org.apache.geode    geode-logging    1.12.0
                org.apache.geode    geode-common    1.12.0
                org.apache.logging.log4j    log4j-api    2.12.1
            org.apache.geode    geode-serialization    1.12.0
                it.unimi.dsi    fastutil    8.3.0
                org.apache.geode    geode-common    1.12.0
                org.apache.logging.log4j    log4j-api    2.12.1
            org.apache.logging.log4j    log4j-api    2.12.1
        org.apache.logging.log4j    log4j-api    2.12.1
        org.jgroups    jgroups    3.6.14.Final
    org.apache.geode    geode-serialization    1.12.0
        it.unimi.dsi    fastutil    8.3.0
        org.apache.geode    geode-common    1.12.0
        org.apache.logging.log4j    log4j-api    2.12.1
    org.apache.geode    geode-tcp-server    1.12.0
        commons-validator    commons-validator    1.6
            commons-beanutils    commons-beanutils    1.9.3
                commons-collections    commons-collections    3.2.2
                commons-logging    commons-logging    1.2
            commons-collections    commons-collections    3.2.2
            commons-digester    commons-digester    1.8.1
            commons-logging    commons-logging    1.2
        org.apache.geode    geode-logging    1.12.0
            org.apache.geode    geode-common    1.12.0
            org.apache.logging.log4j    log4j-api    2.12.1
        org.apache.geode    geode-serialization    1.12.0
            it.unimi.dsi    fastutil    8.3.0
            org.apache.geode    geode-common    1.12.0
            org.apache.logging.log4j    log4j-api    2.12.1
        org.apache.logging.log4j    log4j-api    2.12.1
    org.apache.geode    geode-unsafe    1.12.0
    org.apache.logging.log4j    log4j-api    2.12.1
    org.apache.shiro    shiro-core    1.4.1
        org.apache.shiro    shiro-cache    1.4.1
            org.apache.shiro    shiro-lang    1.4.1
                org.slf4j    slf4j-api    1.7.25
        org.apache.shiro    shiro-config-core    1.4.1
            org.apache.shiro    shiro-lang    1.4.1
                org.slf4j    slf4j-api    1.7.25
        org.apache.shiro    shiro-config-ogdl    1.4.1
            commons-beanutils    commons-beanutils    1.9.3
                commons-collections    commons-collections    3.2.2
                commons-logging    commons-logging    1.2
            org.apache.shiro    shiro-config-core    1.4.1
                org.apache.shiro    shiro-lang    1.4.1
                    org.slf4j    slf4j-api    1.7.25
            org.apache.shiro    shiro-event    1.4.1
                org.apache.shiro    shiro-lang    1.4.1
                    org.slf4j    slf4j-api    1.7.25
            org.apache.shiro    shiro-lang    1.4.1
                org.slf4j    slf4j-api    1.7.25
            org.slf4j    slf4j-api    1.7.25
        org.apache.shiro    shiro-crypto-cipher    1.4.1
            org.apache.shiro    shiro-crypto-core    1.4.1
                org.apache.shiro    shiro-lang    1.4.1
                    org.slf4j    slf4j-api    1.7.25
            org.apache.shiro    shiro-lang    1.4.1
                org.slf4j    slf4j-api    1.7.25
        org.apache.shiro    shiro-crypto-hash    1.4.1
            org.apache.shiro    shiro-crypto-core    1.4.1
                org.apache.shiro    shiro-lang    1.4.1
                    org.slf4j    slf4j-api    1.7.25
            org.apache.shiro    shiro-lang    1.4.1
                org.slf4j    slf4j-api    1.7.25
        org.apache.shiro    shiro-event    1.4.1
            org.apache.shiro    shiro-lang    1.4.1
                org.slf4j    slf4j-api    1.7.25
        org.apache.shiro    shiro-lang    1.4.1
            org.slf4j    slf4j-api    1.7.25
    org.jgroups    jgroups    3.6.14.Final
org.apache.geode    geode-cq    1.12.0
    org.apache.geode    geode-logging    1.12.0
        org.apache.geode    geode-common    1.12.0
        org.apache.logging.log4j    log4j-api    2.12.1
    org.apache.geode    geode-membership    1.12.0
        com.github.stephenc.findbugs    findbugs-annotations    1.3.9-1
        commons-validator    commons-validator    1.6
            commons-beanutils    commons-beanutils    1.9.3
                commons-collections    commons-collections    3.2.2
                commons-logging    commons-logging    1.2
            commons-collections    commons-collections    3.2.2
            commons-digester    commons-digester    1.8.1
            commons-logging    commons-logging    1.2
        it.unimi.dsi    fastutil    8.3.0
        org.apache.commons    commons-lang3    3.9
        org.apache.geode    geode-common    1.12.0
        org.apache.geode    geode-logging    1.12.0
            org.apache.geode    geode-common    1.12.0
            org.apache.logging.log4j    log4j-api    2.12.1
        org.apache.geode    geode-serialization    1.12.0
            it.unimi.dsi    fastutil    8.3.0
            org.apache.geode    geode-common    1.12.0
            org.apache.logging.log4j    log4j-api    2.12.1
        org.apache.geode    geode-tcp-server    1.12.0
            commons-validator    commons-validator    1.6
                commons-beanutils    commons-beanutils    1.9.3
                    commons-collections    commons-collections    3.2.2
                    commons-logging    commons-logging    1.2
                commons-collections    commons-collections    3.2.2
                commons-digester    commons-digester    1.8.1
                commons-logging    commons-logging    1.2
            org.apache.geode    geode-logging    1.12.0
                org.apache.geode    geode-common    1.12.0
                org.apache.logging.log4j    log4j-api    2.12.1
            org.apache.geode    geode-serialization    1.12.0
                it.unimi.dsi    fastutil    8.3.0
                org.apache.geode    geode-common    1.12.0
                org.apache.logging.log4j    log4j-api    2.12.1
            org.apache.logging.log4j    log4j-api    2.12.1
        org.apache.logging.log4j    log4j-api    2.12.1
        org.jgroups    jgroups    3.6.14.Final
    org.apache.geode    geode-serialization    1.12.0
        it.unimi.dsi    fastutil    8.3.0
        org.apache.geode    geode-common    1.12.0
        org.apache.logging.log4j    log4j-api    2.12.1
    org.apache.logging.log4j    log4j-api    2.12.1
org.scala-lang    scala-library    2.12.10

Setup

Connection

The connection to Geode is handled by a ClientCache. A single ClientCache per application is enough. ClientCache also holds a single PDXSerializer.

The Geode client should be closed after use, it is recommended to close it on actor system termination.

Scala
val geodeSettings = GeodeSettings(hostname, port = 10334)
  .withConfiguration(c => c.setPoolIdleTimeout(10))
val geode = new Geode(geodeSettings)
system.registerOnTermination(geode.close())
Java
GeodeSettings settings =
    GeodeSettings.create(hostname, 10334).withConfiguration(c -> c.setPoolIdleTimeout(10));
Geode geode = new Geode(settings);
system.registerOnTermination(() -> geode.close());

Apache Geode supports continuous queries. Continuous query rely on server events, thus Alpakka Geode needs to listen to those events. This behaviour – as it consumes more resources – is isolated in a Scala trait and/or an specialized Java class.

Scala
val geode = new Geode(geodeSettings) with PoolSubscription
system.registerOnTermination(geode.close())
Java
GeodeWithPoolSubscription geode = new GeodeWithPoolSubscription(settings);

Region

Define a region setting to describe how to access region and the key extraction function.

Scala
val personsRegionSettings: RegionSettings[Int, Person] = RegionSettings("persons", (p: Person) => p.id)
val animalsRegionSettings: RegionSettings[Int, Animal] = RegionSettings("animals", (a: Animal) => a.id)
val complexesRegionSettings: RegionSettings[UUID, Complex] = RegionSettings("complexes", (a: Complex) => a.id)
Java
protected final RegionSettings<Integer, Person> personRegionSettings =
    RegionSettings.create("persons", Person::getId);
protected final RegionSettings<Integer, Animal> animalRegionSettings =
    RegionSettings.create("animals", Animal::getId);

Serialization

Objects must be serialized to be stored in or retrieved from Geode. Only PDX format is available with Alpakka Geode. PDXEncoders support many options as described in Geode PDX Serialization. A PdxSerializer must be provided to Geode when reading from or writing to a region.

Scala
object PersonPdxSerializer extends AkkaPdxSerializer[Person] {
  override def clazz: Class[Person] = classOf[Person]

  override def toData(o: scala.Any, out: PdxWriter): Boolean =
    if (o.isInstanceOf[Person]) {
      val p = o.asInstanceOf[Person]
      out.writeInt("id", p.id)
      out.writeString("name", p.name)
      out.writeDate("birthDate", p.birthDate)
      true
    } else
      false

  override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
    val id: Int = in.readInt("id")
    val name: String = in.readString("name")
    val birthDate: Date = in.readDate("birthDate")
    Person(id, name, birthDate)
  }
}
Java
public class PersonPdxSerializer implements AkkaPdxSerializer<Person> {

  @Override
  public Class<Person> clazz() {
    return Person.class;
  }

  @Override
  public boolean toData(Object o, PdxWriter out) {
    if (o instanceof Person) {
      Person p = (Person) o;
      out.writeInt("id", p.getId());
      out.writeString("name", p.getName());
      out.writeDate("birthDate", p.getBirthDate());
      return true;
    }
    return false;
  }

  @Override
  public Object fromData(Class<?> clazz, PdxReader in) {
    int id = in.readInt("id");
    String name = in.readString("name");
    Date birthDate = in.readDate("birthDate");
    return new Person(id, name, birthDate);
  }
}

This Alpakka Geode provides a generic solution for Scala users based on Shapeless which may generate serializers for case classes at compile time.

Java users need to implement custom serializers manually, or use runtime reflection as described in Using Automatic Reflection-Based PDX Serialization.

Writing to Geode

This example stores data in Geode within a flow.

Scala
val flow: Flow[Person, Person, NotUsed] = geode.flow(personsRegionSettings)

val fut = source.via(flow).runWith(Sink.ignore)
Java
Flow<Person, Person, NotUsed> flow =
    geode.flow(personRegionSettings, new PersonPdxSerializer());

CompletionStage<List<Person>> run =
    source.via(flow).toMat(Sink.seq(), Keep.right()).run(materializer);

This example stores data in Geode by using a sink.

Scala
val animalsRegionSettings: RegionSettings[Int, Animal] =
  RegionSettings("animals", (a: Animal) => a.id)

val sink: Sink[Animal, Future[Done]] =
  geode.sink(animalsRegionSettings)

val fut: Future[Done] = source.runWith(sink)
Java
Sink<Animal, CompletionStage<Done>> sink =
    geode.sink(animalRegionSettings, new AnimalPdxSerializer());

RunnableGraph<CompletionStage<Done>> runnableGraph = source.toMat(sink, Keep.right());

Reading from Geode

Simple query

Apache Geode supports simple queries.

Scala
val geode = new Geode(geodeSettings)
system.registerOnTermination(geode.close())

val source =
  geode
    .query[Person](s"select * from /persons order by id")
    .runWith(Sink.foreach(e => log.debug(s"$e")))
Java
CompletionStage<Done> personsDone =
    geode
        .query("select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
            },
            materializer);

Continuous query

Continuous queries need to be explicitly closed, to connect creating and closing a unique identifier needs to be passed to both continuousQuery and closeContinuousQuery.

Scala
val source =
  geode
    .continuousQuery[Person](Symbol("test"), s"select * from /persons")
    .runWith(Sink.fold(0) { (c, p) =>
      log.debug(s"$p $c")
      if (c == 19) {
        geode.closeContinuousQuery(Symbol("test")).foreach { _ =>
          log.debug("test cQuery is closed")
        }

      }
      c + 1
    })
Java
CompletionStage<Done> fut =
    geode
        .continuousQuery("test", "select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
              if (p.getId() == 120) {
                geode.closeContinuousQuery("test");
              }
            },
            materializer);

Geode basic commands

Assuming Apache Geode is installed:

gfsh

From the Geode shell:

start locator --name=locator
configure pdx --read-serialized=true
start server --name=server

create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2
create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.