Elasticsearch

Example: Index all data from an RDBMS table into Elasticsearch

  • Instantiate a Slick database session using the config parameters defined under the key slick-h2-mem and register closing it on shutdown of the actor system (1); a sketch of what that configuration could look like follows this list
  • Scala only: Slick definition of the MOVIE table (2)
  • Class that holds the Movie data (3)
  • Instantiate the Elasticsearch REST client (4)
  • Scala: Instantiate the Spray JSON format that converts the Movie case class to JSON (5)
  • Java: Instantiate the Jackson ObjectMapper that converts the Movie class to JSON (5)
  • Construct the Slick Source for the H2 table and query all data in the table (6)
  • Scala only: Map each tuple into a Movie case class instance (7)
  • The first argument of createIndexMessage is the id of the document; use the overload without an id if you would like Elasticsearch to generate one (8)
  • Prepare the Elasticsearch sink that the data is drained to (9)
  • Close the Elasticsearch client upon completion of indexing the data (10)
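The sample reads the Slick database settings from a configuration block named slick-h2-mem in application.conf. As a rough sketch of what such a block could contain (the profile, driver and JDBC URL below are assumptions for an in-memory H2 database, not values taken from the sample project), the session can also be built from a programmatically parsed config:

Scala
import com.typesafe.config.ConfigFactory
import akka.stream.alpakka.slick.javadsl.SlickSession

// Hypothetical equivalent of the "slick-h2-mem" entry in application.conf
val slickConfig = ConfigFactory.parseString("""
  slick-h2-mem {
    profile = "slick.jdbc.H2Profile$"
    db {
      connectionPool = disabled
      dataSourceClass = "slick.jdbc.DriverDataSource"
      properties = {
        driver = "org.h2.Driver"
        url = "jdbc:h2:mem:movies"
      }
    }
  }
""")

// Same effect as SlickSession.forConfig("slick-h2-mem"), but with an explicit Config
implicit val session = SlickSession.forConfig("slick-h2-mem", slickConfig)
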
Scala
import akka.Done
import akka.stream.alpakka.elasticsearch.WriteMessage._
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import akka.stream.alpakka.slick.javadsl.SlickSession
import akka.stream.alpakka.slick.scaladsl.Slick
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val session = SlickSession.forConfig("slick-h2-mem")                         // (1)
actorSystem.registerOnTermination(session.close())

import session.profile.api._
class Movies(tag: Tag) extends Table[(Int, String, String, Double)](tag, "MOVIE") {   // (2)
  def id = column[Int]("ID")
  def title = column[String]("TITLE")
  def genre = column[String]("GENRE")
  def gross = column[Double]("GROSS")

  override def * = (id, title, genre, gross)
}

case class Movie(id: Int, title: String, genre: String, gross: Double)                // (3)

implicit val elasticSearchClient: RestClient =
  RestClient.builder(new HttpHost("localhost", 9201)).build()                         // (4)
implicit val format: JsonFormat[Movie] = jsonFormat4(Movie)                           // (5)

val done: Future[Done] =
  Slick
    .source(TableQuery[Movies].result)                                                // (6)
    .map {                                                                            // (7)
      case (id, title, genre, gross) => Movie(id, title, genre, gross)
    }
    .map(movie => createIndexMessage(movie.id.toString, movie))                       // (8)
    .runWith(ElasticsearchSink.create[Movie]("movie", "_doc"))                        // (9)

done.onComplete {
  case _ =>
    elasticSearchClient.close()                                                       // (10)
}
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings;
import akka.stream.alpakka.elasticsearch.WriteMessage;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import akka.stream.alpakka.slick.javadsl.Slick;
import akka.stream.alpakka.slick.javadsl.SlickRow;
import akka.stream.alpakka.slick.javadsl.SlickSession;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.concurrent.CompletionStage;


static class Movie { // (3)
  public final int id;
  public final String title;
  public final String genre;
  public final double gross;

  @JsonCreator
  public Movie(
      @JsonProperty("id") int id,
      @JsonProperty("title") String title,
      @JsonProperty("genre") String genre,
      @JsonProperty("gross") double gross) {
    this.id = id;
    this.title = title;
    this.genre = genre;
    this.gross = gross;
  }
}

  ActorSystem system = ActorSystem.create();
  Materializer materializer = ActorMaterializer.create(system);

  SlickSession session = SlickSession.forConfig("slick-h2-mem"); // (1)
  system.registerOnTermination(session::close);

  RestClient elasticSearchClient =
      RestClient.builder(new HttpHost("localhost", 9201)).build(); // (4)

  final ObjectMapper objectToJsonMapper = new ObjectMapper(); // (5)

  final CompletionStage<Done> done =
      Slick.source( // (6)
              session,
              "SELECT * FROM MOVIE",
              (SlickRow row) ->
                  new Movie(row.nextInt(), row.nextString(), row.nextString(), row.nextDouble()))
          .map(movie -> WriteMessage.createIndexMessage(String.valueOf(movie.id), movie)) // (8)
          .runWith(
              ElasticsearchSink.create( // (9)
                  "movie",
                  "boxoffice",
                  ElasticsearchWriteSettings.Default(),
                  elasticSearchClient,
                  objectToJsonMapper),
              materializer);

  done.thenRunAsync(
          () -> {
            try {
              elasticSearchClient.close(); // (10)
            } catch (IOException ignored) {
              ignored.printStackTrace();
            }
          },
          system.dispatcher());
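
The MOVIE table must of course contain rows before the stream above can index anything. A minimal, hypothetical sketch of seeding it with Alpakka Slick, reusing the session, table and Movie case class from the Scala example (the movie values are made up):

Scala
import akka.stream.scaladsl.Source

// Hypothetical sample rows
val movies = List(
  Movie(1, "Rogue One", "Adventure", 3.0),
  Movie(2, "Beauty and the Beast", "Musical", 3.0)
)

// Insert each Movie with a plain SQL statement; sqlu comes from
// `import session.profile.api._`, which is already in scope above
val inserted: Future[Done] =
  Source(movies)
    .runWith(Slick.sink(movie =>
      sqlu"INSERT INTO MOVIE VALUES (${movie.id}, ${movie.title}, ${movie.genre}, ${movie.gross})"))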

Example: Read from a Kafka topic and publish to Elasticsearch

  • Configure Kafka consumer (1)
  • Data class mapped to Elasticsearch (2)
  • Spray JSON (Scala) / Jackson (Java) conversion for the data class (3)
  • Elasticsearch client setup (4)
  • Kafka consumer with committing support (5)
  • Use FlowWithContext to focus on ConsumerRecord (6)
  • Parse message from Kafka to Movie and create Elasticsearch write message (7)
  • Use createWithContext to use an Elasticsearch flow with context-support (so it passes through the Kafka committable offset) (8)
  • React on write errors (9)
  • Make the context visible again and just keep the committable offset (10)
  • Let the Committer.sink aggregate commits to batches and commit to Kafka (11); a sketch of tuning those settings follows this list
  • Combine consumer control and stream completion into DrainingControl (12)
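The Committer.sink in (11) batches offsets before committing them back to Kafka; how aggressively it batches is controlled by CommitterSettings. A short sketch of tuning those settings (the values below are illustrative assumptions, not the sample's defaults):

Scala
// Commit after at most 100 offsets or 10 seconds, whichever comes first
val committerSettings = CommitterSettings(actorSystem)
  .withMaxBatch(100L)
  .withMaxInterval(10.seconds)

The resulting settings would then be passed to Committer.sink in place of CommitterSettings(actorSystem) in step (11).
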
Scala
import akka.actor.ActorSystem
import akka.kafka._
import akka.kafka.scaladsl.{Committer, Consumer, Producer}
import akka.stream.alpakka.elasticsearch.WriteMessage
import akka.stream.alpakka.elasticsearch.scaladsl.{ElasticsearchFlow, ElasticsearchSource}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer}
import akka.{Done, NotUsed}
import org.apache.http.HttpHost
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization._
import org.elasticsearch.client.RestClient
import org.slf4j.LoggerFactory
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.elasticsearch.ElasticsearchContainer
import spray.json.DefaultJsonProtocol._
import spray.json._

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// configure Kafka consumer (1)
val kafkaConsumerSettings = ConsumerSettings(actorSystem, new IntegerDeserializer, new StringDeserializer)
  .withBootstrapServers(kafkaBootstrapServers)
  .withGroupId(groupId)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  .withStopTimeout(5.seconds)

// Type in Elasticsearch (2)
case class Movie(id: Int, title: String)

// Spray JSON conversion setup (3)
implicit val movieFormat: JsonFormat[Movie] = jsonFormat2(Movie)

// Elasticsearch client setup (4)
implicit val elasticsearchClient: RestClient =
  RestClient
    .builder(HttpHost.create(elasticsearchAddress))
    .build()

val indexName = "movies"

val control: Consumer.DrainingControl[Done] = Consumer
  .committableSource(kafkaConsumerSettings, Subscriptions.topics(topic)) // (5)
  .asSourceWithContext(_.committableOffset) // (6)
  .map(_.record)
  .map { consumerRecord => // (7)
    val movie = consumerRecord.value().parseJson.convertTo[Movie]
    WriteMessage.createUpsertMessage(movie.id.toString, movie)
  }
  .via(ElasticsearchFlow.createWithContext(indexName, "_doc")) // (8)
  .map { writeResult => // (9)
    writeResult.error.foreach { errorJson =>
      throw new RuntimeException(s"Elasticsearch update failed ${writeResult.errorReason.getOrElse(errorJson)}")
    }
    NotUsed
  }
  .asSource // (10)
  .map {
    case (_, committableOffset) =>
      committableOffset
  }
  .toMat(Committer.sink(CommitterSettings(actorSystem)))(Keep.both) // (11)
  .mapMaterializedValue(Consumer.DrainingControl.apply) // (12)
  .run()
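
For the stream above to have anything to index, there must be records on the Kafka topic. A hedged sketch of publishing a few test movies as JSON (the movie values are made up; the sample project wires this up in its own way), using only classes already imported above:

Scala
// Producer settings matching the Integer/String (de)serializers used by the consumer
val kafkaProducerSettings =
  ProducerSettings(actorSystem, new IntegerSerializer, new StringSerializer)
    .withBootstrapServers(kafkaBootstrapServers)

// Publish hypothetical movies; the Spray JSON format above turns them into JSON strings
val published: Future[Done] =
  Source(List(Movie(23, "Psycho"), Movie(423, "Citizen Kane")))
    .map(movie => new ProducerRecord[Integer, String](topic, Int.box(movie.id), movie.toJson.compactPrint))
    .runWith(Producer.plainSink(kafkaProducerSettings))
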
Java
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.actor.Terminated;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerSettings;
import akka.kafka.ProducerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.kafka.javadsl.Producer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.alpakka.elasticsearch.ElasticsearchSourceSettings;
import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings;
import akka.stream.alpakka.elasticsearch.WriteMessage;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSource;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.elasticsearch.client.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// configure Kafka consumer (1)
ConsumerSettings<Integer, String> kafkaConsumerSettings =
    ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new StringDeserializer())
        .withBootstrapServers(kafkaBootstrapServers)
        .withGroupId(groupId)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withStopTimeout(Duration.ofSeconds(5));

// Type in Elasticsearch (2)
static class Movie {
  public final int id;
  public final String title;

  @JsonCreator
  public Movie(@JsonProperty("id") int id, @JsonProperty("title") String title) {
    this.id = id;
    this.title = title;
  }

  @Override
  public String toString() {
    return "Movie(" + id + ", title=" + title + ")";
  }
}

// Jackson conversion setup (3)
final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
final ObjectWriter movieWriter = mapper.writerFor(Movie.class);
final ObjectReader movieReader = mapper.readerFor(Movie.class);


// Elasticsearch client setup (4)
RestClient elasticsearchClient =
    RestClient.builder(HttpHost.create(elasticsearchAddress)).build();

Consumer.DrainingControl<Done> control =
    Consumer.committableSource(kafkaConsumerSettings, Subscriptions.topics(topic)) // (5)
        .asSourceWithContext(cm -> cm.committableOffset()) // (6)
        .map(cm -> cm.record())
        .map(
            consumerRecord -> { // (7)
              Movie movie = movieReader.readValue(consumerRecord.value());
              return WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
            })
        .via(
            ElasticsearchFlow.createWithContext(
                indexName,
                "_doc",
                ElasticsearchWriteSettings.create(),
                elasticsearchClient,
                mapper)) // (8)
        .map(
            writeResult -> { // (9)
              writeResult
                  .getError()
                  .ifPresent(
                      errorJson -> {
                        throw new RuntimeException(
                            "Elasticsearch update failed "
                                + writeResult.getErrorReason().orElse(errorJson));
                      });
              return NotUsed.notUsed();
            })
        .asSource() // (10)
        .map(pair -> pair.second())
        .toMat(Committer.sink(CommitterSettings.create(actorSystem)), Keep.both()) // (11)
        .mapMaterializedValue(Consumer::createDrainingControl) // (12)
        .run(materializer);
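
To stop the pipeline cleanly, the DrainingControl from (12) can be asked to stop consuming, wait for in-flight messages to be written and committed, and then complete. A minimal Scala sketch (assuming an ExecutionContext such as actorSystem.dispatcher is available):

Scala
implicit val ec: ExecutionContext = actorSystem.dispatcher

// Stops the Kafka consumer, drains in-flight messages and completes when commits are done
val streamCompletion: Future[Done] = control.drainAndShutdown()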

Running the example code

This example is contained in a stand-alone runnable main and can be run from sbt like this:

Scala
sbt
> doc-examples/run