Elasticsearch

The Alpakka Elasticsearch connector provides Akka Streams integration for Elasticsearch.

For more information about Elasticsearch, please visit the Elasticsearch documentation.

Project Info: Alpakka Elasticsearch
Artifact
com.lightbend.akka
akka-stream-alpakka-elasticsearch
4.0.0+10-38ab089f-SNAPSHOT
JDK versions
Adopt OpenJDK 8
Adopt OpenJDK 11
Scala versions2.13.8, 2.12.16
JPMS module nameakka.stream.alpakka.elasticsearch
License
Readiness level
Since 0.12, 2017-09-19
Home pagehttps://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
val AkkaVersion = "2.6.19"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "4.0.0+10-38ab089f-SNAPSHOT",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.6.19</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-elasticsearch_${scala.binary.version}</artifactId>
    <version>4.0.0+10-38ab089f-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.6.19",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-elasticsearch_${versions.ScalaBinary}:4.0.0+10-38ab089f-SNAPSHOT"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.fasterxml.jackson.corejackson-core2.13.4
com.fasterxml.jackson.corejackson-databind2.13.4
com.typesafe.akkaakka-http-spray-json_2.1310.2.9
com.typesafe.akkaakka-http_2.1310.2.9
com.typesafe.akkaakka-stream_2.132.6.19
org.scala-langscala-library2.13.8
Dependency tree
com.fasterxml.jackson.core    jackson-core    2.13.4    The Apache Software License, Version 2.0
com.fasterxml.jackson.core    jackson-databind    2.13.4    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-annotations    2.13.4    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-core    2.13.4    The Apache Software License, Version 2.0
com.typesafe.akka    akka-http-spray-json_2.13    10.2.9    Apache-2.0
    com.typesafe.akka    akka-http_2.13    10.2.9    Apache-2.0
        com.typesafe.akka    akka-http-core_2.13    10.2.9    Apache-2.0
            com.typesafe.akka    akka-parsing_2.13    10.2.9    Apache-2.0
                org.scala-lang    scala-library    2.13.8    Apache-2.0
            org.scala-lang    scala-library    2.13.8    Apache-2.0
        org.scala-lang    scala-library    2.13.8    Apache-2.0
    io.spray    spray-json_2.13    1.3.6    Apache 2
        org.scala-lang    scala-library    2.13.8    Apache-2.0
    org.scala-lang    scala-library    2.13.8    Apache-2.0
com.typesafe.akka    akka-http_2.13    10.2.9    Apache-2.0
    com.typesafe.akka    akka-http-core_2.13    10.2.9    Apache-2.0
        com.typesafe.akka    akka-parsing_2.13    10.2.9    Apache-2.0
            org.scala-lang    scala-library    2.13.8    Apache-2.0
        org.scala-lang    scala-library    2.13.8    Apache-2.0
    org.scala-lang    scala-library    2.13.8    Apache-2.0
com.typesafe.akka    akka-stream_2.13    2.6.19    Apache-2.0
    com.typesafe.akka    akka-actor_2.13    2.6.19    Apache-2.0
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
            org.scala-lang    scala-library    2.13.8    Apache-2.0
        org.scala-lang    scala-library    2.13.8    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.6.19    Apache-2.0
    com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
            org.scala-lang    scala-library    2.13.8    Apache-2.0
        org.scala-lang    scala-library    2.13.8    Apache-2.0
    org.reactivestreams    reactive-streams    1.0.3    CC0
    org.scala-lang    scala-library    2.13.8    Apache-2.0
org.scala-lang    scala-library    2.13.8    Apache-2.0

Elasticsearch connection

The connection and credentials to authenticate with are configured with ElasticsearchConnectionSettings.

Scala
sourceval connectionSettings = ElasticsearchConnectionSettings("http://localhost:9200")
  .withCredentials("user", "password")
Java
sourceElasticsearchConnectionSettings connectionSettings =
    ElasticsearchConnectionSettings.create("http://localhost:9200")
        .withCredentials("user", "password");
