Elasticsearch
Example: Index all data from an RDBMS table into Elasticsearch
- Instantiate a Slick database session using the config parameters defined in key `slick-h2-mem` and close it when the Actor System shuts down (1)
- Scala only: Slick definition of the MOVIE table (2)
- Class that holds the Movie data (3)
- Instantiate the Elasticsearch REST client (4)
- Scala: Instantiate the Spray JSON format that converts the `Movie` case class to JSON (5)
- Java: Instantiate the Jackson object mapper that converts the `Movie` class to JSON (5)
- Construct the Slick `Source` for the H2 table and query all data in the table (6)
- Scala only: Map each tuple into a `Movie` case class instance (7)
- The first argument of the `IncomingMessage` is the id of the document. Replace with `None` if you would like Elasticsearch to generate one (8)
- Prepare the Elasticsearch `Sink` that the data needs to be drained to (9)
- Close the Elasticsearch client upon completion of indexing the data (10)
- Scala

    import akka.Done
    import akka.stream.alpakka.elasticsearch.WriteMessage._
    import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.client.RestClient
    import akka.stream.alpakka.slick.javadsl.SlickSession
    import akka.stream.alpakka.slick.scaladsl.Slick
    import spray.json.DefaultJsonProtocol._
    import spray.json.JsonFormat

    import scala.concurrent.Future
    import scala.concurrent.duration._

    implicit val session = SlickSession.forConfig("slick-h2-mem") // (1)
    actorSystem.registerOnTermination(session.close())

    import session.profile.api._
    class Movies(tag: Tag) extends Table[(Int, String, String, Double)](tag, "MOVIE") { // (2)
      def id = column[Int]("ID")
      def title = column[String]("TITLE")
      def genre = column[String]("GENRE")
      def gross = column[Double]("GROSS")

      override def * = (id, title, genre, gross)
    }

    case class Movie(id: Int, title: String, genre: String, gross: Double) // (3)

    implicit val elasticSearchClient: RestClient =
      RestClient.builder(new HttpHost("localhost", 9201)).build() // (4)
    implicit val format: JsonFormat[Movie] = jsonFormat4(Movie) // (5)

    val done: Future[Done] =
      Slick
        .source(TableQuery[Movies].result) // (6)
        .map { // (7)
          // tuple fields follow the column order of the Movies projection
          case (id, title, genre, gross) => Movie(id, title, genre, gross)
        }
        .map(movie => createIndexMessage(movie.id.toString, movie)) // (8)
        .runWith(ElasticsearchSink.create[Movie]("movie", "_doc")) // (9)

    done.onComplete {
      case _ =>
        elasticSearchClient.close() // (10)
    }
- Java

    import akka.Done;
    import akka.actor.ActorSystem;
    import akka.stream.ActorMaterializer;
    import akka.stream.Materializer;
    import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings;
    import akka.stream.alpakka.elasticsearch.WriteMessage;
    import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink;
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import akka.stream.alpakka.slick.javadsl.Slick;
    import akka.stream.alpakka.slick.javadsl.SlickRow;
    import akka.stream.alpakka.slick.javadsl.SlickSession;
    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.IOException;
    import java.util.concurrent.CompletionStage;

    static class Movie { // (3)
      public final int id;
      public final String title;
      public final String genre;
      public final double gross;

      @JsonCreator
      public Movie(
          @JsonProperty("id") int id,
          @JsonProperty("title") String title,
          @JsonProperty("genre") String genre,
          @JsonProperty("gross") double gross) {
        this.id = id;
        this.title = title;
        this.genre = genre;
        this.gross = gross;
      }
    }

    ActorSystem system = ActorSystem.create();
    Materializer materializer = ActorMaterializer.create(system);

    SlickSession session = SlickSession.forConfig("slick-h2-mem"); // (1)
    system.registerOnTermination(session::close);

    RestClient elasticSearchClient =
        RestClient.builder(new HttpHost("localhost", 9201)).build(); // (4)

    final ObjectMapper objectToJsonMapper = new ObjectMapper(); // (5)

    final CompletionStage<Done> done =
        Slick.source( // (6)
                session,
                "SELECT * FROM MOVIE",
                (SlickRow row) ->
                    new Movie(row.nextInt(), row.nextString(), row.nextString(), row.nextDouble()))
            .map(movie -> WriteMessage.createIndexMessage(String.valueOf(movie.id), movie)) // (8)
            .runWith(
                ElasticsearchSink.create( // (9)
                    "movie",
                    "boxoffice",
                    ElasticsearchWriteSettings.Default(),
                    elasticSearchClient,
                    objectToJsonMapper),
                materializer);

    done.thenRunAsync(
        () -> {
          try {
            elasticSearchClient.close(); // (10)
          } catch (IOException ignored) {
            ignored.printStackTrace();
          }
        },
        system.dispatcher());
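Step (8) decides whether the document id comes from the source row or is generated by Elasticsearch. As a rough illustration of what that choice means on the wire, the sketch below builds the two newline-delimited JSON lines that Elasticsearch's bulk API expects for one `index` action, with and without an explicit `_id`. `BulkIndexLines` and `indexAction` are hypothetical names written only for this illustration; the connector assembles the real request itself, and its exact output may differ.

```java
import java.util.Optional;

// Hypothetical helper, not part of Alpakka: shows the shape of one bulk "index" action.
final class BulkIndexLines {

  // Returns the action line followed by the document line, each ending in a newline.
  // Assumes index and id contain no characters that need JSON escaping.
  static String indexAction(String index, Optional<String> id, String documentJson) {
    String idPart = id.map(v -> ",\"_id\":\"" + v + "\"").orElse("");
    String actionLine = "{\"index\":{\"_index\":\"" + index + "\"" + idPart + "}}";
    return actionLine + "\n" + documentJson + "\n";
  }

  public static void main(String[] args) {
    String doc = "{\"id\":1,\"title\":\"Some title\"}";
    // Explicit id: indexing the same row again overwrites the same document.
    System.out.print(indexAction("movie", Optional.of("1"), doc));
    // No id: Elasticsearch assigns one, so indexing the row again adds another document.
    System.out.print(indexAction("movie", Optional.empty(), doc));
  }
}
```

Using the table's primary key as the id keeps a repeated run of the stream idempotent, which is usually what you want when copying a table.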
Example: Read from a Kafka topic and publish to Elasticsearch
- Configure Kafka consumer (1)
- Data class mapped to Elasticsearch (2)
- Spray JSON (Scala) or Jackson (Java) conversion for the data class (3)
- Elasticsearch client setup (4)
- Kafka consumer with committing support (5)
- Use `FlowWithContext` to focus on `ConsumerRecord` (6)
- Parse message from Kafka to `Movie` and create Elasticsearch write message (7)
- Use `createWithContext` to use an Elasticsearch flow with context support (so it passes through the Kafka commit offset) (8)
- React to write errors (9)
- Make the context visible again and just keep the committable offset (10)
- Let the `Committer.sink` aggregate commits to batches and commit to Kafka (11)
- Combine consumer control and stream completion into `DrainingControl` (12)
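The code for this example creates its write messages in step (7) with `createUpsertMessage`: the document is updated if the id already exists and created otherwise. In Elasticsearch's bulk API this can be expressed as an `update` action whose body sets `doc_as_upsert`. The sketch below builds those two lines by hand. `BulkUpsertLines` is a hypothetical helper for illustration only, and it is an assumption that the connector sends an equivalent request.

```java
// Hypothetical helper, not part of Alpakka: shows one bulk "update" action used as an upsert.
final class BulkUpsertLines {

  // Returns the action line followed by the body line, each ending in a newline.
  // Assumes index and id contain no characters that need JSON escaping.
  static String upsertAction(String index, String id, String documentJson) {
    String actionLine = "{\"update\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}";
    // doc_as_upsert tells Elasticsearch to insert the doc when the id does not exist yet.
    String bodyLine = "{\"doc\":" + documentJson + ",\"doc_as_upsert\":true}";
    return actionLine + "\n" + bodyLine + "\n";
  }

  public static void main(String[] args) {
    System.out.print(upsertAction("movies", "1", "{\"id\":1,\"title\":\"Some title\"}"));
  }
}
```

Because Kafka messages may be redelivered after a restart, an upsert keyed by the movie id makes reprocessing the same record harmless.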
- Scala

    import akka.actor.ActorSystem
    import akka.kafka._
    import akka.kafka.scaladsl.{Committer, Consumer, Producer}
    import akka.stream.alpakka.elasticsearch.WriteMessage
    import akka.stream.alpakka.elasticsearch.scaladsl.{ElasticsearchFlow, ElasticsearchSource}
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import akka.stream.{ActorMaterializer, Materializer}
    import akka.{Done, NotUsed}
    import org.apache.http.HttpHost
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization._
    import org.elasticsearch.client.RestClient
    import org.slf4j.LoggerFactory
    import org.testcontainers.containers.KafkaContainer
    import org.testcontainers.elasticsearch.ElasticsearchContainer
    import spray.json.DefaultJsonProtocol._
    import spray.json._

    import scala.collection.immutable
    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}

    // configure Kafka consumer (1)
    val kafkaConsumerSettings = ConsumerSettings(actorSystem, new IntegerDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaBootstrapServers)
      .withGroupId(groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withStopTimeout(5.seconds)

    // Type in Elasticsearch (2)
    case class Movie(id: Int, title: String)

    // Spray JSON conversion setup (3)
    implicit val movieFormat: JsonFormat[Movie] = jsonFormat2(Movie)

    // Elasticsearch client setup (4)
    implicit val elasticsearchClient: RestClient =
      RestClient
        .builder(HttpHost.create(elasticsearchAddress))
        .build()

    val indexName = "movies"

    val control: Consumer.DrainingControl[Done] = Consumer
      .committableSource(kafkaConsumerSettings, Subscriptions.topics(topic)) // (5)
      .asSourceWithContext(_.committableOffset) // (6)
      .map(_.record)
      .map { consumerRecord => // (7)
        val movie = consumerRecord.value().parseJson.convertTo[Movie]
        WriteMessage.createUpsertMessage(movie.id.toString, movie)
      }
      .via(ElasticsearchFlow.createWithContext(indexName, "_doc")) // (8)
      .map { writeResult => // (9)
        writeResult.error.foreach { errorJson =>
          throw new RuntimeException(s"Elasticsearch update failed ${writeResult.errorReason.getOrElse(errorJson)}")
        }
        NotUsed
      }
      .asSource // (10)
      .map {
        case (_, committableOffset) =>
          committableOffset
      }
      .toMat(Committer.sink(CommitterSettings(actorSystem)))(Keep.both) // (11)
      .mapMaterializedValue(Consumer.DrainingControl.apply) // (12)
      .run()
- Java

    import akka.Done;
    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.actor.Terminated;
    import akka.kafka.CommitterSettings;
    import akka.kafka.ConsumerSettings;
    import akka.kafka.ProducerSettings;
    import akka.kafka.Subscriptions;
    import akka.kafka.javadsl.Committer;
    import akka.kafka.javadsl.Consumer;
    import akka.kafka.javadsl.Producer;
    import akka.stream.ActorMaterializer;
    import akka.stream.Materializer;
    import akka.stream.alpakka.elasticsearch.ElasticsearchSourceSettings;
    import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings;
    import akka.stream.alpakka.elasticsearch.WriteMessage;
    import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow;
    import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSource;
    import akka.stream.javadsl.Keep;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;
    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectReader;
    import com.fasterxml.jackson.databind.ObjectWriter;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import org.apache.http.HttpHost;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.elasticsearch.client.RestClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.elasticsearch.ElasticsearchContainer;

    import java.io.IOException;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;

    // configure Kafka consumer (1)
    ConsumerSettings<Integer, String> kafkaConsumerSettings =
        ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new StringDeserializer())
            .withBootstrapServers(kafkaBootstrapServers)
            .withGroupId(groupId)
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            .withStopTimeout(Duration.ofSeconds(5));

    // Type in Elasticsearch (2)
    static class Movie {
      public final int id;
      public final String title;

      @JsonCreator
      public Movie(@JsonProperty("id") int id, @JsonProperty("title") String title) {
        this.id = id;
        this.title = title;
      }

      @Override
      public String toString() {
        return "Movie(" + id + ", title=" + title + ")";
      }
    }

    // Jackson conversion setup (3)
    final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
    final ObjectWriter movieWriter = mapper.writerFor(Movie.class);
    final ObjectReader movieReader = mapper.readerFor(Movie.class);

    // Elasticsearch client setup (4)
    elasticsearchClient = RestClient.builder(HttpHost.create(elasticsearchAddress)).build();

    Consumer.DrainingControl<Done> control =
        Consumer.committableSource(kafkaConsumerSettings, Subscriptions.topics(topic)) // (5)
            .asSourceWithContext(cm -> cm.committableOffset()) // (6)
            .map(cm -> cm.record())
            .map(
                consumerRecord -> { // (7)
                  Movie movie = movieReader.readValue(consumerRecord.value());
                  return WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
                })
            .via(
                ElasticsearchFlow.createWithContext(
                    indexName,
                    "_doc",
                    ElasticsearchWriteSettings.create(),
                    elasticsearchClient,
                    mapper)) // (8)
            .map(
                writeResult -> { // (9)
                  writeResult
                      .getError()
                      .ifPresent(
                          errorJson -> {
                            throw new RuntimeException(
                                "Elasticsearch update failed "
                                    + writeResult.getErrorReason().orElse(errorJson));
                          });
                  return NotUsed.notUsed();
                })
            .asSource() // (10)
            .map(pair -> pair.second())
            .toMat(Committer.sink(CommitterSettings.create(actorSystem)), Keep.both()) // (11)
            .mapMaterializedValue(Consumer::createDrainingControl) // (12)
            .run(materializer);
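Step (11) batches commits. Batching works because a committed Kafka offset covers every earlier offset in the same partition, so a batch only needs to remember the highest offset seen per partition. The sketch below models that idea with plain Java collections. `OffsetBatch` is a hypothetical class for illustration and is not how `Committer.sink` is implemented internally.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of commit batching: keep only the highest offset per partition.
final class OffsetBatch {
  private final Map<Integer, Long> highestByPartition = new TreeMap<>();

  // Records an offset, replacing the stored one only if the new offset is higher.
  void add(int partition, long offset) {
    highestByPartition.merge(partition, offset, Long::max);
  }

  // Returns a copy of what a single batched commit would need to contain.
  Map<Integer, Long> toCommit() {
    return new TreeMap<>(highestByPartition);
  }

  public static void main(String[] args) {
    OffsetBatch batch = new OffsetBatch();
    batch.add(0, 5L);
    batch.add(0, 3L);
    batch.add(1, 7L);
    batch.add(0, 9L);
    System.out.println(batch.toCommit()); // prints {0=9, 1=7}
  }
}
```

Four processed messages collapse into one commit with two entries, which is why batching reduces load on the Kafka brokers.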
Running the example code
This example is contained in a stand-alone runnable main and can be run from sbt like this:
- Scala

    sbt
    > doc-examples/run