Elasticsearch
Example: Index all data from an RDBMS table into Elasticsearch
- Instantiate a Slick database session using the config parameters defined in key
slick-h2-mem
and register it to be closed on shutdown of the Actor System (1) - Scala only: Slick definition of the MOVIE table (2)
- Class that holds the Movie data (3)
- Instantiate Elastic REST client (4)
- Scala: Instantiate the Spray json format that converts the
Movie
case class to json (5) - Java: Instantiate the Jackson Object mapper that converts the
Movie
class to json (5) - Construct the Slick
Source
for the H2 table and query all data in the table (6) - Scala only: Map each tuple into a
Movie
case class instance (7) - The first argument of the
WriteMessage.createIndexMessage call
is the id of the document. Replace with None
if you would like Elastic to generate one (8) - Prepare the Elastic
Sink
that the data needs to be drained to (9) - Close the Elastic client upon completion of indexing the data (10)
- Scala
-
import akka.Done
import akka.stream.alpakka.elasticsearch.WriteMessage._
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import akka.stream.alpakka.slick.javadsl.SlickSession
import akka.stream.alpakka.slick.scaladsl.Slick
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat

import scala.concurrent.Future
import scala.concurrent.duration._

// Slick session built from the "slick-h2-mem" config key; it is closed when
// the actor system terminates.
implicit val session: SlickSession = SlickSession.forConfig("slick-h2-mem") // (1)
actorSystem.registerOnTermination(session.close())

// Brings the profile-specific Slick API (Table, column, TableQuery, ...) into scope.
import session.profile.api._

/** Slick mapping of the MOVIE table; each row is projected as (id, title, genre, gross). */
class Movies(tag: Tag) extends Table[(Int, String, String, Double)](tag, "MOVIE") { // (2)
  def id = column[Int]("ID")
  def title = column[String]("TITLE")
  def genre = column[String]("GENRE")
  def gross = column[Double]("GROSS")
  override def * = (id, title, genre, gross)
}

/** Holds one movie; this is the document that gets serialised to JSON. */
case class Movie(id: Int, title: String, genre: String, gross: Double) // (3)

implicit val elasticSearchClient: RestClient =
  RestClient.builder(new HttpHost("localhost", 9201)).build() // (4)

// Spray JSON format used to convert Movie instances to JSON.
implicit val format: JsonFormat[Movie] = jsonFormat4(Movie) // (5)

val done: Future[Done] =
  Slick
    .source(TableQuery[Movies].result) // (6)
    .map { // (7)
      // Names follow the column order of the `*` projection in Movies.
      case (id, title, genre, gross) => Movie(id, title, genre, gross)
    }
    // The movie id (as a string) is used as the document id.
    .map(movie => createIndexMessage(movie.id.toString, movie)) // (8)
    .runWith(ElasticsearchSink.create[Movie]("movie", "_doc")) // (9)

// Close the client whether the stream succeeded or failed.
// NOTE: Future.onComplete needs an implicit ExecutionContext, which is not
// defined in this snippet and must be provided by the surrounding code.
done.onComplete { _ =>
  elasticSearchClient.close() // (10)
}
- Java
-
import akka.Done; import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings; import akka.stream.alpakka.elasticsearch.WriteMessage; import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import akka.stream.alpakka.slick.javadsl.Slick; import akka.stream.alpakka.slick.javadsl.SlickRow; import akka.stream.alpakka.slick.javadsl.SlickSession; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.concurrent.CompletionStage; static class Movie { // (3) public final int id; public final String title; public final String genre; public final double gross; @JsonCreator public Movie( @JsonProperty("id") int id, @JsonProperty("title") String title, @JsonProperty("genre") String genre, @JsonProperty("gross") double gross) { this.id = id; this.title = title; this.genre = genre; this.gross = gross; } } ActorSystem system = ActorSystem.create(); Materializer materializer = ActorMaterializer.create(system); SlickSession session = SlickSession.forConfig("slick-h2-mem"); // (1) system.registerOnTermination(session::close); RestClient elasticSearchClient = RestClient.builder(new HttpHost("localhost", 9201)).build(); // (4) final ObjectMapper objectToJsonMapper = new ObjectMapper(); // (5) final CompletionStage<Done> done = Slick.source( // (6) session, "SELECT * FROM MOVIE", (SlickRow row) -> new Movie(row.nextInt(), row.nextString(), row.nextString(), row.nextDouble())) .map(movie -> WriteMessage.createIndexMessage(String.valueOf(movie.id), movie)) // (8) .runWith( ElasticsearchSink.create( // (9) "movie", "boxoffice", ElasticsearchWriteSettings.Default(), elasticSearchClient, objectToJsonMapper), materializer); done.thenRunAsync( () -> { 
try { elasticSearchClient.close(); // (10) } catch (IOException ignored) { ignored.printStackTrace(); } }, system.dispatcher())
Running the example code
This example is contained in a stand-alone runnable main; it can be run from sbt
like this:
- Scala
-
sbt > doc-examples/run