Parameter Default Description
baseUrl Empty The base URL of Elasticsearch. Should not include a trailing slash.
username None The username to authenticate with
password None The password to authenticate with
headers None List of headers that should be sent with the http request.
connectionContext None The connectionContext that will be used with the http request. This can be used for TLS Auth instead of basic auth (username/password) by setting the SSLContext within the connectionContext.

Elasticsearch parameters

Any API method that allows reading from and writing to Elasticsearch takes an instance of ElasticsearchParamsElasticsearchParams.

ElasticsearchParams has be constructed based on the ElasticSearch API version that you’re targeting:

Scala
sourceval elasticsearchParamsV5 = ElasticsearchParams.V5("index", "_doc")
val elasticsearchParamsV7 = ElasticsearchParams.V7("index")
Java
sourceElasticsearchParams elasticsearchParamsV5 = ElasticsearchParams.V5("source", "_doc");
ElasticsearchParams elasticsearchParamsV7 = ElasticsearchParams.V7("source");

Elasticsearch as Source and Sink

You can stream messages from or to Elasticsearch using the ElasticsearchSourceElasticsearchSource, ElasticsearchFlowElasticsearchFlow or the ElasticsearchSinkElasticsearchSink.

Scala
sourceimport spray.json._
import DefaultJsonProtocol._

case class Book(title: String, shouldSkip: Option[Boolean] = None, price: Int = 10)

implicit val format: JsonFormat[Book] = jsonFormat3(Book)
Java
sourcepublic static class Book {
  public String title;

  public Book() {}

  public Book(String title) {
    this.title = title;
  }
}

With typed source

Use ElasticsearchSource.typed and ElasticsearchSink.create to create source and sink. The data is converted to and from JSON by Spray JSON. The data is converted to and from JSON by Jackson’s ObjectMapper.

Scala
sourceval copy = ElasticsearchSource
  .typed[Book](
    constructElasticsearchParams("source", "_doc", ApiVersion.V5),
    query = """{"match_all": {}}""",
    settings = baseSourceSettings
  )
  .map { message: ReadResult[Book] =>
    WriteMessage.createIndexMessage(message.id, message.source)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
      settings = baseWriteSettings
    )
  )
Java
sourceElasticsearchSourceSettings sourceSettings =
    ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5);
ElasticsearchWriteSettings sinkSettings =
    ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5);

Source<ReadResult<ElasticsearchTestBase.Book>, NotUsed> source =
    ElasticsearchSource.typed(
        constructElasticsearchParams("source", "_doc", ApiVersion.V5),
        "{\"match_all\": {}}",
        sourceSettings,
        ElasticsearchTestBase.Book.class);
CompletionStage<Done> f1 =
    source
        .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
        .runWith(
            ElasticsearchSink.create(
                constructElasticsearchParams("sink2", "_doc", ApiVersion.V5),
                sinkSettings,
                new ObjectMapper()),
            system);

With JSON source

Use ElasticsearchSource.create and ElasticsearchSink.create to create source and sink.

Scala
sourceval copy = ElasticsearchSource
  .create(
    constructElasticsearchParams("source", "_doc", ApiVersion.V5),
    query = """{"match_all": {}}""",
    settings = baseSourceSettings
  )
  .map { message: ReadResult[spray.json.JsObject] =>
    val book: Book = jsonReader[Book].read(message.source)
    WriteMessage.createIndexMessage(message.id, book)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
      settings = baseWriteSettings
    )
  )
Java
sourceElasticsearchSourceSettings sourceSettings =
    ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5);
ElasticsearchWriteSettings sinkSettings =
    ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5);

Source<ReadResult<Map<String, Object>>, NotUsed> source =
    ElasticsearchSource.create(
        constructElasticsearchParams("source", "_doc", ApiVersion.V5),
        "{\"match_all\": {}}",
        sourceSettings);
CompletionStage<Done> f1 =
    source
        .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
        .runWith(
            ElasticsearchSink.create(
                constructElasticsearchParams("sink1", "_doc", ApiVersion.V5),
                sinkSettings,
                new ObjectMapper()),
            system);

