Elasticsearch
The Alpakka Elasticsearch connector provides Akka Streams integration for Elasticsearch.
For more information about Elasticsearch, please visit the Elasticsearch documentation.
Project Info: Alpakka Elasticsearch | |
---|---|
Artifact | com.lightbend.akka akka-stream-alpakka-elasticsearch 1.0.2 |
JDK versions | OpenJDK 8 |
Scala versions | 2.12.7, 2.11.12, 2.13.0-M5 |
JPMS module name | akka.stream.alpakka.elasticsearch |
License | |
Readiness level | Since 0.12, 2017-09-19 |
Home page | https://doc.akka.io/docs/alpakka/current/ |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "1.0.2"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-elasticsearch_2.12</artifactId>
  <version>1.0.2</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-elasticsearch_2.12', version: '1.0.2'
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version | License |
---|---|---|---|
com.fasterxml.jackson.core | jackson-databind | 2.9.8 | The Apache Software License, Version 2.0 |
com.typesafe.akka | akka-stream_2.12 | 2.5.22 | Apache License, Version 2.0 |
io.spray | spray-json_2.12 | 1.3.5 | Apache 2 |
org.elasticsearch.client | elasticsearch-rest-client | 6.3.1 | The Apache Software License, Version 2.0 |
org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |
- Dependency tree
com.fasterxml.jackson.core jackson-databind 2.9.8 The Apache Software License, Version 2.0
com.fasterxml.jackson.core jackson-annotations 2.9.0 The Apache Software License, Version 2.0
com.fasterxml.jackson.core jackson-core 2.9.8 The Apache Software License, Version 2.0
com.typesafe.akka akka-stream_2.12 2.5.22 Apache License, Version 2.0
com.typesafe.akka akka-actor_2.12 2.5.22 Apache License, Version 2.0
com.typesafe config 1.3.3 Apache License, Version 2.0
org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause
org.scala-lang scala-library 2.12.7 BSD 3-Clause
org.scala-lang scala-library 2.12.7 BSD 3-Clause
com.typesafe.akka akka-protobuf_2.12 2.5.22 Apache License, Version 2.0
org.scala-lang scala-library 2.12.7 BSD 3-Clause
com.typesafe ssl-config-core_2.12 0.3.7 Apache-2.0
com.typesafe config 1.3.3 Apache License, Version 2.0
org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 BSD 3-clause
org.scala-lang scala-library 2.12.7 BSD 3-Clause
org.scala-lang scala-library 2.12.7 BSD 3-Clause
org.reactivestreams reactive-streams 1.0.2 CC0
org.scala-lang scala-library 2.12.7 BSD 3-Clause
io.spray spray-json_2.12 1.3.5 Apache 2
org.scala-lang scala-library 2.12.7 BSD 3-Clause
org.elasticsearch.client elasticsearch-rest-client 6.3.1 The Apache Software License, Version 2.0
commons-codec commons-codec 1.10 Apache License, Version 2.0
commons-logging commons-logging 1.1.3 The Apache Software License, Version 2.0
org.apache.httpcomponents httpasyncclient 4.1.2 Apache License, Version 2.0
org.apache.httpcomponents httpclient 4.5.2 Apache License, Version 2.0
org.apache.httpcomponents httpcore-nio 4.4.5 Apache License, Version 2.0
org.apache.httpcomponents httpcore 4.4.5 Apache License, Version 2.0
org.scala-lang scala-library 2.12.7 BSD 3-Clause
Set up REST client
Sources, Flows and Sinks provided by this connector need a prepared org.elasticsearch.client.RestClient to access Elasticsearch.
- Scala
-
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient

implicit val client: RestClient = RestClient.builder(new HttpHost("localhost", 9201)).build()
- Java
-
import akka.stream.alpakka.elasticsearch.*;
import akka.stream.alpakka.elasticsearch.javadsl.*;
import org.elasticsearch.client.RestClient;
import org.apache.http.HttpHost;

RestClient client = RestClient.builder(new HttpHost("localhost", 9201)).build();
Elasticsearch as Source and Sink
Now we can stream messages from or to Elasticsearch by providing the RestClient to the ElasticsearchSource or the ElasticsearchSink.
- Scala
-
import spray.json._
import DefaultJsonProtocol._

case class Book(title: String)

implicit val format: JsonFormat[Book] = jsonFormat1(Book)
- Java
-
public static class Book {
  public String title;

  public Book() {}

  public Book(String title) {
    this.title = title;
  }
}
With typed source
Use ElasticsearchSource.typed and ElasticsearchSink.create to create source and sink. In the Scala API the data is converted to and from JSON by Spray JSON; in the Java API it is converted by Jackson’s ObjectMapper.
- Scala
-
val copy = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "_doc",
    query = """{"match_all": {}}"""
  )
  .map { message: ReadResult[Book] =>
    WriteMessage.createIndexMessage(message.id, message.source)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName,
      typeName = "_doc"
    )
  )
- Java
-
ElasticsearchSourceSettings sourceSettings = ElasticsearchSourceSettings.create();
ElasticsearchWriteSettings sinkSettings = ElasticsearchWriteSettings.create();

Source<ReadResult<Book>, NotUsed> source =
    ElasticsearchSource.typed(
        "source", "_doc", "{\"match_all\": {}}", sourceSettings, client, Book.class);
CompletionStage<Done> f1 =
    source
        .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
        .runWith(
            ElasticsearchSink.create("sink2", "_doc", sinkSettings, client, new ObjectMapper()),
            materializer);
With JSON source
Use ElasticsearchSource.create and ElasticsearchSink.create to create source and sink.
- Scala
-
val copy = ElasticsearchSource
  .create(
    indexName = "source",
    typeName = "_doc",
    query = """{"match_all": {}}"""
  )
  .map { message: ReadResult[spray.json.JsObject] =>
    val book: Book = jsonReader[Book].read(message.source)
    WriteMessage.createIndexMessage(message.id, book)
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName,
      typeName = "_doc"
    )
  )
- Java
-
ElasticsearchSourceSettings sourceSettings = ElasticsearchSourceSettings.create();
ElasticsearchWriteSettings sinkSettings = ElasticsearchWriteSettings.create();

Source<ReadResult<Map<String, Object>>, NotUsed> source =
    ElasticsearchSource.create("source", "_doc", "{\"match_all\": {}}", sourceSettings, client);
CompletionStage<Done> f1 =
    source
        .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
        .runWith(
            ElasticsearchSink.create("sink1", "_doc", sinkSettings, client, new ObjectMapper()),
            materializer);
Writing to Elasticsearch
In the above examples, WriteMessage is used as the input to ElasticsearchSink and ElasticsearchFlow. WriteMessage.createIndexMessage requests an index operation from Elasticsearch. Other operations can be requested with the following message factories:
Message factory | Description |
---|---|
WriteMessage.createIndexMessage | Create a new document. If id is specified and it already exists, replace the document and increment its version. |
WriteMessage.createCreateMessage | Create a new document. If id already exists, the WriteResult will contain an error. |
WriteMessage.createUpdateMessage | Update an existing document. If there is no document with the specified id, do nothing. |
WriteMessage.createUpsertMessage | Update an existing document. If there is no document with the specified id, create a new document. |
WriteMessage.createDeleteMessage | Delete an existing document. If there is no document with the specified id, do nothing. |
- Scala
-
val requests = List[WriteMessage[Book, NotUsed]](
  WriteMessage.createIndexMessage(id = "00001", source = Book("Book 1")),
  WriteMessage.createUpsertMessage(id = "00002", source = Book("Book 2")),
  WriteMessage.createUpsertMessage(id = "00003", source = Book("Book 3")),
  WriteMessage.createUpdateMessage(id = "00004", source = Book("Book 4")),
  WriteMessage.createCreateMessage(id = "00005", source = Book("Book 5")),
  WriteMessage.createDeleteMessage(id = "00002")
)

val writeResults = Source(requests)
  .via(
    ElasticsearchFlow.create[Book](
      indexName,
      "_doc"
    )
  )
  .runWith(Sink.seq)
- Java
-
// Create, update, upsert and delete documents in sink8/book
List<WriteMessage<Book, NotUsed>> requests =
    Arrays.asList(
        WriteMessage.createIndexMessage("00001", new Book("Book 1")),
        WriteMessage.createUpsertMessage("00002", new Book("Book 2")),
        WriteMessage.createUpsertMessage("00003", new Book("Book 3")),
        WriteMessage.createUpdateMessage("00004", new Book("Book 4")),
        WriteMessage.createDeleteMessage("00002"));

Source.from(requests)
    .via(
        ElasticsearchFlow.create(
            "sink8", "_doc", ElasticsearchWriteSettings.create(), client, new ObjectMapper()))
    .runWith(Sink.seq(), materializer)
    .toCompletableFuture()
    .get();
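To make the differences between the five operations concrete, the following is a toy in-memory model in plain Java. It is only a sketch: the class WriteOperationsSketch and its methods are hypothetical and are not part of Alpakka or Elasticsearch. It assumes the standard Elasticsearch behaviour that indexing an existing id replaces the document and increments its version, and otherwise follows the descriptions in the table above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical toy model of the write operations; not the connector's implementation.
final class WriteOperationsSketch {
    static final class Doc {
        final String title;
        final long version;

        Doc(String title, long version) {
            this.title = title;
            this.version = version;
        }
    }

    private final Map<String, Doc> index = new HashMap<>();

    // index: create the document, or replace it and bump the version if the id exists
    void index(String id, String title) {
        Doc old = index.get(id);
        index.put(id, new Doc(title, old == null ? 1 : old.version + 1));
    }

    // create: returns false (an error in the WriteResult) if the id already exists
    boolean create(String id, String title) {
        if (index.containsKey(id)) return false;
        index.put(id, new Doc(title, 1));
        return true;
    }

    // update: changes an existing document, does nothing if the id is unknown
    void update(String id, String title) {
        Doc old = index.get(id);
        if (old != null) index.put(id, new Doc(title, old.version + 1));
    }

    // upsert: updates an existing document, creates it if the id is unknown
    void upsert(String id, String title) {
        Doc old = index.get(id);
        index.put(id, new Doc(title, old == null ? 1 : old.version + 1));
    }

    // delete: removes the document, does nothing if the id is unknown
    void delete(String id) {
        index.remove(id);
    }

    Doc get(String id) {
        return index.get(id);
    }

    TreeSet<String> ids() {
        return new TreeSet<>(index.keySet());
    }

    public static void main(String[] args) {
        WriteOperationsSketch es = new WriteOperationsSketch();
        es.index("00001", "Book 1");
        es.upsert("00002", "Book 2");
        es.upsert("00003", "Book 3");
        es.update("00004", "Book 4"); // no such document: nothing happens
        es.create("00005", "Book 5");
        es.delete("00002");
        System.out.println(es.ids()); // prints [00001, 00003, 00005]
    }
}
```

The main method replays the request list from the Scala example: the update for 00004 has no effect because that document never existed, and 00002 is created by the upsert and then deleted.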
Source configuration
We can configure the source with ElasticsearchSourceSettings.
- Scala
-
val sourceSettings = ElasticsearchSourceSettings()
  .withBufferSize(10)
  .withScrollDuration(5.minutes)
- Java
-
ElasticsearchSourceSettings sourceSettings = ElasticsearchSourceSettings.create().withBufferSize(10);
Parameter | Default | Description |
---|---|---|
bufferSize | 10 | ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size. |
includeDocumentVersion | false | Tell Elasticsearch to return the document's _version property with the search results. See Version and Optimistic Concurrency Control to learn about this property. |
scrollDuration | 5 min | ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This parameter is used as a scroll value. See Time units for supported units. |
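As a rough illustration of where these three settings end up, the sketch below builds the path and body of an initial Elasticsearch scroll search using only the JDK. The class ScrollRequestSketch and its methods are hypothetical, and the exact request the connector sends is an assumption; the sketch relies only on the Elasticsearch scroll API, where the keep-alive is passed as the scroll query parameter and size, version and query are fields of the search body.

```java
import java.time.Duration;

// Hypothetical helper showing how the source settings could map onto a scroll search.
final class ScrollRequestSketch {
    // scrollDuration becomes the "scroll" keep-alive, expressed here in seconds
    static String endpoint(String indexName, String typeName, Duration scrollDuration) {
        return "/" + indexName + "/" + typeName + "/_search?scroll="
            + scrollDuration.getSeconds() + "s";
    }

    // bufferSize becomes the scroll "size"; includeDocumentVersion becomes "version"
    static String body(String queryJson, int bufferSize, boolean includeDocumentVersion) {
        return "{\"size\":" + bufferSize
            + ",\"version\":" + includeDocumentVersion
            + ",\"query\":" + queryJson + "}";
    }

    public static void main(String[] args) {
        System.out.println(endpoint("source", "_doc", Duration.ofMinutes(5)));
        System.out.println(body("{\"match_all\": {}}", 10, false));
    }
}
```

With the defaults from the table, each scroll page would carry at most 10 documents and the scroll context would be kept alive for five minutes between pages.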
Sink and flow configuration
Sinks and flows are configured with ElasticsearchWriteSettings.
- Scala
-
val sinkSettings = ElasticsearchWriteSettings()
  .withBufferSize(10)
  .withVersionType("internal")
  .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.second))
- Java
-
ElasticsearchWriteSettings settings =
    ElasticsearchWriteSettings.create()
        .withBufferSize(10)
        .withVersionType("internal")
        .withRetryLogic(RetryAtFixedRate.create(5, Duration.ofSeconds(1)));
Parameter | Default | Description |
---|---|---|
bufferSize | 10 | Flow and Sink batch messages to bulk requests when back-pressure applies. |
versionType | None | If set, ElasticsearchSink uses the chosen versionType to index documents. See Version types for accepted settings. |
retryLogic | No retries | See below |
A bulk request might fail partially for some reason. To retry failed writes to Elasticsearch, a RetryLogic can be specified. The provided implementation is RetryAtFixedRate.
If using retries, you will receive messages out of order downstream in cases when Elasticsearch returns an error on some of the documents in a bulk request.
Parameter | Description |
---|---|
maxRetries | The stage fails if it gets this number of consecutive failures. |
retryInterval | Failing writes are retried after this duration. |
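The interplay of the two parameters can be sketched with a small stand-alone loop. This is not the connector's RetryAtFixedRate; FixedRateRetrySketch is a hypothetical helper, and it assumes one initial attempt followed by at most maxRetries retries, each delayed by retryInterval.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical fixed-rate retry loop; "write" returns true when the write succeeded.
final class FixedRateRetrySketch {
    // Returns the total number of attempts, or throws once maxRetries retries have failed.
    static int attemptsUntilSuccess(BooleanSupplier write, int maxRetries, Duration retryInterval) {
        int retries = 0;
        while (!write.getAsBoolean()) {
            if (retries >= maxRetries) {
                throw new IllegalStateException(
                    "write still failing after " + maxRetries + " retries");
            }
            retries++;
            try {
                // wait a fixed interval before the next attempt
                Thread.sleep(retryInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting to retry", e);
            }
        }
        return retries + 1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // the simulated write fails twice and succeeds on the third call
        int attempts = attemptsUntilSuccess(() -> ++calls[0] > 2, 5, Duration.ofMillis(10));
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

In a real stream only the failed documents of a bulk request would be retried, which is why results can arrive out of order downstream.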
Elasticsearch as Flow
You can also build flow stages with ElasticsearchFlow. The API is similar to creating Sinks.
- Scala
-
val copy = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "_doc",
    query = """{"match_all": {}}"""
  )
  .map { message: ReadResult[Book] =>
    WriteMessage.createIndexMessage(message.id, message.source)
  }
  .via(
    ElasticsearchFlow.create[Book](
      indexName = indexName,
      typeName = "_doc"
    )
  )
  .runWith(Sink.seq)
- Java
-
CompletionStage<List<WriteResult<Book, NotUsed>>> f1 =
    ElasticsearchSource.typed(
            "source",
            "_doc",
            "{\"match_all\": {}}",
            ElasticsearchSourceSettings.create().withBufferSize(5),
            client,
            Book.class)
        .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
        .via(
            ElasticsearchFlow.create(
                "sink3",
                "_doc",
                ElasticsearchWriteSettings.create().withBufferSize(5),
                client,
                new ObjectMapper()))
        .runWith(Sink.seq(), materializer);
Passing data through ElasticsearchFlow
When streaming documents from Kafka, you might want to commit to Kafka only after the document has been written to Elasticsearch.
- Scala
-
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = Vector[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val indexName = "sink6"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title
    println("title: " + book.title)

    // Transform message so that we can write to elastic
    WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset)
  }
  .via( // write to elastic
    ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
      indexName = indexName,
      typeName = "_doc"
    )
  )
  .map { result =>
    if (!result.success) throw new Exception("Failed to write message to elastic")
    // Commit to kafka
    commitToKafka(result.message.passThrough)
  }
  .runWith(Sink.ignore)

kafkaToEs.futureValue shouldBe Done
- Java
-
// We're going to pretend we got messages from kafka.
// After we've written them to Elastic, we want
// to commit the offset to Kafka

List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Book 3"), new KafkaOffset(2)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

CompletionStage<Done> kafkaToEs =
    Source.from(messagesFromKafka) // Assume we get this from Kafka
        .map(
            kafkaMessage -> {
              Book book = kafkaMessage.book;
              String id = book.title;

              // Transform message so that we can write to elastic
              return WriteMessage.createIndexMessage(id, book)
                  .withPassThrough(kafkaMessage.offset);
            })
        .via( // write to elastic
            ElasticsearchFlow.createWithPassThrough(
                "sink6",
                "_doc",
                ElasticsearchWriteSettings.create().withBufferSize(5),
                client,
                new ObjectMapper()))
        .map(
            result -> {
              if (!result.success())
                throw new RuntimeException("Failed to write message to elastic");
              // Commit to kafka
              kafkaCommitter.commit(result.message().passThrough());
              return NotUsed.getInstance();
            })
        .runWith(Sink.ignore(), materializer);
Specifying custom index-name for every document
When working with index-patterns using wildcards, you might need to specify a custom index-name for each document:
- Scala
-
val customIndexName = "custom-index"
val writeCustomIndex = ElasticsearchSource
  .typed[Book](
    indexName = "source",
    typeName = "_doc",
    query = """{"match_all": {}}"""
  )
  .map { message: ReadResult[Book] =>
    WriteMessage
      .createIndexMessage(message.id, message.source)
      .withIndexName(customIndexName) // Setting the index-name to use for this document
  }
  .runWith(
    ElasticsearchSink.create[Book](
      indexName = "this-is-not-the-index-we-are-using",
      typeName = "_doc"
    )
  )
- Java
-
WriteMessage msg = WriteMessage.createIndexMessage(doc).withIndexName("my-index");
Specifying custom metadata for every document
In some cases you might want to specify custom metadata for each document you are inserting, for example a pipeline. This can be done like so:
- Scala
-
val msg = WriteMessage
  .createIndexMessage(doc)
  .withCustomMetadata(Map("pipeline" -> "myPipeline"))
- Java
-
Map<String, String> metadata = new HashMap<>();
metadata.put("pipeline", "myPipeline");
WriteMessage msgWithMetadata =
    WriteMessage.createIndexMessage(doc).withCustomMetadata(metadata);
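To see what per-document metadata means at the Elasticsearch level, the sketch below builds the action line of a bulk request with the JDK alone. BulkActionLineSketch is a hypothetical helper, and the assumption is that the connector places the custom metadata next to _index, _type and _id in the bulk action object, which is where the Elasticsearch bulk API accepts a pipeline entry.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper; keys and values are assumed to need no JSON escaping.
final class BulkActionLineSketch {
    static String indexAction(
            String indexName, String typeName, String id, Map<String, String> customMetadata) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("_index", indexName); // a per-document index name would go here
        fields.put("_type", typeName);
        fields.put("_id", id);
        fields.putAll(customMetadata);   // e.g. "pipeline" -> "myPipeline"
        String json = fields.entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
            .collect(Collectors.joining(","));
        return "{\"index\":{" + json + "}}";
    }

    public static void main(String[] args) {
        System.out.println(
            indexAction("my-index", "_doc", "1", Collections.singletonMap("pipeline", "myPipeline")));
    }
}
```

In the bulk format, each such action line is followed by a line holding the document source.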
More custom searching
The easiest way of using ElasticsearchSource is to specify only the query parameter. Sometimes you need more control, such as specifying which fields to return. In such cases you can use searchParams instead:
- Scala
-
case class TestDoc(id: String, a: String, b: Option[String], c: String)

// Search for docs and ask elastic to only return some fields

val readWithSearchParameters = ElasticsearchSource
  .typed[TestDoc](indexName,
                  Some(typeName),
                  searchParams = Map(
                    "query" -> """ {"match_all": {}} """,
                    "_source" -> """ ["id", "a", "c"] """
                  ),
                  ElasticsearchSourceSettings())
  .map { message =>
    message.source
  }
  .runWith(Sink.seq)
- Java
-
public static class TestDoc {
  public String id;
  public String a;
  public String b;
  public String c;
}

// Search for docs and ask elastic to only return some fields

Map<String, String> searchParams = new HashMap<>();
searchParams.put("query", "{\"match_all\": {}}");
searchParams.put("_source", "[\"id\", \"a\", \"c\"]");

List<TestDoc> result =
    ElasticsearchSource.<TestDoc>typed(
            indexName,
            typeName,
            searchParams, // <-- Using searchParams
            ElasticsearchSourceSettings.create(),
            client,
            TestDoc.class,
            new ObjectMapper())
        .map(
            o -> {
              return o.source(); // These documents will only have property id, a and c (not b)
            })
        .runWith(Sink.seq(), materializer)
        .toCompletableFuture()
        .get();
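Each value in searchParams is itself a fragment of raw JSON, keyed by the name of a top-level field of the search request. The sketch below shows one way such a map could be combined into a search body. SearchBodySketch is a hypothetical helper written against the JDK only, and the composition it shows is an assumption about what the connector does rather than its actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: joins raw JSON fragments into one search request body.
final class SearchBodySketch {
    static String searchBody(Map<String, String> searchParams) {
        return searchParams.entrySet().stream()
            // keys are quoted, values are inserted verbatim because they are already JSON
            .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
            .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        Map<String, String> searchParams = new LinkedHashMap<>();
        searchParams.put("query", "{\"match_all\": {}}");
        searchParams.put("_source", "[\"id\", \"a\", \"c\"]");
        System.out.println(searchBody(searchParams));
    }
}
```

Because the values are passed through unchanged, a malformed fragment produces an invalid request body, so it is worth validating them before starting the stream.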