InfluxDB
The Alpakka InfluxDb connector provides Akka Streams integration for InfluxDB.
For more information about InfluxDB, please visit the InfluxDB Documentation.
| Project Info: Alpakka InfluxDB | |
|---|---|
| Artifact | com.lightbend.akka akka-stream-alpakka-influxdb 9.0.2 | 
| JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 | 
| Scala versions | 2.13.15, 3.3.4 | 
| JPMS module name | akka.stream.alpakka.influxdb | 
| License | |
| Readiness level | Since 1.1.0, 2019-07-03 | 
| Home page | https://doc.akka.io/libraries/alpakka/current | 
| API documentation | |
| Forums | |
| Release notes | GitHub releases | 
| Issues | GitHub issues | 
| Sources | https://github.com/akka/alpakka | 
InfluxData, the makers of InfluxDB, now offer an Akka Streams-aware client library at https://github.com/influxdata/influxdb-client-java/tree/master/client-scala:
“The reference Scala client that allows query and write for the InfluxDB 2.0 by Akka Streams.”
Alpakka InfluxDB was added in Alpakka 1.1.0 in July 2019 and is marked as “API may change”. Please try it out and suggest improvements.
Furthermore, the major InfluxDB update to version 2.0 is expected to bring API and dependency changes to Alpakka InfluxDB.
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
- resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
- <project>
    ...
    <repositories>
      <repository>
        <id>akka-repository</id>
        <name>Akka library repository</name>
        <url>https://repo.akka.io/maven</url>
      </repository>
    </repositories>
  </project>
- Gradle
- repositories {
    mavenCentral()
    maven {
      url "https://repo.akka.io/maven"
    }
  }
Additionally, add the dependencies as below.
- sbt
- val AkkaVersion = "2.10.5"
  libraryDependencies ++= Seq(
    "com.lightbend.akka" %% "akka-stream-alpakka-influxdb" % "9.0.2",
    "com.typesafe.akka" %% "akka-stream" % AkkaVersion
  )
- Maven
- <properties>
    <akka.version>2.10.5</akka.version>
    <scala.binary.version>2.13</scala.binary.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.lightbend.akka</groupId>
      <artifactId>akka-stream-alpakka-influxdb_${scala.binary.version}</artifactId>
      <version>9.0.2</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-stream_${scala.binary.version}</artifactId>
      <version>${akka.version}</version>
    </dependency>
  </dependencies>
- Gradle
- def versions = [
    AkkaVersion: "2.10.5",
    ScalaBinary: "2.13"
  ]
  dependencies {
    implementation "com.lightbend.akka:akka-stream-alpakka-influxdb_${versions.ScalaBinary}:9.0.2"
    implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
  }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
- 
  | Organization | Artifact | Version |
  |---|---|---|
  | com.typesafe.akka | akka-stream_2.13 | 2.10.5 |
  | org.influxdb | influxdb-java | 2.15 |
  | org.scala-lang | scala-library | 2.13.15 |
- Dependency tree
- 
  com.typesafe.akka akka-stream_2.13 2.10.5 BUSL-1.1
    com.typesafe.akka akka-actor_2.13 2.10.5 BUSL-1.1
      com.typesafe config 1.4.3 Apache-2.0
      org.scala-lang scala-library 2.13.15 Apache-2.0
    com.typesafe.akka akka-protobuf-v3_2.13 2.10.5 BUSL-1.1
    org.reactivestreams reactive-streams 1.0.4 MIT-0
    org.scala-lang scala-library 2.13.15 Apache-2.0
  org.influxdb influxdb-java 2.15 The MIT License (MIT)
    com.squareup.okhttp3 logging-interceptor 3.13.1
      com.squareup.okhttp3 okhttp 3.13.1
        com.squareup.okio okio 1.17.2
    com.squareup.okhttp3 okhttp 3.13.1
      com.squareup.okio okio 1.17.2
    com.squareup.retrofit2 converter-moshi 2.5.0
      com.squareup.moshi moshi 1.5.0
        com.squareup.okio okio 1.17.2
      com.squareup.retrofit2 retrofit 2.5.0
        com.squareup.okhttp3 okhttp 3.13.1
          com.squareup.okio okio 1.17.2
    com.squareup.retrofit2 retrofit 2.5.0
      com.squareup.okhttp3 okhttp 3.13.1
        com.squareup.okio okio 1.17.2
    org.msgpack msgpack-core 0.8.16 Apache 2
  org.scala-lang scala-library 2.13.15 Apache-2.0
Set up InfluxDB client
Sources, Flows and Sinks provided by this connector need a prepared org.influxdb.InfluxDB client to access InfluxDB.
- Scala
- 
  influxDB = InfluxDBFactory.connect(INFLUXDB_URL, USERNAME, PASSWORD);
  influxDB.setDatabase(DatabaseName);
  influxDB.query(new Query("CREATE DATABASE " + DatabaseName, DatabaseName));
- Java
- 
  final InfluxDB influxDB = InfluxDBFactory.connect(INFLUXDB_URL, USERNAME, PASSWORD);
  influxDB.setDatabase(databaseName);
  influxDB.query(new Query("CREATE DATABASE " + databaseName, databaseName));
  return influxDB;
InfluxDB as Source and Sink
Now we can stream messages from or to InfluxDB by providing the InfluxDB client to the InfluxDbSource or the InfluxDbSink. The typed examples in the following sections use a measurement class annotated with @Measurement:
- Scala
- 
  @Measurement(name = "cpu", database = "InfluxDbSpec")
  public class InfluxDbSpecCpu extends Cpu {
    public InfluxDbSpecCpu() {
    }

    public InfluxDbSpecCpu(Instant time, String hostname, String region, Double idle, Boolean happydevop, Long uptimeSecs) {
      super(time, hostname, region, idle, happydevop, uptimeSecs);
    }

    public InfluxDbSpecCpu cloneAt(Instant time) {
      return new InfluxDbSpecCpu(time, getHostname(), getRegion(), getIdle(), getHappydevop(), getUptimeSecs());
    }
  }
- Java
- 
  @Measurement(name = "cpu", database = "InfluxDbTest")
  public class InfluxDbCpu extends Cpu {

    public InfluxDbCpu() {}

    public InfluxDbCpu(
        Instant time,
        String hostname,
        String region,
        Double idle,
        Boolean happydevop,
        Long uptimeSecs) {
      super(time, hostname, region, idle, happydevop, uptimeSecs);
    }

    public InfluxDbCpu cloneAt(Instant time) {
      return new InfluxDbCpu(
          time, getHostname(), getRegion(), getIdle(), getHappydevop(), getUptimeSecs());
    }
  }
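For orientation, a point of the `cpu` measurement above is ultimately sent to InfluxDB in its line protocol, which has the shape `measurement,tags fields timestamp`. The following plain-Java sketch builds such a line by hand. It is an illustration only, not how the connector or InfluxDBMapper is implemented, and it assumes that `hostname` and `region` are tags and the remaining values are fields, because the `Cpu` base class is not shown here. `LineProtocolSketch`, `escapeTag` and `cpuLine` are hypothetical names.

```java
import java.time.Instant;

// Hypothetical helper, not part of Alpakka or influxdb-java.
final class LineProtocolSketch {

    // Line protocol requires commas, equals signs and spaces in tag values to be backslash-escaped.
    static String escapeTag(String value) {
        return value.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // Assumption: hostname and region are tags; idle, happydevop and uptimeSecs are fields.
    static String cpuLine(Instant time, String hostname, String region,
                          double idle, boolean happydevop, long uptimeSecs) {
        // Line protocol timestamps default to nanoseconds since the Unix epoch.
        long nanos = time.getEpochSecond() * 1_000_000_000L + time.getNano();
        return "cpu"
            + ",hostname=" + escapeTag(hostname)
            + ",region=" + escapeTag(region)
            + " idle=" + idle
            + ",happydevop=" + happydevop
            + ",uptimeSecs=" + uptimeSecs + "i" // integer fields carry an "i" suffix
            + " " + nanos;
    }
}
```

Under these assumptions, `LineProtocolSketch.cpuLine(Instant.ofEpochSecond(1), "local_1", "eu-west-2", 1.4, true, 123L)` returns `cpu,hostname=local_1,region=eu-west-2 idle=1.4,happydevop=true,uptimeSecs=123i 1000000000`.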
With typed source
Use InfluxDbSource.typed and InfluxDbSink.typed to create a source and a sink. The data is converted by InfluxDBMapper.
- Scala
- 
  val f1 = InfluxDbSource
    .typed(classOf[InfluxDbSpecCpu], InfluxDbReadSettings(), influxDB, query)
    .map { (cpu: InfluxDbSpecCpu) =>
      {
        val clonedCpu = cpu.cloneAt(cpu.getTime.plusSeconds(60000))
        List(InfluxDbWriteMessage(clonedCpu))
      }
    }
    .runWith(InfluxDbSink.typed(classOf[InfluxDbSpecCpu]))
- Java
- 
  CompletionStage<Done> completionStage =
      InfluxDbSource.typed(InfluxDbCpu.class, InfluxDbReadSettings.Default(), influxDB, query)
          .map(
              cpu -> {
                InfluxDbCpu clonedCpu = cpu.cloneAt(cpu.getTime().plusSeconds(60000L));
                return InfluxDbWriteMessage.create(clonedCpu, NotUsed.notUsed());
              })
          .groupedWithin(10, Duration.of(50L, ChronoUnit.MILLIS))
          .runWith(InfluxDbSink.typed(InfluxDbCpu.class, influxDB), system);
With QueryResult source
Use InfluxDbSource.create and InfluxDbSink.create to create a source and a sink that work with QueryResult and Point directly.
- Scala
- 
  val query = new Query("SELECT * FROM cpu", DatabaseName);

  val f1 = InfluxDbSource(influxDB, query)
    .map(resultToPoints)
    .runWith(InfluxDbSink.create())
- Java
- 
  Query query = new Query("SELECT * FROM cpu", DATABASE_NAME);

  CompletionStage<Done> completionStage =
      InfluxDbSource.create(influxDB, query)
          .map(queryResult -> points(queryResult))
          .mapConcat(i -> i)
          .groupedWithin(10, Duration.of(50L, ChronoUnit.MILLIS))
          .runWith(InfluxDbSink.create(influxDB), system);
Writing to InfluxDB
You can also build flow stages with InfluxDbFlow. The API is similar to creating sinks.
- Scala
- 
  val result = Source(
    List(
      List(validMessage)
    )
  ).via(InfluxDbFlow.create())
    .runWith(Sink.seq)
    .futureValue
- Java
- 
  CompletableFuture<List<List<InfluxDbWriteResult<Point, NotUsed>>>> completableFuture =
      Source.single(Collections.singletonList(influxDbWriteMessage))
          .via(InfluxDbFlow.create(influxDB))
          .runWith(Sink.seq(), system)
          .toCompletableFuture();
Passing data through InfluxDbFlow
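When streaming messages from Kafka, you might want to commit the offset to Kafka only after the corresponding point has been written to InfluxDB. InfluxDbFlow.typedWithPassThrough carries the offset through the flow as the pass-through value of each InfluxDbWriteMessage, so it is available again on the InfluxDbWriteResult.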
- Scala
- 
  // We're going to pretend we got messages from Kafka.
  // After we've written them to InfluxDB, we want
  // to commit the offset to Kafka.

  case class KafkaOffset(offset: Int)
  case class KafkaMessage(cpu: InfluxDbFlowCpu, offset: KafkaOffset)

  val messagesFromKafka = List(
    KafkaMessage(new InfluxDbFlowCpu(Instant.now().minusSeconds(1000), "local_1", "eu-west-2", 1.4d, true, 123L),
                 KafkaOffset(0)),
    KafkaMessage(new InfluxDbFlowCpu(Instant.now().minusSeconds(2000), "local_2", "eu-west-1", 2.5d, false, 125L),
                 KafkaOffset(1)),
    KafkaMessage(new InfluxDbFlowCpu(Instant.now().minusSeconds(3000), "local_3", "eu-west-4", 3.1d, false, 251L),
                 KafkaOffset(2))
  )

  var committedOffsets = List[KafkaOffset]()

  def commitToKafka(offset: KafkaOffset): Unit =
    committedOffsets = committedOffsets :+ offset

  val f1 = Source(messagesFromKafka)
    .map { (kafkaMessage: KafkaMessage) =>
      val cpu = kafkaMessage.cpu
      println("hostname: " + cpu.getHostname)

      InfluxDbWriteMessage(cpu).withPassThrough(kafkaMessage.offset)
    }
    .groupedWithin(10, 50.millis)
    .via(
      InfluxDbFlow.typedWithPassThrough(classOf[InfluxDbFlowCpu])
    )
    .map { (messages: Seq[InfluxDbWriteResult[InfluxDbFlowCpu, KafkaOffset]]) =>
      messages.foreach { message =>
        commitToKafka(message.writeMessage.passThrough)
      }
    }
    .runWith(Sink.ignore)
- Java
- 
  // We're going to pretend we got metrics from Kafka.
  // After we've written them to InfluxDB, we want
  // to commit the offset to Kafka.

  // Just clean the previous data
  influxDB.query(new Query("DELETE FROM cpu"));

  List<Integer> committedOffsets = new ArrayList<>();
  List<MessageFromKafka> messageFromKafka =
      Arrays.asList(
          new MessageFromKafka(
              new InfluxDbCpu(
                  Instant.now().minusSeconds(1000), "local_1", "eu-west-2", 1.4d, true, 123L),
              new KafkaOffset(0)),
          new MessageFromKafka(
              new InfluxDbCpu(
                  Instant.now().minusSeconds(2000), "local_2", "eu-west-1", 2.5d, false, 125L),
              new KafkaOffset(1)),
          new MessageFromKafka(
              new InfluxDbCpu(
                  Instant.now().minusSeconds(3000), "local_3", "eu-west-4", 3.1d, false, 251L),
              new KafkaOffset(2)));

  Consumer<KafkaOffset> commitToKafka =
      kafkaOffset -> committedOffsets.add(kafkaOffset.getOffset());

  Source.from(messageFromKafka)
      .map(
          kafkaMessage -> {
            return InfluxDbWriteMessage.create(
                kafkaMessage.influxDbCpu, kafkaMessage.kafkaOffset);
          })
      .groupedWithin(10, Duration.ofMillis(10))
      .via(InfluxDbFlow.typedWithPassThrough(InfluxDbCpu.class, influxDB))
      .map(
          messages -> {
            messages.stream()
                .forEach(
                    message -> {
                      KafkaOffset kafkaOffset = message.writeMessage().passThrough();
                      commitToKafka.accept(kafkaOffset);
                    });
            return NotUsed.getInstance();
          })
      .runWith(Sink.seq(), system)
      .toCompletableFuture()
      .get(10, TimeUnit.SECONDS);
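The ordering idea behind the pass-through flow can be modelled without Akka or InfluxDB. The following plain-Java sketch is an illustration only and not the connector's implementation: `PassThroughSketch`, its nested `WriteMessage` and `WriteResult` classes, and the `writeBatch` and `commitSuccessful` methods are hypothetical stand-ins. It shows each message carrying its offset alongside the payload, the batch being written first, and only the offsets of successful writes being committed afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the pass-through pattern; none of these types belong to Alpakka.
final class PassThroughSketch {

    // A payload to write plus a value (for example a Kafka offset) carried along untouched.
    static final class WriteMessage<T, C> {
        final T point;
        final C passThrough;

        WriteMessage(T point, C passThrough) {
            this.point = point;
            this.passThrough = passThrough;
        }
    }

    // The original message plus an error description, or null if the write succeeded.
    static final class WriteResult<T, C> {
        final WriteMessage<T, C> writeMessage;
        final String error;

        WriteResult(WriteMessage<T, C> writeMessage, String error) {
            this.writeMessage = writeMessage;
            this.error = error;
        }
    }

    // Writes every message of the batch and returns one result per message, in input order.
    static <T, C> List<WriteResult<T, C>> writeBatch(
            List<WriteMessage<T, C>> batch, Consumer<T> writer) {
        List<WriteResult<T, C>> results = new ArrayList<>();
        for (WriteMessage<T, C> message : batch) {
            String error = null;
            try {
                writer.accept(message.point);
            } catch (RuntimeException e) {
                error = e.toString();
            }
            results.add(new WriteResult<>(message, error));
        }
        return results;
    }

    // Runs after the write: commits the pass-through value of each successful result.
    static <T, C> void commitSuccessful(List<WriteResult<T, C>> results, Consumer<C> commit) {
        for (WriteResult<T, C> result : results) {
            if (result.error == null) {
                commit.accept(result.writeMessage.passThrough);
            }
        }
    }
}
```

Because `commitSuccessful` only sees results that `writeBatch` has already produced, an offset can never be committed before its point was written. Skipping failed writes is a design choice of this sketch; the examples above commit every offset they receive.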