Writing to Elasticsearch

In the above examples, WriteMessage is used as the input to ElasticsearchSink and ElasticsearchFlow. This means requesting index operation to Elasticsearch. It’s possible to request other operations using following message types:

Message factory Description
WriteMessage.createIndexMessage Create a new document. If id is specified and it already exists, replace the document and increment its version.
WriteMessage.createCreateMessage Create a new document. If id already exists, the WriteResult will contain an error.
WriteMessage.createUpdateMessage Update an existing document. If there is no document with the specified id, do nothing.
WriteMessage.createUpsertMessage Update an existing document. If there is no document with the specified id, create a new document.
WriteMessage.createDeleteMessage Delete an existing document. If there is no document with the specified id, do nothing.
Scala
sourceval requests = List[WriteMessage[Book, NotUsed]](
  WriteMessage.createIndexMessage(id = "00001", source = Book("Book 1")),
  WriteMessage.createUpsertMessage(id = "00002", source = Book("Book 2")),
  WriteMessage.createUpsertMessage(id = "00003", source = Book("Book 3")),
  WriteMessage.createUpdateMessage(id = "00004", source = Book("Book 4")),
  WriteMessage.createCreateMessage(id = "00005", source = Book("Book 5")),
  WriteMessage.createDeleteMessage(id = "00002")
)

val writeResults = Source(requests)
  .via(
    ElasticsearchFlow.create[Book](
      constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
      baseWriteSettings
    )
  )
  .runWith(Sink.seq)
Java
source// Create, update, upsert and delete documents in sink8/book
List<WriteMessage<Book, NotUsed>> requests =
    Arrays.asList(
        WriteMessage.createIndexMessage("00001", new Book("Book 1")),
        WriteMessage.createUpsertMessage("00002", new Book("Book 2")),
        WriteMessage.createUpsertMessage("00003", new Book("Book 3")),
        WriteMessage.createUpdateMessage("00004", new Book("Book 4")),
        WriteMessage.createDeleteMessage("00002"));

Source.from(requests)
    .via(
        ElasticsearchFlow.create(
            constructElasticsearchParams("sink8", "_doc", ApiVersion.V5),
            ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5),
            new ObjectMapper()))
    .runWith(Sink.seq(), system)
    .toCompletableFuture()
    .get();

Source configuration

We can configure the source by ElasticsearchSourceSettings.

Scala
sourceval sourceSettings = ElasticsearchSourceSettings(connectionSettings)
  .withBufferSize(10)
  .withScrollDuration(5.minutes)
Java
sourceElasticsearchSourceSettings sourceSettings =
    ElasticsearchSourceSettings.create(connectionSettings).withBufferSize(10);
Parameter Default Description
connection The connection details and credentials to authenticate against ElasticSearch. See ElasticsearchConnectionSettings
bufferSize 10 ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size.
includeDocumentVersion false Tell Elasticsearch to return the documents _version property with the search results. See Version and Optimistic Concurrenct Control to know about this property.
scrollDuration 5 min ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This parameter is used as a scroll value. See Time units for supported units.
apiVersion V7 Currently supports V5 and V7 (see below)

Sink and flow configuration

Sinks and flows are configured with ElasticsearchWriteSettings.

Scala
sourceval sinkSettings =
  ElasticsearchWriteSettings(connectionSettings)
    .withBufferSize(10)
    .withVersionType("internal")
    .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.second))
    .withApiVersion(ApiVersion.V5)
Java
sourceElasticsearchWriteSettings settings =
    ElasticsearchWriteSettings.create(connectionSettings)
        .withBufferSize(10)
        .withVersionType("internal")
        .withRetryLogic(RetryAtFixedRate.create(5, Duration.ofSeconds(1)))
        .withApiVersion(ApiVersion.V5);
