InfluxDB

The Alpakka InfluxDB connector provides Akka Streams integration for InfluxDB.

For more information about InfluxDB, please visit the InfluxDB Documentation.

Project Info: Alpakka InfluxDB
Artifact: com.lightbend.akka, akka-stream-alpakka-influxdb, 3.0.4
JDK versions: Adopt OpenJDK 8, Adopt OpenJDK 11
Scala versions: 2.12.11, 2.13.3
JPMS module name: akka.stream.alpakka.influxdb
Readiness level: since 1.1.0, 2019-07-03
Home page: https://doc.akka.io/docs/alpakka/current
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Official Akka Streams client

InfluxData, the makers of InfluxDB, now offer an Akka Streams-aware client library: https://github.com/influxdata/influxdb-client-java/tree/master/client-scala

“The reference Scala client that allows query and write for the InfluxDB 2.0 by Akka Streams.”

API may change

Alpakka InfluxDB was added in Alpakka 1.1.0 in July 2019 and is marked as “API may change”. Please try it out and suggest improvements.

Furthermore, the major InfluxDB update to version 2.0 is expected to bring API and dependency changes to Alpakka InfluxDB.

Artifacts

sbt
val AkkaVersion = "2.6.14"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-influxdb" % "3.0.4",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.6.14</akka.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-influxdb_${scala.binary.version}</artifactId>
    <version>3.0.4</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.6.14",
  ScalaBinary: "2.12"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-influxdb_${versions.ScalaBinary}:3.0.4"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The first table below shows the direct dependencies of this module. The dependency tree after it shows all libraries the module depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.typesafe.akka    akka-stream_2.12    2.6.14
org.influxdb    influxdb-java    2.15
org.scala-lang    scala-library    2.12.11
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.6.14    Apache-2.0
    com.typesafe.akka    akka-actor_2.12    2.6.14    Apache-2.0
        com.typesafe    config    1.4.0    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.12    2.6.14    Apache-2.0
    com.typesafe    ssl-config-core_2.12    0.4.2    Apache-2.0
        com.typesafe    config    1.4.0    Apache-2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    org.reactivestreams    reactive-streams    1.0.3    CC0
    org.scala-lang    scala-library    2.12.11    Apache-2.0
org.influxdb    influxdb-java    2.15    The MIT License (MIT)
    com.squareup.okhttp3    logging-interceptor    3.13.1
        com.squareup.okhttp3    okhttp    3.13.1
            com.squareup.okio    okio    1.17.2
    com.squareup.okhttp3    okhttp    3.13.1
        com.squareup.okio    okio    1.17.2
    com.squareup.retrofit2    converter-moshi    2.5.0
        com.squareup.moshi    moshi    1.5.0
            com.squareup.okio    okio    1.17.2
        com.squareup.retrofit2    retrofit    2.5.0
            com.squareup.okhttp3    okhttp    3.13.1
                com.squareup.okio    okio    1.17.2
    com.squareup.retrofit2    retrofit    2.5.0
        com.squareup.okhttp3    okhttp    3.13.1
            com.squareup.okio    okio    1.17.2
    org.msgpack    msgpack-core    0.8.16    Apache 2
org.scala-lang    scala-library    2.12.11    Apache-2.0

Set up InfluxDB client

Sources, Flows and Sinks provided by this connector need a prepared org.influxdb.InfluxDB client to access InfluxDB.

Scala
influxDB = InfluxDBFactory.connect(INFLUXDB_URL, USERNAME, PASSWORD);
influxDB.setDatabase(DatabaseName);
influxDB.query(new Query("CREATE DATABASE " + DatabaseName, DatabaseName));
Java
final InfluxDB influxDB = InfluxDBFactory.connect(INFLUXDB_URL, USERNAME, PASSWORD);
influxDB.setDatabase(databaseName);
influxDB.query(new Query("CREATE DATABASE " + databaseName, databaseName));
return influxDB;

InfluxDB as Source and Sink

Now we can stream messages from or to InfluxDB by providing the InfluxDB client to the InfluxDbSource or the InfluxDbSink. The examples on this page use the following annotated measurement classes.

Scala
@Measurement(name = "cpu", database = "InfluxDbSpec")
public class InfluxDbSpecCpu extends Cpu {
    public InfluxDbSpecCpu() {
    }

    public InfluxDbSpecCpu(Instant time, String hostname, String region, Double idle, Boolean happydevop, Long uptimeSecs) {
        super(time, hostname, region, idle, happydevop, uptimeSecs);
    }

    public InfluxDbSpecCpu cloneAt(Instant time) {
        return new InfluxDbSpecCpu(time, getHostname(), getRegion(), getIdle(), getHappydevop(), getUptimeSecs());
    }
}
Java
@Measurement(name = "cpu", database = "InfluxDbTest")
public class InfluxDbCpu extends Cpu {

  public InfluxDbCpu() {}

  public InfluxDbCpu(
      Instant time,
      String hostname,
      String region,
      Double idle,
      Boolean happydevop,
      Long uptimeSecs) {
    super(time, hostname, region, idle, happydevop, uptimeSecs);
  }

  public InfluxDbCpu cloneAt(Instant time) {
    return new InfluxDbCpu(
        time, getHostname(), getRegion(), getIdle(), getHappydevop(), getUptimeSecs());
  }
}

With typed source

Use InfluxDbSource.typed and InfluxDbSink.typed to create a source and a sink. The data is converted by InfluxDBMapper.

Scala
val f1 = InfluxDbSource
  .typed(classOf[InfluxDbSpecCpu], InfluxDbReadSettings(), influxDB, query)
  .map { cpu: InfluxDbSpecCpu =>
    {
      val clonedCpu = cpu.cloneAt(cpu.getTime.plusSeconds(60000))
      List(InfluxDbWriteMessage(clonedCpu))
    }
  }
  .runWith(InfluxDbSink.typed(classOf[InfluxDbSpecCpu]))
Java
CompletionStage<Done> completionStage =
    InfluxDbSource.typed(InfluxDbCpu.class, InfluxDbReadSettings.Default(), influxDB, query)
        .map(
            cpu -> {
              InfluxDbCpu clonedCpu = cpu.cloneAt(cpu.getTime().plusSeconds(60000L));
              return InfluxDbWriteMessage.create(clonedCpu, NotUsed.notUsed());
            })
        .groupedWithin(10, Duration.of(50L, ChronoUnit.MILLIS))
        .runWith(InfluxDbSink.typed(InfluxDbCpu.class, influxDB), system);

With QueryResult source

Use InfluxDbSource.create and InfluxDbSink.create to create source and sink.

Scala
val query = new Query("SELECT * FROM cpu", DatabaseName)

val f1 = InfluxDbSource(influxDB, query)
  .map(resultToPoints)
  .runWith(InfluxDbSink.create())
Java
Query query = new Query("SELECT * FROM cpu", DATABASE_NAME);

CompletionStage<Done> completionStage =
    InfluxDbSource.create(influxDB, query)
        .map(queryResult -> points(queryResult))
        .mapConcat(i -> i)
        .groupedWithin(10, Duration.of(50L, ChronoUnit.MILLIS))
        .runWith(InfluxDbSink.create(influxDB), system);
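
The `points(queryResult)` call in the Java snippet (`resultToPoints` in the Scala one) is a helper that this page does not show. As a rough illustration of the reshaping such a helper has to do, the sketch below pairs column names with rows of values, which is how a query result series is laid out. It uses plain Java collections as stand-ins rather than the influxdb-java classes, and the class name `SeriesToRowsSketch`, the method `toRows`, and the sample values are assumptions made for this example only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-collection stand-in for one result series; NOT the influxdb-java classes.
final class SeriesToRowsSketch {

  /** Pairs each row of values with the column names, producing one map per row. */
  static List<Map<String, Object>> toRows(List<String> columns, List<List<Object>> values) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (List<Object> row : values) {
      // LinkedHashMap keeps the fields in column order.
      Map<String, Object> fields = new LinkedHashMap<>();
      for (int i = 0; i < columns.size(); i++) {
        fields.put(columns.get(i), row.get(i));
      }
      rows.add(fields);
    }
    return rows;
  }

  public static void main(String[] args) {
    // Sample data invented for the example.
    List<String> columns = Arrays.asList("time", "hostname", "idle");
    List<List<Object>> values = new ArrayList<>();
    values.add(Arrays.<Object>asList("2019-07-03T00:00:00Z", "local_1", 1.4));
    values.add(Arrays.<Object>asList("2019-07-03T00:01:00Z", "local_2", 2.5));
    System.out.println(toRows(columns, values));
  }
}
```

A real helper would go one step further and build a write message from each row; that step depends on the connector and client classes and is left out here.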


Writing to InfluxDB

You can also build flow stages with InfluxDbFlow. The API is similar to creating Sinks.

Scala
val result = Source(
  List(
    List(validMessage)
  )
).via(InfluxDbFlow.create())
  .runWith(Sink.seq)
  .futureValue
Java
CompletableFuture<List<List<InfluxDbWriteResult<Point, NotUsed>>>> completableFuture =
    Source.single(Collections.singletonList(influxDbWriteMessage))
        .via(InfluxDbFlow.create(influxDB))
        .runWith(Sink.seq(), system)
        .toCompletableFuture();

Passing data through InfluxDbFlow

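When streaming documents from Kafka, you might want to commit the offset to Kafka only after the document has been written to InfluxDB. To do this, attach the offset to each write message as a pass-through value and read it back from the write result that the flow emits.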

Scala
// We're going to pretend we got messages from Kafka.
// After we've written them to InfluxDB, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(cpu: InfluxDbFlowCpu, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(new InfluxDbFlowCpu(Instant.now().minusSeconds(1000), "local_1", "eu-west-2", 1.4d, true, 123L),
               KafkaOffset(0)),
  KafkaMessage(new InfluxDbFlowCpu(Instant.now().minusSeconds(2000), "local_2", "eu-west-1", 2.5d, false, 125L),
               KafkaOffset(1)),
  KafkaMessage(new InfluxDbFlowCpu(Instant.now().minusSeconds(3000), "local_3", "eu-west-4", 3.1d, false, 251L),
               KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val f1 = Source(messagesFromKafka)
  .map { kafkaMessage: KafkaMessage =>
    val cpu = kafkaMessage.cpu
    println("hostname: " + cpu.getHostname)

    InfluxDbWriteMessage(cpu).withPassThrough(kafkaMessage.offset)
  }
  .groupedWithin(10, 50.millis)
  .via(
    InfluxDbFlow.typedWithPassThrough(classOf[InfluxDbFlowCpu])
  )
  .map { messages: Seq[InfluxDbWriteResult[InfluxDbFlowCpu, KafkaOffset]] =>
    messages.foreach { message =>
      commitToKafka(message.writeMessage.passThrough)
    }
  }
  .runWith(Sink.ignore)
Java
// We're going to pretend we got metrics from Kafka.
// After we've written them to InfluxDB, we want
// to commit the offset to Kafka

/** Just clean the previous data */
influxDB.query(new Query("DELETE FROM cpu"));

List<Integer> committedOffsets = new ArrayList<>();
List<MessageFromKafka> messageFromKafka =
    Arrays.asList(
        new MessageFromKafka(
            new InfluxDbCpu(
                Instant.now().minusSeconds(1000), "local_1", "eu-west-2", 1.4d, true, 123L),
            new KafkaOffset(0)),
        new MessageFromKafka(
            new InfluxDbCpu(
                Instant.now().minusSeconds(2000), "local_2", "eu-west-1", 2.5d, false, 125L),
            new KafkaOffset(1)),
        new MessageFromKafka(
            new InfluxDbCpu(
                Instant.now().minusSeconds(3000), "local_3", "eu-west-4", 3.1d, false, 251L),
            new KafkaOffset(2)));

Consumer<KafkaOffset> commitToKafka =
    kafkaOffset -> committedOffsets.add(kafkaOffset.getOffset());

Source.from(messageFromKafka)
    .map(
        kafkaMessage -> {
          return InfluxDbWriteMessage.create(
              kafkaMessage.influxDbCpu, kafkaMessage.kafkaOffset);
        })
    .groupedWithin(10, Duration.ofMillis(10))
    .via(InfluxDbFlow.typedWithPassThrough(InfluxDbCpu.class, influxDB))
    .map(
        messages -> {
          messages.stream()
              .forEach(
                  message -> {
                    KafkaOffset kafkaOffset = message.writeMessage().passThrough();
                    commitToKafka.accept(kafkaOffset);
                  });
          return NotUsed.getInstance();
        })
    .runWith(Sink.seq(), system)
    .toCompletableFuture()
    .get(10, TimeUnit.SECONDS);
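
To make the ordering in the pass-through examples explicit, here is a minimal sketch in plain Java with no Akka or InfluxDB dependency. It only models the idea: each message carries a value next to its payload, the write step hands the same messages back, and the commit happens after the write returns. `PassThroughSketch`, `WriteMessage`, `writeBatch` and `run` are hypothetical names for this illustration, not the Alpakka API, and the payload strings are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; these are NOT the Alpakka classes.
final class PassThroughSketch {

  /** A payload paired with a value that is carried along untouched. */
  static final class WriteMessage<T, P> {
    final T point;
    final P passThrough;

    WriteMessage(T point, P passThrough) {
      this.point = point;
      this.passThrough = passThrough;
    }
  }

  /** Stand-in for the database write: stores the payloads of one batch. */
  static <T, P> List<WriteMessage<T, P>> writeBatch(
      List<WriteMessage<T, P>> batch, List<T> database) {
    for (WriteMessage<T, P> message : batch) {
      database.add(message.point);
    }
    // The same messages come back out, so the pass-through survives the write.
    return batch;
  }

  /** Writes the batch first, then commits each offset; returns the committed offsets. */
  static List<Integer> run() {
    List<String> database = new ArrayList<>();
    List<Integer> committedOffsets = new ArrayList<>();
    Consumer<Integer> commitToKafka = committedOffsets::add;

    List<WriteMessage<String, Integer>> batch = new ArrayList<>();
    batch.add(new WriteMessage<>("cpu local_1", 0));
    batch.add(new WriteMessage<>("cpu local_2", 1));
    batch.add(new WriteMessage<>("cpu local_3", 2));

    // Commit only after the write has returned.
    for (WriteMessage<String, Integer> written : writeBatch(batch, database)) {
      commitToKafka.accept(written.passThrough);
    }
    return committedOffsets;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [0, 1, 2]
  }
}
```

In the connector examples above, the same roles are played by the write message with its pass-through, the typedWithPassThrough flow, and the map stage that calls commitToKafka.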