Elasticsearch

Example: Index all data from an RDBMS table into Elasticsearch

  • Instantiate a Slick database session using the config parameters defined in key slick-h2-mem and mount closing it on shutdown of the Actor System (1)
  • Scala only: Slick definition of the MOVIE table (2)
  • Class that holds the Movie data (3)
  • Instantiate Elastic REST client (4)
  • Scala: Instantiate the Spray json format that converts the Movie case class to json (5)
  • Java: Instantiate the Jackson Object mapper that converts the Movie class to json (5)
  • Construct the Slick Source for the H2 table and query all data in the table (6)
  • Scala only: Map each tuple into a Movie case class instance (7)
  • The first argument of the IncomingMessage is the id of the document. Replace with None if you would Elastic to generate one (8)
  • Prepare the Elastic Sink that the data needs to be drained to (9)
  • Close the Elastic client upon completion of indexing the data (10)
Scala
import akka.Done
import akka.stream.alpakka.elasticsearch.WriteMessage._
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import akka.stream.alpakka.slick.javadsl.SlickSession
import akka.stream.alpakka.slick.scaladsl.Slick
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val session = SlickSession.forConfig("slick-h2-mem")                         // (1)
actorSystem.registerOnTermination(session.close())

import session.profile.api._
class Movies(tag: Tag) extends Table[(Int, String, String, Double)](tag, "MOVIE") {   // (2)
  def id = column[Int]("ID")
  def title = column[String]("TITLE")
  def genre = column[String]("GENRE")
  def gross = column[Double]("GROSS")

  override def * = (id, title, genre, gross)
}

case class Movie(id: Int, title: String, genre: String, gross: Double)                // (3)

implicit val elasticSearchClient: RestClient =
  RestClient.builder(new HttpHost("localhost", 9201)).build()                         // (4)
implicit val format: JsonFormat[Movie] = jsonFormat4(Movie)                           // (5)

val done: Future[Done] =
  Slick
    .source(TableQuery[Movies].result)                                                // (6)
    .map {                                                                            // (7)
      case (id, genre, title, gross) => Movie(id, genre, title, gross)
    }
    .map(movie => createIndexMessage(movie.id.toString, movie))                       // (8)
    .runWith(ElasticsearchSink.create[Movie]("movie", "_doc"))                        // (9)

done.onComplete {
  case _ =>
    elasticSearchClient.close()                                                       // (10)
}
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings;
import akka.stream.alpakka.elasticsearch.WriteMessage;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import akka.stream.alpakka.slick.javadsl.Slick;
import akka.stream.alpakka.slick.javadsl.SlickRow;
import akka.stream.alpakka.slick.javadsl.SlickSession;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.concurrent.CompletionStage;


static class Movie { // (3)
  public final int id;
  public final String title;
  public final String genre;
  public final double gross;

  @JsonCreator
  public Movie(
      @JsonProperty("id") int id,
      @JsonProperty("title") String title,
      @JsonProperty("genre") String genre,
      @JsonProperty("gross") double gross) {
    this.id = id;
    this.title = title;
    this.genre = genre;
    this.gross = gross;
  }
}

  ActorSystem system = ActorSystem.create();
  Materializer materializer = ActorMaterializer.create(system);

  SlickSession session = SlickSession.forConfig("slick-h2-mem"); // (1)
  system.registerOnTermination(session::close);

  RestClient elasticSearchClient =
      RestClient.builder(new HttpHost("localhost", 9201)).build(); // (4)

  final ObjectMapper objectToJsonMapper = new ObjectMapper(); // (5)

  final CompletionStage<Done> done =
      Slick.source( // (6)
              session,
              "SELECT * FROM MOVIE",
              (SlickRow row) ->
                  new Movie(row.nextInt(), row.nextString(), row.nextString(), row.nextDouble()))
          .map(movie -> WriteMessage.createIndexMessage(String.valueOf(movie.id), movie)) // (8)
          .runWith(
              ElasticsearchSink.create( // (9)
                  "movie",
                  "boxoffice",
                  ElasticsearchWriteSettings.Default(),
                  elasticSearchClient,
                  objectToJsonMapper),
              materializer);

  done.thenRunAsync(
          () -> {
            try {
              elasticSearchClient.close(); // (10)
            } catch (IOException ignored) {
              ignored.printStackTrace();
            }
          },
          system.dispatcher())

Running the example code

This example is contained in a stand-alone runnable main, it can be run from sbt like this:

Scala
sbt
> doc-examples/run
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.