Parameter Default Description
connection The connection details and credentials to authenticate against ElasticSearch. See ElasticsearchConnectionSettings
bufferSize 10 Flow and Sink batch messages to bulk requests when back-pressure applies.
versionType None If set, ElasticsearchSink uses the chosen versionType to index documents. See Version types for accepted settings.
retryLogic No retries See below
apiVersion V7 Currently supports V5 and V7 (see below)
allowExplicitIndex True When set to False, the index name will be included in the URL instead of on each document (see below)

Retry logic

A bulk request might fail partially for some reason. To retry failed writes to Elasticsearch, a RetryLogic can be specified.

The provided implementations are:

  • RetryAtFixedRate
Parameter Description
maxRetries The stage fails, if it gets this number of consecutive failures.
retryInterval Failing writes are retried after this duration.
  • RetryWithBackoff
Parameter Description
maxRetries The stage fails, if it gets this number of consecutive failures.
minBackoff Initial backoff for failing writes.
maxBackoff Maximum backoff for failing writes.

In case of write failures the order of messages downstream is guaranteed to be preserved.

Supported API versions

To support reading and writing to multiple versions of Elasticsearch, an ApiVersion can be specified.

This will be used to: 1. transform the bulk request into a format understood by the corresponding Elasticsearch server. 2. determine whether to include the index type mapping in the API calls. See removal of types

Currently V5 and V7 are supported specifically but this parameter does not need to match the server version exactly (for example, either V5 or V7 should work with Elasticsearch 6.x).

Allow explicit index

When using the _bulk API, Elasticsearch will reject requests that have an explicit index in the request body if explicit index names are not allowed. See URL-based access control

Elasticsearch as Flow

You can also build flow stages with ElasticsearchFlowElasticsearchFlow. The API is similar to creating Sinks.

Scala
sourceval copy = ElasticsearchSource
  .typed[Book](
    constructElasticsearchParams("source", "_doc", ApiVersion.V5),
    query = """{"match_all": {}}""",
    settings = baseSourceSettings
  )
  .map { message: ReadResult[Book] =>
    WriteMessage.createIndexMessage(message.id, message.source)
  }
  .via(
    ElasticsearchFlow.create[Book](
      constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
      settings = baseWriteSettings
    )
  )
  .runWith(Sink.seq)
Java
sourceCompletionStage<List<WriteResult<Book, NotUsed>>> f1 =
    ElasticsearchSource.typed(
            constructElasticsearchParams("source", "_doc", ApiVersion.V5),
            "{\"match_all\": {}}",
            ElasticsearchSourceSettings.create(connectionSettings)
                .withApiVersion(ApiVersion.V5)
                .withBufferSize(5),
            Book.class)
        .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
        .via(
            ElasticsearchFlow.create(
                constructElasticsearchParams("sink3", "_doc", ApiVersion.V5),
                ElasticsearchWriteSettings.create(connectionSettings)
                    .withApiVersion(ApiVersion.V5)
                    .withBufferSize(5),
                new ObjectMapper()))
        .runWith(Sink.seq(), system);

Storing documents from Strings

Elasticsearch requires the documents to be properly formatted JSON. If your data is available as JSON in Strings, you may use the pre-defined StringMessageWriter to avoid any conversions. For any other JSON technologies, implement a MessageWriter[T]MessageWriter<T>.

Scala
sourceval write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source(
  immutable.Seq(
    WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.toString()),
    WriteMessage.createIndexMessage("2", Book("Faust").toJson.toString()),
    WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.toString())
  )
).via(
    ElasticsearchFlow.create(
      constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
      settings = baseWriteSettings,
      StringMessageWriter
    )
  )
  .runWith(Sink.seq)
Java
sourceString indexName = "sink3-0";
CompletionStage<List<WriteResult<String, NotUsed>>> write =
    Source.from(
            Arrays.asList(
                WriteMessage.createIndexMessage("1", "{\"title\": \"Das Parfum\"}"),
                WriteMessage.createIndexMessage("2", "{\"title\": \"Faust\"}"),
                WriteMessage.createIndexMessage(
                    "3", "{\"title\": \"Die unendliche Geschichte\"}")))
        .via(
            ElasticsearchFlow.create(
                constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
                ElasticsearchWriteSettings.create(connectionSettings)
                    .withApiVersion(ApiVersion.V5)
                    .withBufferSize(5),
                StringMessageWriter.getInstance()))
        .runWith(Sink.seq(), system);

