Elasticsearch

The Alpakka Elasticsearch connector provides Akka Streams integration for Elasticsearch.

For more information about Elasticsearch, please visit the Elasticsearch documentation.

Project Info: Alpakka Elasticsearch
Artifact    com.lightbend.akka    akka-stream-alpakka-elasticsearch    1.0-M2
JDK versions    OpenJDK 8
Scala versions    2.12.7, 2.11.12
JPMS module name    akka.stream.alpakka.elasticsearch
License
Readiness level    Community-driven; since 0.12, 2017-09-19
Home page    https://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notes    In the documentation
Issues    GitHub issues
Sources    https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-elasticsearch_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-elasticsearch_2.12', version: '1.0-M2'
}

The first table below shows the direct dependencies of this module; the dependency tree after it shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.fasterxml.jackson.core    jackson-databind    2.9.8    The Apache Software License, Version 2.0
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
io.spray    spray-json_2.12    1.3.5    Apache 2
org.elasticsearch.client    elasticsearch-rest-client    6.3.1    The Apache Software License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.fasterxml.jackson.core    jackson-databind    2.9.8    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-annotations    2.9.0    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-core    2.9.8    The Apache Software License, Version 2.0
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
io.spray    spray-json_2.12    1.3.5    Apache 2
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.elasticsearch.client    elasticsearch-rest-client    6.3.1    The Apache Software License, Version 2.0
    commons-codec    commons-codec    1.10    Apache License, Version 2.0
    commons-logging    commons-logging    1.1.3    The Apache Software License, Version 2.0
    org.apache.httpcomponents    httpasyncclient    4.1.2    Apache License, Version 2.0
    org.apache.httpcomponents    httpclient    4.5.2    Apache License, Version 2.0
    org.apache.httpcomponents    httpcore-nio    4.4.5    Apache License, Version 2.0
    org.apache.httpcomponents    httpcore    4.4.5    Apache License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Set up REST client

Sources, Flows and Sinks provided by this connector need a prepared org.elasticsearch.client.RestClient to access Elasticsearch.

Scala
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient

implicit val client: RestClient = RestClient.builder(new HttpHost("localhost", 9201)).build()
Java
import akka.stream.alpakka.elasticsearch.*;
import akka.stream.alpakka.elasticsearch.javadsl.*;

import org.elasticsearch.client.RestClient;
import org.apache.http.HttpHost;

RestClient client = RestClient.builder(new HttpHost("localhost", 9201)).build();

Elasticsearch as Source and Sink

Now we can stream messages from or to Elasticsearch by providing the RestClient to the ElasticsearchSource or the ElasticsearchSink. The examples on this page use the following Book type.

Scala
import spray.json._
import DefaultJsonProtocol._

case class Book(title: String)

implicit val format: JsonFormat[Book] = jsonFormat1(Book)
Java
public static class Book {
  public String title;

  public Book() {}

  public Book(String title) {
    this.title = title;
  }
}

With typed source

Use ElasticsearchSource.typed and ElasticsearchSink.create to create source and sink. In the Scala API the data is converted to and from JSON by Spray JSON; in the Java API it is converted by Jackson's ObjectMapper.

Scala
val f1 = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "_doc",
    query = """{"match_all": {}}"""
  )
  .map { message: ReadResult[Book] =>
    WriteMessage.createIndexMessage(message.id, message.source)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "sink2",
      typeName = "_doc"
    )
  )
Java
ElasticsearchSourceSettings sourceSettings = ElasticsearchSourceSettings.create();
ElasticsearchWriteSettings sinkSettings = ElasticsearchWriteSettings.create();

Source<ReadResult<Book>, NotUsed> source =
    ElasticsearchSource.typed(
        "source", "_doc", "{\"match_all\": {}}", sourceSettings, client, Book.class);
CompletionStage<Done> f1 =
    source
        .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
        .runWith(
            ElasticsearchSink.create("sink2", "_doc", sinkSettings, client, new ObjectMapper()),
            materializer);

With JSON source

Use ElasticsearchSource.create and ElasticsearchSink.create to create source and sink.

Scala
val f1 = ElasticsearchSource
  .create(
    indexName = "source",
    typeName = "_doc",
    query = """{"match_all": {}}"""
  )
  .map { message: ReadResult[spray.json.JsObject] =>
    val book: Book = jsonReader[Book].read(message.source)
    WriteMessage.createIndexMessage(message.id, book)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "sink2",
      typeName = "_doc"
    )
  )
Java
ElasticsearchSourceSettings sourceSettings = ElasticsearchSourceSettings.create();
ElasticsearchWriteSettings sinkSettings = ElasticsearchWriteSettings.create();

Source<ReadResult<Map<String, Object>>, NotUsed> source =
    ElasticsearchSource.create("source", "_doc", "{\"match_all\": {}}", sourceSettings, client);
CompletionStage<Done> f1 =
    source
        .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
        .runWith(
            ElasticsearchSink.create("sink1", "_doc", sinkSettings, client, new ObjectMapper()),
            materializer);

Writing to Elasticsearch

In the above examples, WriteMessage is used as the input to ElasticsearchSink and ElasticsearchFlow. A message built with createIndexMessage requests an index operation from Elasticsearch. Other operations can be requested with the following message factories:

Message factory    Description
WriteMessage.createIndexMessage    Create a new document. If id is specified and it already exists, replace the document and increment its version.
WriteMessage.createUpdateMessage    Update an existing document. If there is no document with the specified id, do nothing.
WriteMessage.createUpsertMessage    Update an existing document. If there is no document with the specified id, create a new document.
WriteMessage.createDeleteMessage    Delete an existing document. If there is no document with the specified id, do nothing.
Scala
// Create, update, upsert and delete documents in sink8/_doc
val requests = List[WriteMessage[Book, NotUsed]](
  WriteMessage.createIndexMessage(id = "00001", source = Book("Book 1")),
  WriteMessage.createUpsertMessage(id = "00002", source = Book("Book 2")),
  WriteMessage.createUpsertMessage(id = "00003", source = Book("Book 3")),
  WriteMessage.createUpdateMessage(id = "00004", source = Book("Book 4")),
  WriteMessage.createDeleteMessage(id = "00002")
)

val f1 = Source(requests)
  .via(
    ElasticsearchFlow.create[Book](
      "sink8",
      "_doc"
    )
  )
  .runWith(Sink.seq)
Java
// Create, update, upsert and delete documents in sink8/_doc
List<WriteMessage<Book, NotUsed>> requests =
    Arrays.asList(
        WriteMessage.createIndexMessage("00001", new Book("Book 1")),
        WriteMessage.createUpsertMessage("00002", new Book("Book 2")),
        WriteMessage.createUpsertMessage("00003", new Book("Book 3")),
        WriteMessage.createUpdateMessage("00004", new Book("Book 4")),
        WriteMessage.createDeleteMessage("00002"));

Source.from(requests)
    .via(
        ElasticsearchFlow.create(
            "sink8", "_doc", ElasticsearchWriteSettings.create(), client, new ObjectMapper()))
    .runWith(Sink.seq(), materializer)
    .toCompletableFuture()
    .get();
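
The effect of the four operations on an index can be pictured with a plain in-memory map. The sketch below is only an illustration: `WriteOpsModel` is a hypothetical class that stands in for a single index, it does not call Elasticsearch or the connector, and it simplifies an update to a whole-document replacement, so index and upsert end up with the same effect here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for one index: id -> document title.
// It only models which documents exist after each operation.
final class WriteOpsModel {
  private final Map<String, String> docs = new LinkedHashMap<>();

  // index: create the document, or replace it if the id already exists
  void index(String id, String title) {
    docs.put(id, title);
  }

  // update: change an existing document; a missing id leaves the index unchanged
  void update(String id, String title) {
    docs.computeIfPresent(id, (key, old) -> title);
  }

  // upsert: update an existing document, or create it if the id is missing
  void upsert(String id, String title) {
    docs.put(id, title);
  }

  // delete: remove the document; a missing id leaves the index unchanged
  void delete(String id) {
    docs.remove(id);
  }

  // Copy of the current contents, in insertion order
  Map<String, String> snapshot() {
    return new LinkedHashMap<>(docs);
  }

  public static void main(String[] args) {
    // Same sequence of operations as the example above, against an empty index
    WriteOpsModel model = new WriteOpsModel();
    model.index("00001", "Book 1");
    model.upsert("00002", "Book 2");
    model.upsert("00003", "Book 3");
    model.update("00004", "Book 4"); // no document 00004, so nothing is written
    model.delete("00002");
    System.out.println(model.snapshot()); // {00001=Book 1, 00003=Book 3}
  }
}
```

Starting from an empty index, only documents 00001 and 00003 remain: the update of 00004 finds nothing to update, and 00002 is deleted after being upserted.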

Source configuration

We can configure the source with ElasticsearchSourceSettings.

Scala
val sourceSettings = ElasticsearchSourceSettings()
  .withBufferSize(10)
  .withScrollDuration(FiniteDuration(5, TimeUnit.MINUTES))
Java
ElasticsearchSourceSettings sourceSettings =
    ElasticsearchSourceSettings.create().withBufferSize(10);
Parameter    Default    Description
bufferSize    10    ElasticsearchSource retrieves messages from Elasticsearch using the scroll API. This buffer size is used as the scroll size.
includeDocumentVersion    false    Tells Elasticsearch to return each document's _version property with the search results. See Version and Optimistic Concurrency Control for details on this property.
scrollDuration    5 min    ElasticsearchSource retrieves messages from Elasticsearch using the scroll API. This parameter is used as the scroll value. See Time units for supported units.

Sink and flow configuration

Sinks and flows are configured with ElasticsearchWriteSettings.

Scala
val sinkSettings =
  ElasticsearchWriteSettings()
    .withBufferSize(10)
    .withVersionType("internal")
    .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.second))
Java
ElasticsearchWriteSettings settings =
    ElasticsearchWriteSettings.create()
        .withBufferSize(10)
        .withVersionType("internal")
        .withRetryLogic(RetryAtFixedRate.create(5, Duration.ofSeconds(1)));
Parameter    Default    Description
bufferSize    10    ElasticsearchSink sends one bulk request for every bufferSize messages.
versionType    None    If set, ElasticsearchSink uses the chosen versionType to index documents. See Version types for accepted settings.
retryLogic    No retries    See below.

A bulk request might fail partially for some reason. To retry failed writes to Elasticsearch, a RetryLogic can be specified. The provided implementation is RetryAtFixedRate.

Warning

If you use retries, messages will arrive downstream out of order whenever Elasticsearch returns an error on some of the documents in a bulk request.

Parameter    Description
maxRetries    The stage fails if it gets this number of consecutive failures.
retryInterval    Failing writes are retried after this duration.
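
To see why retries can reorder results, consider the following sketch in plain Java. It is not the connector's implementation: `FixedRateRetrySketch`, `writeWithRetries` and the `bulkWrite` function are hypothetical names, and the model assumes that successful documents are passed downstream immediately while failed ones are sent again after a fixed pause. It also assumes that maxRetries counts the retries allowed after the first attempt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of retrying failed documents at a fixed interval.
final class FixedRateRetrySketch {

  // bulkWrite stands in for one bulk request: it takes the documents to write
  // and returns those that were rejected.
  static List<String> writeWithRetries(
      List<String> batch,
      Function<List<String>, List<String>> bulkWrite,
      int maxRetries,
      long retryIntervalMillis) {
    List<String> emitted = new ArrayList<>(); // order in which results reach downstream
    List<String> pending = new ArrayList<>(batch);
    int retries = 0;
    while (!pending.isEmpty()) {
      List<String> failed = bulkWrite.apply(pending);
      for (String doc : pending) {
        if (!failed.contains(doc)) emitted.add(doc); // successes are passed on at once
      }
      if (failed.isEmpty()) break;
      // Assumption: maxRetries counts the retries allowed after the first attempt
      if (retries == maxRetries) {
        throw new IllegalStateException("giving up after " + maxRetries + " retries: " + failed);
      }
      retries++;
      sleep(retryIntervalMillis);
      pending = new ArrayList<>(failed); // only the failed documents are sent again
    }
    return emitted;
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting to retry", e);
    }
  }

  public static void main(String[] args) {
    int[] attempts = {0};
    // "b" is rejected on the first attempt only
    List<String> order = writeWithRetries(
        Arrays.asList("a", "b", "c"),
        docs -> attempts[0]++ == 0
            ? Collections.singletonList("b")
            : Collections.<String>emptyList(),
        5,
        10L);
    System.out.println(order); // [a, c, b]
  }
}
```

In this model the batch a, b, c reaches downstream as a, c, b, because b only succeeds on the second attempt.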

Elasticsearch as Flow

You can also build flow stages with ElasticsearchFlow. The API is similar to creating Sinks.

Scala
val f1 = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "_doc",
    query = """{"match_all": {}}"""
  )
  .map { message: ReadResult[Book] =>
    WriteMessage.createIndexMessage(message.id, message.source)
  }
  .via(
    ElasticsearchFlow.create[Book](
      indexName = "sink3",
      typeName = "_doc"
    )
  )
  .runWith(Sink.seq)
Java
CompletionStage<List<List<WriteResult<Book, NotUsed>>>> f1 =
    ElasticsearchSource.typed(
            "source",
            "_doc",
            "{\"match_all\": {}}",
            ElasticsearchSourceSettings.create().withBufferSize(5),
            client,
            Book.class)
        .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
        .via(
            ElasticsearchFlow.create(
                "sink3",
                "_doc",
                ElasticsearchWriteSettings.create().withBufferSize(5),
                client,
                new ObjectMapper()))
        .runWith(Sink.seq(), materializer);

Passing data through ElasticsearchFlow

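When streaming documents from Kafka, you might want to commit the offset to Kafka only after the document has been written to Elasticsearch. A pass-through value attached to a WriteMessage is available again on the corresponding write result, which makes this possible.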

Scala
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val f1 = Source(messagesFromKafka) // Assume we get this from Kafka
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title
    println("title: " + book.title)

    // Transform message so that we can write to elastic
    WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
  }
  .via( // write to elastic
    ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
      indexName = "sink6",
      typeName = "_doc"
    )
  )
  .map { messageResults =>
    messageResults.foreach { result =>
      if (!result.success) throw new Exception("Failed to write message to elastic")
      // Commit to kafka
      commitToKafka(result.message.passThrough)
    }
  }
  .runWith(Sink.seq)

Await.ready(f1, Duration.Inf)
Java
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Book 3"), new KafkaOffset(2)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

Source.from(messagesFromKafka) // Assume we get this from Kafka
    .map(
        kafkaMessage -> {
          Book book = kafkaMessage.book;
          String id = book.title;

          // Transform message so that we can write to elastic
          return WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset);
        })
    .via( // write to elastic
        ElasticsearchFlow.createWithPassThrough(
            "sink6",
            "_doc",
            ElasticsearchWriteSettings.create().withBufferSize(5),
            client,
            new ObjectMapper()))
    .map(
        messageResults -> {
          messageResults
              .stream()
              .forEach(
                  result -> {
                    if (!result.success())
                      throw new RuntimeException("Failed to write message to elastic");
                    // Commit to kafka
                    kafkaCommitter.commit(result.message().passThrough());
                  });
          return NotUsed.getInstance();
        })
    .runWith(Sink.seq(), materializer) // Run it
    .toCompletableFuture()
    .get(); // Wait for it to complete
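
The Java example above uses the helper types KafkaMessage, KafkaOffset and KafkaCommitter without showing them. The following is a minimal sketch of what they could look like, assuming public fields named as in the example and a committer that only records offsets in memory; it is a stand-in for illustration and does not talk to Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as the Book class defined earlier on this page
class Book {
  public String title;

  public Book() {}

  public Book(String title) {
    this.title = title;
  }
}

// Hypothetical stand-in for a Kafka offset
class KafkaOffset {
  public final int offset;

  public KafkaOffset(int offset) {
    this.offset = offset;
  }
}

// Hypothetical stand-in for a consumed Kafka record carrying a Book
class KafkaMessage {
  public final Book book;
  public final KafkaOffset offset;

  public KafkaMessage(Book book, KafkaOffset offset) {
    this.book = book;
    this.offset = offset;
  }
}

// Records committed offsets in memory instead of committing to Kafka
class KafkaCommitter {
  public final List<Integer> committedOffsets = new ArrayList<>();

  public void commit(KafkaOffset offset) {
    committedOffsets.add(offset.offset);
  }
}
```

After the stream completes, committedOffsets can be inspected to check that every offset was committed once its document had been written.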

Specifying custom index-name for every document

When working with index-patterns using wildcards, you might need to specify a custom index-name for each document:

Scala
val customIndexName = "custom-index"

val f1 = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "_doc",
    query = """{"match_all": {}}"""
  )
  .map { message: ReadResult[Book] =>
    WriteMessage
      .createIndexMessage(message.id, message.source)
      .withIndexName(customIndexName) // Setting the index-name to use for this document
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "this-is-not-the-index-we-are-using",
      typeName = "_doc"
    )
  )
Java
WriteMessage msg = WriteMessage.createIndexMessage(doc).withIndexName("my-index");
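
A common reason for per-document index names is an index pattern such as `books-*` that covers one index per month. The sketch below shows one way to derive such a name in plain Java. The class `IndexNames`, the method `indexNameFor`, the `books` prefix and the `yyyy.MM` suffix are all assumptions made for illustration.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that derives a monthly index name, e.g. books-2019.01
final class IndexNames {
  private static final DateTimeFormatter MONTH = DateTimeFormatter.ofPattern("yyyy.MM");

  // prefix and date are supplied by the caller; the naming scheme is an assumption
  static String indexNameFor(String prefix, LocalDate date) {
    return prefix + "-" + date.format(MONTH);
  }

  public static void main(String[] args) {
    System.out.println(indexNameFor("books", LocalDate.of(2019, 1, 15))); // books-2019.01
  }
}
```

The resulting string would then be passed to withIndexName on each message, as in the examples above.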

Specifying custom metadata for every document

In some cases you might want to specify custom metadata for each document you insert, for example an ingest pipeline. This can be done as follows:

Scala
val msg = WriteMessage
  .createIndexMessage(doc)
  .withCustomMetadata(Map("pipeline" -> "myPipeline"))
Java
Map<String, String> metadata = new HashMap<>();
metadata.put("pipeline", "myPipeline");
WriteMessage msgWithMetadata =
    WriteMessage.createIndexMessage(doc).withCustomMetadata(metadata);
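
Elasticsearch's bulk API describes each operation with an action line that precedes the document. One way to picture custom metadata is as extra fields on that line. The sketch below builds such a line by hand; `BulkActionLine` and `indexAction` are hypothetical names, and the exact request the connector sends is an assumption here, not something this page specifies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: renders the action line of a bulk "index" operation
final class BulkActionLine {

  static String indexAction(
      String index, String type, String id, Map<String, String> customMetadata) {
    Map<String, String> fields = new LinkedHashMap<>();
    fields.put("_index", index);
    fields.put("_type", type);
    fields.put("_id", id);
    fields.putAll(customMetadata); // e.g. pipeline -> myPipeline

    StringBuilder line = new StringBuilder("{\"index\":{");
    boolean first = true;
    for (Map.Entry<String, String> field : fields.entrySet()) {
      if (!first) line.append(',');
      line.append(quote(field.getKey())).append(':').append(quote(field.getValue()));
      first = false;
    }
    return line.append("}}").toString();
  }

  // Minimal JSON string quoting: handles backslashes and double quotes only
  private static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  public static void main(String[] args) {
    Map<String, String> metadata = new LinkedHashMap<>();
    metadata.put("pipeline", "myPipeline");
    System.out.println(indexAction("sink", "_doc", "1", metadata));
    // {"index":{"_index":"sink","_type":"_doc","_id":"1","pipeline":"myPipeline"}}
  }
}
```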

More custom searching

The easiest way to use ElasticsearchSource is to specify only the query parameter. Sometimes you need more control, such as specifying which fields to return. In such cases you can use searchParams instead:

Scala
case class TestDoc(id: String, a: String, b: Option[String], c: String)
// Search for docs and ask elastic to only return some fields

val f3 = ElasticsearchSource
  .typed[TestDoc](indexName,
                  Some(typeName),
                  searchParams = Map(
                    "query" -> """ {"match_all": {}} """,
                    "_source" -> """ ["id", "a", "c"] """
                  ),
                  ElasticsearchSourceSettings.Default)
  .map { message =>
    message.source
  }
  .runWith(Sink.seq)
Java
public static class TestDoc {
  public String id;
  public String a;
  public String b;
  public String c;

}
  // Search for docs and ask elastic to only return some fields

  Map<String, String> searchParams = new HashMap<>();
  searchParams.put("query", "{\"match_all\": {}}");
  searchParams.put("_source", "[\"id\", \"a\", \"c\"]");

  List<TestDoc> result =
      ElasticsearchSource.<TestDoc>typed(
              indexName,
              typeName,
              searchParams, // <-- Using searchParams
              ElasticsearchSourceSettings.create(),
              client,
              TestDoc.class,
              new ObjectMapper())
          .map(
              o -> {
                return o.source(); // These documents will only have property id, a and c (not b)
              })
          .runWith(Sink.seq(), materializer)
          .toCompletableFuture()
          .get();
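
In the examples above, each searchParams value is itself a JSON fragment (an object for query, an array for _source). The sketch below shows how such a map could be assembled into a search request body. `SearchBodySketch` and `searchBody` are hypothetical names, and the connector may add further parameters of its own, so treat this as an illustration of the value format only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: joins raw JSON fragments into one search body
final class SearchBodySketch {

  // Each value is already a JSON fragment, so it is inserted as-is, not quoted
  static String searchBody(Map<String, String> searchParams) {
    return searchParams.entrySet().stream()
        .map(e -> "\"" + e.getKey() + "\":" + e.getValue().trim())
        .collect(Collectors.joining(",", "{", "}"));
  }

  public static void main(String[] args) {
    Map<String, String> searchParams = new LinkedHashMap<>();
    searchParams.put("query", "{\"match_all\": {}}");
    searchParams.put("_source", "[\"id\", \"a\", \"c\"]");
    System.out.println(searchBody(searchParams));
    // {"query":{"match_all": {}},"_source":["id", "a", "c"]}
  }
}
```

Because the values are inserted unquoted, a parameter whose value is a plain string would have to include its own JSON quotes.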