Apache Solr

The Solr connector provides Akka Stream sources, flows and sinks for Solr.

For more information about Solr, please visit the Solr documentation.

Project Info: Alpakka Solr
Artifact: com.lightbend.akka : akka-stream-alpakka-solr : 1.0-M2
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12
JPMS module name: akka.stream.alpakka.solr
Readiness level: Community-driven, since 0.17 (2018-02-19)
Home page: https://doc.akka.io/docs/alpakka/current/
Release notes: in the documentation
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-solr" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-solr_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-solr_2.12', version: '1.0-M2'
}

The table below lists the direct dependencies of this module, and the dependency tree that follows shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
org.apache.solr    solr-solrj    7.4.0    Apache 2
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.apache.solr    solr-solrj    7.4.0    Apache 2
    commons-io    commons-io    2.5    Apache License, Version 2.0
    org.apache.commons    commons-math3    3.6.1    Apache License, Version 2.0
    org.apache.httpcomponents    httpclient    4.5.3    Apache License, Version 2.0
    org.apache.httpcomponents    httpcore    4.4.6    Apache License, Version 2.0
    org.apache.httpcomponents    httpmime    4.5.3    Apache License, Version 2.0
    org.apache.zookeeper    zookeeper    3.4.11    The Apache Software License, Version 2.0
    org.codehaus.woodstox    stax2-api    3.1.4    The BSD License
    org.codehaus.woodstox    woodstox-core-asl    4.4.1    The Apache Software License, Version 2.0
    org.noggit    noggit    0.8    Apache License, Version 2.0
    org.slf4j    jcl-over-slf4j    1.7.24    MIT License
    org.slf4j    slf4j-api    1.7.24    MIT License
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Set up client

Sources, Flows and Sinks provided by this connector need a prepared org.apache.solr.client.solrj.SolrClient to access Solr.

Scala

val zkHost = "127.0.0.1:9984/solr"
implicit val client: CloudSolrClient = new CloudSolrClient.Builder().withZkHost(zkHost).build
Java
String zkHost = "127.0.0.1:9984/solr";
CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zkHost).build();

Source Usage

Create a tuple stream.

Scala
val factory = new StreamFactory().withCollectionZkHost(collection, zkHost)
val solrClientCache = new SolrClientCache()
val streamContext = new StreamContext()
streamContext.setSolrClientCache(solrClientCache)

val expression =
  StreamExpressionParser.parse(s"""search($collection, q=*:*, fl="title,comment", sort="title asc")""")
val stream: TupleStream = new CloudSolrStream(expression, factory)
stream.setStreamContext(streamContext)
Java
StreamFactory factory = new StreamFactory().withCollectionZkHost(collection, zkHost);
SolrClientCache solrClientCache = new SolrClientCache();
StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(solrClientCache);

String expressionStr =
    String.format("search(%s, q=*:*, fl=\"title,comment\", sort=\"title asc\")", collection);
StreamExpression expression = StreamExpressionParser.parse(expressionStr);
TupleStream stream = new CloudSolrStream(expression, factory);
stream.setStreamContext(streamContext);

Use SolrSource.fromTupleStream to create a SolrSource from the tuple stream.

Scala
val source = SolrSource
  .fromTupleStream(ts = stream)
Java
Source<Tuple, NotUsed> source = SolrSource.fromTupleStream(stream);

Sink Usage

Now we can stream messages to Solr by providing the SolrClient to the SolrSink.

Scala
case class Book(title: String, comment: String = "", routerOpt: Option[String] = None)

val bookToDoc: Book => SolrInputDocument = { b =>
  val doc = new SolrInputDocument
  doc.setField("title", b.title)
  doc.setField("comment", b.comment)
  b.routerOpt.foreach { router =>
    doc.setField("router", router)
  }
  doc
}

val tupleToBook: Tuple => Book = { t =>
  val title = t.getString("title")
  Book(title, t.getString("comment"))
}
Java
public static class Book {
  public String title;

  public String comment;

  public String router;

  public Book() {}

  public Book(String title) {
    this.title = title;
  }

  public Book(String title, String comment) {
    this.title = title;
    this.comment = comment;
  }

  public Book(String title, String comment, String router) {
    this.title = title;
    this.comment = comment;
    this.router = router;
  }
}

Function<Book, SolrInputDocument> bookToDoc =
    book -> {
      SolrInputDocument doc = new SolrInputDocument();
      doc.setField("title", book.title);
      doc.setField("comment", book.comment);
      if (book.router != null) doc.setField("router", book.router);
      return doc;
    };

Function<Tuple, Book> tupleToBook =
    tuple -> {
      String title = tuple.getString("title");
      return new Book(title, tuple.getString("comment"));
    };

With document sink

Use SolrSink.documents to stream SolrInputDocument to Solr.

Scala
val f1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    val doc: SolrInputDocument = bookToDoc(book)
    IncomingUpsertMessage(doc)
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .runWith(
    SolrSink.documents(
      collection = "collection2",
      settings = SolrUpdateSettings(commitWithin = 5)
    )
  )
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 =
    SolrSource.fromTupleStream(stream)
        .map(
            tuple -> {
              Book book = tupleToBook.apply(tuple);
              SolrInputDocument doc = bookToDoc.apply(book);
              return IncomingUpsertMessage.create(doc);
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.documents("collection2", settings, cluster.getSolrClient()), materializer);

With bean sink

Firstly, create a POJO.

Scala
import org.apache.solr.client.solrj.beans.Field
import scala.annotation.meta.field
case class BookBean(@(Field @field) title: String)
Java
class BookBean {
  @Field("title")
  public String title;

  public BookBean(String title) {
    this.title = title;
  }
}

Use SolrSink.beans to stream POJOs to Solr.

Scala
val res1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val title = tuple.getString("title")
    IncomingUpsertMessage(BookBean(title))
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .runWith(
    SolrSink.beans[BookBean](
      collection = "collection3",
      settings = SolrUpdateSettings(commitWithin = 5)
    )
  )
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 =
    SolrSource.fromTupleStream(stream)
        .map(
            tuple -> {
              String title = tuple.getString("title");
              return IncomingUpsertMessage.create(new BookBean(title));
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.beans("collection3", settings, cluster.getSolrClient(), BookBean.class),
            materializer);

With typed sink

Use SolrSink.typeds to stream messages with a custom binding to Solr.

Scala
val res1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    IncomingUpsertMessage(book)
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .runWith(
    SolrSink
      .typeds[Book](
        collection = "collection4",
        settings = SolrUpdateSettings(commitWithin = 5),
        binder = bookToDoc
      )
  )
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 =
    SolrSource.fromTupleStream(stream)
        .map(tuple -> IncomingUpsertMessage.create(tupleToBook.apply(tuple)))
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.typeds(
                "collection4", settings, bookToDoc, cluster.getSolrClient(), Book.class),
            materializer);

Configuration

We can configure the sink with SolrUpdateSettings.

Scala
import akka.stream.alpakka.solr.SolrUpdateSettings

val settings =
  SolrUpdateSettings(commitWithin = -1)
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(-1);
Parameter    Default    Description
commitWithin    -1    Max time (in ms) before a commit happens; -1 for manual committing
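
With commitWithin = -1 no commit deadline is attached to the update requests, so committing is left to you (or to Solr's own autoCommit configuration). Below is a minimal sketch of committing manually through the SolrClient once the stream completes; it assumes the implicit client, the stream and the bookToDoc/tupleToBook helpers from the examples above are in scope, and the val names are only illustrative.

Scala
// Sketch only: run the upserts with commitWithin = -1 and commit manually afterwards.
import scala.concurrent.ExecutionContext.Implicits.global

val done = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    IncomingUpsertMessage(bookToDoc(tupleToBook(tuple)))
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .runWith(
    SolrSink.documents(
      collection = "collection2",
      settings = SolrUpdateSettings(commitWithin = -1)
    )
  )

done.foreach { _ =>
  // SolrJ call that triggers the commit we skipped by setting commitWithin = -1
  client.commit("collection2")
}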

Update documents atomically

We can update documents atomically.

Scala
val f2 = SolrSource
  .fromTupleStream(ts = stream2)
  .map { tuple: Tuple =>
    IncomingAtomicUpdateMessage[SolrInputDocument](
      "title",
      tuple.fields.get("title").toString,
      None,
      Map("comment" -> Map("set" -> (tuple.fields.get("comment") + " It is a good book!!!")))
    )
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .runWith(
    SolrSink.documents(
      collection = "collection8",
      settings = SolrUpdateSettings()
    )
  )
Java
SolrUpdateSettings settings = SolrUpdateSettings.create();
CompletionStage<Done> res2 =
    SolrSource.fromTupleStream(stream2)
        .map(
            t -> {
              Map<String, Map<String, Object>> m1 = new HashMap<>();
              Map<String, Object> m2 = new HashMap<>();
              m2.put("set", (t.fields.get("comment") + " It is a good book!!!"));
              m1.put("comment", m2);
              return IncomingAtomicUpdateMessage.<SolrInputDocument>create(
                  "title", t.fields.get("title").toString(), m1);
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.documents("collection8", settings, cluster.getSolrClient()), materializer);

We can use typed and bean to update atomically.

If a collection contains a router field, we have to use the IncomingAtomicUpdateMessage with the router field parameter.
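
For example, here is a hedged sketch of the same atomic update as above, but with the router field value passed as the third parameter of IncomingAtomicUpdateMessage (instead of None); the "router" field name follows the bookToDoc example above and the val name is only illustrative.

Scala
// Sketch only: atomic update against a collection routed by a "router" field.
// The router value is read from the tuple and passed as Some(...) instead of None.
val updateRouted = SolrSource
  .fromTupleStream(ts = stream2)
  .map { tuple: Tuple =>
    IncomingAtomicUpdateMessage[SolrInputDocument](
      "title",
      tuple.fields.get("title").toString,
      Some(tuple.fields.get("router").toString), // router field value
      Map("comment" -> Map("set" -> (tuple.fields.get("comment") + " It is a good book!!!")))
    )
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .runWith(
    SolrSink.documents(
      collection = "collection8",
      settings = SolrUpdateSettings()
    )
  )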

Delete documents by ids

We can delete documents by ids.

Scala
val f2 = SolrSource
  .fromTupleStream(ts = stream2)
  .map { tuple: Tuple =>
    IncomingDeleteMessageByIds[SolrInputDocument](tuple.fields.get("title").toString)
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .runWith(
    SolrSink.documents(
      collection = "collection7",
      settings = SolrUpdateSettings()
    )
  )
Java
SolrUpdateSettings settings = SolrUpdateSettings.create();
CompletionStage<Done> res2 =
    SolrSource.fromTupleStream(stream2)
        .map(
            t ->
                IncomingDeleteMessageByIds.<SolrInputDocument>create(
                    tupleToBook.apply(t).title))
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.documents("collection7", settings, cluster.getSolrClient()), materializer);

We can use typed and bean to delete.
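
As an illustration, here is a hedged sketch of deleting by id through the typed sink, reusing the Book binder and the implicit client from above; the val name is only illustrative.

Scala
// Sketch only: the delete message is typed as Book and routed through SolrSink.typeds,
// reusing bookToDoc as the binder.
val deleteTyped = SolrSource
  .fromTupleStream(ts = stream2)
  .map { tuple: Tuple =>
    IncomingDeleteMessageByIds[Book](tuple.fields.get("title").toString)
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .runWith(
    SolrSink.typeds[Book](
      collection = "collection7",
      settings = SolrUpdateSettings(),
      binder = bookToDoc
    )
  )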

Delete documents by query

We can delete documents by query.

Scala
val f2 = SolrSource
  .fromTupleStream(ts = stream2)
  .map { tuple: Tuple =>
    IncomingDeleteMessageByQuery[SolrInputDocument]("title:\"" + tuple.fields.get("title").toString + "\"")
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .runWith(
    SolrSink.documents(
      collection = "collection11",
      settings = SolrUpdateSettings()
    )
  )
Java
SolrUpdateSettings settings = SolrUpdateSettings.create();
CompletionStage<Done> res2 =
    SolrSource.fromTupleStream(stream2)
        .map(
            t ->
                IncomingDeleteMessageByQuery.<SolrInputDocument>create(
                    "title:\"" + t.fields.get("title").toString() + "\""))
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.documents("collection9", settings, cluster.getSolrClient()), materializer);

We can use typed and bean to delete.

Flow Usage

You can also build flow stages with SolrFlow. The API is similar to creating Sinks.

Scala
val res1 = SolrSource
  .fromTupleStream(ts = stream)
  .map { tuple: Tuple =>
    val book: Book = tupleToBook(tuple)
    IncomingUpsertMessage(book)
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .via(
    SolrFlow
      .typeds[Book](
        collection = "collection5",
        settings = SolrUpdateSettings(commitWithin = 5),
        binder = bookToDoc
      )
  )
  .runWith(Sink.seq)
Java
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);
CompletionStage<Done> f1 =
    SolrSource.fromTupleStream(stream)
        .map(tuple -> IncomingUpsertMessage.create(tupleToBook.apply(tuple)))
        .groupedWithin(5, Duration.ofMillis(10))
        .via(
            SolrFlow.typeds(
                "collection5", settings, bookToDoc, cluster.getSolrClient(), Book.class))
        .runWith(Sink.ignore(), materializer);

Passing data through SolrFlow

Use SolrFlow.documentsWithPassThrough, SolrFlow.beansWithPassThrough or SolrFlow.typedsWithPassThrough.

When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to Solr.

Scala
// We're going to pretend we got messages from kafka.
// After we've written them to Solr, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val res1 = Source(messagesFromKafka)
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    println("title: " + book.title)

    // Transform message so that we can write to solr
    IncomingUpsertMessage(book, kafkaMessage.offset)
  }
  .groupedWithin(5, new FiniteDuration(10, TimeUnit.MILLISECONDS))
  .via( // write to Solr
    SolrFlow.typedsWithPassThrough[Book, KafkaOffset](
      collection = "collection6",
      settings = SolrUpdateSettings(commitWithin = 5),
      binder = bookToDoc
    )
  )
  .map { messageResults =>
    messageResults.foreach { result =>
      if (result.status != 0)
        throw new Exception("Failed to write message to Solr")
      // Commit to kafka
      commitToKafka(result.passThrough)
    }
  }
  .runWith(Sink.ignore)
Java
// We're going to pretend we got messages from kafka.
// After we've written them to Solr, we want
// to commit the offset to Kafka

List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Book 3"), new KafkaOffset(2)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(5);

Source.from(messagesFromKafka) // Assume we get this from Kafka
    .map(
        kafkaMessage -> {
          Book book = kafkaMessage.book;
          // Transform message so that we can write to Solr
          return IncomingUpsertMessage.create(book, kafkaMessage.offset);
        })
    .groupedWithin(5, Duration.ofMillis(10))
    .via(
        SolrFlow.typedsWithPassThrough(
            "collection6", settings, bookToDoc, cluster.getSolrClient(), Book.class))
    .map(
        messageResults -> {
          messageResults
              .stream()
              .forEach(
                  result -> {
                    if (result.status() != 0) {
                      throw new RuntimeException("Failed to write message to Solr");
                    }
                    // Commit to kafka
                    kafkaCommitter.commit(result.passThrough());
                  });
          return NotUsed.getInstance();
        })
    .runWith(Sink.seq(), materializer) // Run it
    .toCompletableFuture()
    .get(); // Wait for it to complete

Running the example code

The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> solr/testOnly *.SolrSpec
Java
sbt
> solr/testOnly *.SolrTest