Passing data through ElasticsearchFlow

When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to Elastic.

Scala
source// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = Vector[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val indexName = "sink6"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title

    // Transform message so that we can write to elastic
    WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
  }
  .via( // write to elastic
    ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
      constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
      settings = baseWriteSettings
    )
  )
  .map { result =>
    if (!result.success) throw new Exception("Failed to write message to elastic")
    // Commit to kafka
    commitToKafka(result.message.passThrough)
  }
  .runWith(Sink.ignore)

kafkaToEs.futureValue shouldBe Done
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = Vector[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val indexName = "sink6-bulk"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title

    // Transform message so that we can write to elastic
    WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
  }
  .grouped(2)
  .via( // write to elastic
    ElasticsearchFlow.createBulk[Book, KafkaOffset](
      constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
      settings = baseWriteSettings
    )
  )
  .map(_.map { result =>
    if (!result.success) throw new Exception("Failed to write message to elastic")
    // Commit to kafka
    commitToKafka(result.message.passThrough)
  })
  .runWith(Sink.ignore)

kafkaToEs.futureValue shouldBe Done
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book A", shouldSkip = Some(true)), KafkaOffset(0)),
  KafkaMessage(Book("Book 1"), KafkaOffset(1)),
  KafkaMessage(Book("Book 2"), KafkaOffset(2)),
  KafkaMessage(Book("Book B", shouldSkip = Some(true)), KafkaOffset(3)),
  KafkaMessage(Book("Book 3"), KafkaOffset(4)),
  KafkaMessage(Book("Book C", shouldSkip = Some(true)), KafkaOffset(5))
)

var committedOffsets = Vector[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val indexName = "sink6-nop"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title

    // Transform message so that we can write to elastic
    if (book.shouldSkip.getOrElse(false))
      WriteMessage.createNopMessage[Book]().withPassThrough(kafkaMessage.offset)
    else
      WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
  }
  .via( // write to elastic
    ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
      constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
      settings = baseWriteSettings
    )
  )
  .map { result =>
    if (!result.success) throw new Exception("Failed to write message to elastic")
    // Commit to kafka
    commitToKafka(result.message.passThrough)
  }
  .runWith(Sink.ignore)

kafkaToEs.futureValue shouldBe Done
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1", shouldSkip = Some(true)), KafkaOffset(0)),
  KafkaMessage(Book("Book 2", shouldSkip = Some(true)), KafkaOffset(1)),
  KafkaMessage(Book("Book 3", shouldSkip = Some(true)), KafkaOffset(2))
)

var committedOffsets = Vector[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val indexName = "sink6-none"
register(connectionSettings, indexName, "dummy", 10) // need to create index else exception in reading below

val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title

    // Transform message so that we can write to elastic
    if (book.shouldSkip.getOrElse(false))
      WriteMessage.createNopMessage[Book]().withPassThrough(kafkaMessage.offset)
    else
      WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
  }
  .via( // write to elastic
    ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
      constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
      settings = baseWriteSettings
    )
  )
  .map { result =>
    if (!result.success) throw new Exception("Failed to write message to elastic")
    // Commit to kafka
    commitToKafka(result.message.passThrough)
  }
  .runWith(Sink.ignore)

kafkaToEs.futureValue shouldBe Done
Java
source// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Book 3"), new KafkaOffset(2)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

CompletionStage<Done> kafkaToEs =
    Source.from(messagesFromKafka) // Assume we get this from Kafka
        .map(
            kafkaMessage -> {
              Book book = kafkaMessage.book;
              String id = book.title;

              // Transform message so that we can write to elastic
              return WriteMessage.createIndexMessage(id, book)
                  .withPassThrough(kafkaMessage.offset);
            })
        .via( // write to elastic
            ElasticsearchFlow.createWithPassThrough(
                constructElasticsearchParams("sink6", "_doc", ApiVersion.V5),
                ElasticsearchWriteSettings.create(connectionSettings)
                    .withApiVersion(ApiVersion.V5)
                    .withBufferSize(5),
                new ObjectMapper()))
        .map(
            result -> {
              if (!result.success())
                throw new RuntimeException("Failed to write message to elastic");
              // Commit to kafka
              kafkaCommitter.commit(result.message().passThrough());
              return NotUsed.getInstance();
            })
        .runWith(Sink.ignore(), system);

Specifying custom index-name for every document

When working with index-patterns using wildcards, you might need to specify a custom index-name for each document:

Scala
sourceval customIndexName = "custom-index"

val writeCustomIndex = ElasticsearchSource
  .typed[Book](
    constructElasticsearchParams("source", "_doc", ApiVersion.V5),
    query = """{"match_all": {}}""",
    settings = baseSourceSettings
  )
  .map { message: ReadResult[Book] =>
    WriteMessage
      .createIndexMessage(message.id, message.source)
      .withIndexName(customIndexName) // Setting the index-name to use for this document
  }
  .runWith(
    ElasticsearchSink.create[Book](
      constructElasticsearchParams("this-is-not-the-index-we-are-using", "_doc", ApiVersion.V5),
      settings = baseWriteSettings
    )
  )
Java
sourceWriteMessage<String, NotUsed> msg =
    WriteMessage.createIndexMessage(doc).withIndexName("my-index");

Specifying custom metadata for every document

In some cases you might want to specify custom metadata per document you are inserting, for example a pipeline, this can be done like so:

Scala
sourceval msg = WriteMessage
  .createIndexMessage(doc)
  .withCustomMetadata(Map("pipeline" -> "myPipeline"))
Java
sourceMap<String, String> metadata = new HashMap<>();
metadata.put("pipeline", "myPipeline");
WriteMessage<String, NotUsed> msgWithMetadata =
    WriteMessage.createIndexMessage(doc).withCustomMetadata(metadata);

More custom searching

The easiest way of using Elasticsearch-source, is to just specify the query-param. Sometimes you need more control, like specifying which fields to return and so on. In such cases you can instead use ‘searchParams’ instead:

Scala
sourcecase class TestDoc(id: String, a: String, b: Option[String], c: String)
// Search for docs and ask elastic to only return some fields

val readWithSearchParameters = ElasticsearchSource
  .typed[TestDoc](
    constructElasticsearchParams(indexName, typeName, ApiVersion.V5),
    searchParams = Map(
      "query" -> """ {"match_all": {}} """,
      "_source" -> """ ["id", "a", "c"] """
    ),
    baseSourceSettings
  )
  .map { message =>
    message.source
  }
  .runWith(Sink.seq)
Java
source// Search for docs and ask elastic to only return some fields

Map<String, String> searchParams = new HashMap<>();
searchParams.put("query", "{\"match_all\": {}}");
searchParams.put("_source", "[\"id\", \"a\", \"c\"]");

List<TestDoc> result =
    ElasticsearchSource.<TestDoc>typed(
            constructElasticsearchParams(indexName, typeName, ApiVersion.V5),
            searchParams, // <-- Using searchParams
            ElasticsearchSourceSettings.create(connectionSettings)
                .withApiVersion(ApiVersion.V5),
            TestDoc.class,
            new ObjectMapper())
        .map(
            o -> {
              return o.source(); // These documents will only have property id, a and c (not
            })
        .runWith(Sink.seq(), system)
        .toCompletableFuture()
        .get();

Routing

Support for custom routing is available through the routing key. Add this key and the respective value in ‘searchParams’ map, to route your search directly to the shard that holds the document you are looking for and enjoy improved response times.

Sort

Support for sort is available through the sort key in searchParams map. If no sort is given, the source will use sort=_doc to maximize performance, as indicated by elasticsearch documentation.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.