Apache Solr

End of life

The Apache Solr connector has not been updated for a long time and is now considered End of Life. It will be removed with the next release of Alpakka.

Apache Solr

Solr (pronounced “solar”) is an open source enterprise search platform, written in Java, from the Apache Lucene project. Its major features include full-text search, hit highlighting, faceted search, real-time indexing, dynamic clustering, database integration, NoSQL features and rich document (e.g., Word, PDF) handling. Providing distributed search and index replication, Solr is designed for scalability and fault tolerance. Solr is widely used for enterprise search and analytics use cases and has an active development community and regular releases.

Wikipedia

Alpakka Solr provides Akka Stream sources and sinks for Apache Solr.

For more information about Solr please visit the Solr documentation.

Project Info: Alpakka Solr
Artifact: com.lightbend.akka : akka-stream-alpakka-solr : 9.0.0
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.12, 3.3.3
JPMS module name: akka.stream.alpakka.solr
Readiness level: End-of-Life, it is not recommended to use this project any more. Since 9.0.0-M1, 2024-10-14
Home page: https://doc.akka.io/libraries/alpakka/current
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-solr" % "9.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-solr_${scala.binary.version}</artifactId>
    <version>9.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-solr_${versions.ScalaBinary}:9.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The list below shows the direct dependencies of this module and the dependency tree shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.typesafe.akka    akka-stream_2.13    2.10.0
org.apache.solr    solr-solrj    8.11.3
org.scala-lang    scala-library    2.13.12
Dependency tree
com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.apache.solr    solr-solrj    8.11.3
    com.fasterxml.woodstox    woodstox-core    6.5.1    The Apache License, Version 2.0
    commons-io    commons-io    2.11.0
    commons-lang    commons-lang    2.6
    io.netty    netty-buffer    4.1.99.Final
    io.netty    netty-codec    4.1.99.Final
    io.netty    netty-common    4.1.99.Final
    io.netty    netty-handler    4.1.99.Final
    io.netty    netty-resolver    4.1.99.Final
    io.netty    netty-transport-native-epoll    4.1.99.Final
    io.netty    netty-transport-native-unix-common    4.1.99.Final
    io.netty    netty-transport    4.1.99.Final
    org.apache.commons    commons-math3    3.6.1
    org.apache.httpcomponents    httpclient    4.5.13
    org.apache.httpcomponents    httpcore    4.4.14
    org.apache.httpcomponents    httpmime    4.5.13
    org.apache.zookeeper    zookeeper-jute    3.6.2
    org.apache.zookeeper    zookeeper    3.6.2
    org.codehaus.woodstox    stax2-api    4.2.1    The BSD License
    org.eclipse.jetty.http2    http2-client    9.4.53.v20231009
    org.eclipse.jetty.http2    http2-common    9.4.53.v20231009
    org.eclipse.jetty.http2    http2-hpack    9.4.53.v20231009
    org.eclipse.jetty.http2    http2-http-client-transport    9.4.53.v20231009
        org.eclipse.jetty    jetty-alpn-java-client    9.4.53.v20231009
            org.eclipse.jetty    jetty-alpn-client    9.4.53.v20231009
                org.eclipse.jetty    jetty-io    9.4.53.v20231009
                    org.eclipse.jetty    jetty-util    9.4.53.v20231009
    org.eclipse.jetty    jetty-alpn-client    9.4.53.v20231009
        org.eclipse.jetty    jetty-io    9.4.53.v20231009
            org.eclipse.jetty    jetty-util    9.4.53.v20231009
    org.eclipse.jetty    jetty-alpn-java-client    9.4.53.v20231009
        org.eclipse.jetty    jetty-alpn-client    9.4.53.v20231009
            org.eclipse.jetty    jetty-io    9.4.53.v20231009
                org.eclipse.jetty    jetty-util    9.4.53.v20231009
    org.eclipse.jetty    jetty-client    9.4.53.v20231009
    org.eclipse.jetty    jetty-http    9.4.53.v20231009
    org.eclipse.jetty    jetty-io    9.4.53.v20231009
        org.eclipse.jetty    jetty-util    9.4.53.v20231009
    org.eclipse.jetty    jetty-util    9.4.53.v20231009
    org.slf4j    jcl-over-slf4j    1.7.36    Apache License, Version 2.0
    org.slf4j    slf4j-api    1.7.36
    org.xerial.snappy    snappy-java    1.1.10.1    Apache-2.0
org.scala-lang    scala-library    2.13.12    Apache-2.0

Set up a Solr client

Sources, Flows and Sinks provided by this connector need a prepared SolrClient (e.g. CloudSolrClient) to access Solr.

Scala
final val zookeeperPort = 9984
final val zookeeperHost = s"127.0.0.1:$zookeeperPort/solr"
implicit val solrClient: CloudSolrClient =
  new CloudSolrClient.Builder(Arrays.asList(zookeeperHost), Optional.empty()).build
Java
private static final int zookeeperPort = 9984;
private static final String zookeeperHost = "127.0.0.1:" + zookeeperPort + "/solr";

  CloudSolrClient solrClient =
      new CloudSolrClient.Builder(Arrays.asList(zookeeperHost), Optional.empty()).build();

Reading from Solr

Create a Solr TupleStream (e.g. via CloudSolrStream) and use SolrSource.fromTupleStream to create a source.

Scala
val factory = new StreamFactory().withCollectionZkHost(collection, zookeeperHost)
val solrClientCache = new SolrClientCache()
val streamContext = new StreamContext()
streamContext.setSolrClientCache(solrClientCache)

val expression =
  StreamExpressionParser.parse(s"""search($collection, q=*:*, fl="title,comment", sort="title asc")""")
val stream: TupleStream = new CloudSolrStream(expression, factory)
stream.setStreamContext(streamContext)

val source = SolrSource
  .fromTupleStream(stream)
Java
StreamFactory factory = new StreamFactory().withCollectionZkHost(collection, zookeeperHost);
SolrClientCache solrClientCache = new SolrClientCache();
StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(solrClientCache);

String expressionStr =
    String.format("search(%s, q=*:*, fl=\"title,comment\", sort=\"title asc\")", collection);
StreamExpression expression = StreamExpressionParser.parse(expressionStr);
TupleStream stream = new CloudSolrStream(expression, factory);
stream.setStreamContext(streamContext);

Source<Tuple, NotUsed> source = SolrSource.fromTupleStream(stream);
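
The source emits Solr Tuple elements and can be run like any other Akka Stream source. A minimal sketch that collects the title field of every document; the value names are made up, and an implicit ActorSystem is assumed to be in scope as in the snippets above:

Scala
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// run the source and collect the "title" field of each emitted Tuple
val titles: Future[Seq[String]] =
  SolrSource
    .fromTupleStream(stream)
    .map(tuple => tuple.getString("title"))
    .runWith(Sink.seq)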

Writing to Solr

Alpakka Solr batches updates to Solr by sending all updates of the same operation type to Solr at once. These batches are extracted from the elements of each collection sent to a Solr flow or sink; a single collection may still mix updates of different types. If a stream doesn't have natural batches of updates, you may use the groupedWithin operator to create count- or time-based batches, as sketched below.
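
A minimal sketch of forming such batches, assuming a source of individual write messages (messageSource is a made-up name) and the implicit SolrClient and ActorSystem from the snippets above:

Scala
import scala.concurrent.duration._

// batch up to 50 messages, or whatever arrived within one second
messageSource // Source[WriteMessage[SolrInputDocument, NotUsed], NotUsed] (assumed)
  .groupedWithin(50, 1.second)
  .runWith(SolrSink.documents(collectionName, SolrUpdateSettings()))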

Alpakka Solr offers three styles for writing to Apache Solr:

  1. Using SolrInputDocument (via SolrSink.documents, SolrFlow.documents and SolrFlow.documentsWithPassThrough)
  2. Annotated Java Bean classes supported by Solr’s DocumentObjectBinder (via SolrSink.beans, SolrFlow.beans and SolrFlow.beansWithPassThrough)
  3. Typed streams with document binders to translate to SolrInputDocument (via SolrSink.typeds, SolrFlow.typeds and SolrFlow.typedsWithPassThrough)

In all variations the data is wrapped into WriteMessages.
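
The WriteMessage factory methods used throughout this page, collected in one place; the document and values are assumptions:

Scala
val doc = new SolrInputDocument()
doc.setField("title", "Akka in Action")

val upsert = WriteMessage.createUpsertMessage(doc) // insert or replace the document
val deleteById = WriteMessage.createDeleteMessage[SolrInputDocument]("Akka in Action") // delete by id value
val deleteByQuery = WriteMessage.createDeleteByQueryMessage[SolrInputDocument]("""title:"Akka in Action" """) // delete all matches
// atomic updates (createUpdateMessage) and pass-through-only messages (createPassThrough) are shown in later sections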

Committing and configuration for updates

Data sent to Solr is not searchable until it has been committed to the index. These are the major options for handling commits:

  1. The Solr installation can be configured to use auto-commit.
  2. Specify commitWithin in SolrUpdateSettings to trigger commits after every write through Alpakka Solr.
  3. Use explicit committing via the SolrClient.commit methods on stream completion, as most examples show. As commit is a blocking operation, choose an appropriate execution context (preferably not system.dispatcher); a sketch of such a context follows this list.
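
A minimal sketch of a dedicated execution context for the blocking commit call; the name matches the commitExecutionContext used in the Scala examples below, while the single-threaded pool is an assumption:

Scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// keep the blocking SolrClient.commit call off the default dispatcher
val commitExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())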

Configuration of Solr committing is described in UpdateHandlers in SolrConfig.

Available settings

Parameter       Default    Description
commitWithin    -1         Max time (in ms) before a commit will happen, -1 for explicit committing
Scala
import akka.stream.alpakka.solr.SolrUpdateSettings

val settings = SolrUpdateSettings()
  .withCommitWithin(-1)
Java
import akka.stream.alpakka.solr.SolrUpdateSettings;
SolrUpdateSettings settings = SolrUpdateSettings.create().withCommitWithin(-1);

Writing SolrInputDocuments

Use SolrSink.documents to stream SolrInputDocuments to Solr.

Defining mappings

Scala
case class Book(title: String, comment: String = "", routerOpt: Option[String] = None)

val bookToDoc: Book => SolrInputDocument = { b =>
  val doc = new SolrInputDocument
  doc.setField("title", b.title)
  doc.setField("comment", b.comment)
  b.routerOpt.foreach { router =>
    doc.setField("router", router)
  }
  doc
}

val tupleToBook: Tuple => Book = { t =>
  val title = t.getString("title")
  Book(title, t.getString("comment"))
}
Java
public static class Book {
  public String title;

  public String comment;

  public String router;

  public Book() {}

  public Book(String title) {
    this.title = title;
  }

  public Book(String title, String comment) {
    this.title = title;
    this.comment = comment;
  }

  public Book(String title, String comment, String router) {
    this.title = title;
    this.comment = comment;
    this.router = router;
  }
}

Function<Book, SolrInputDocument> bookToDoc =
    book -> {
      SolrInputDocument doc = new SolrInputDocument();
      doc.setField("title", book.title);
      doc.setField("comment", book.comment);
      if (book.router != null) doc.setField("router", book.router);
      return doc;
    };

Function<Tuple, Book> tupleToBook =
    tuple -> {
      String title = tuple.getString("title");
      return new Book(title, tuple.getString("comment"));
    };

Use SolrSink.documents, SolrFlow.documents or SolrFlow.documentsWithPassThrough to stream SolrInputDocuments to Solr.

A SolrClient must be provided to SolrSink; the Scala examples take it implicitly, while the Java API accepts it as an explicit parameter.

Scala
val copyCollection = SolrSource
  .fromTupleStream(stream)
  .map { (tuple: Tuple) =>
    val book: Book = tupleToBook(tuple)
    val doc: SolrInputDocument = bookToDoc(book)
    WriteMessage.createUpsertMessage(doc)
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.documents(collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
Java
CompletionStage<UpdateResponse> copyCollection =
    SolrSource.fromTupleStream(stream)
        .map(
            tuple -> {
              Book book = tupleToBook.apply(tuple);
              SolrInputDocument doc = bookToDoc.apply(book);
              return WriteMessage.createUpsertMessage(doc);
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.documents(collectionName, SolrUpdateSettings.create(), solrClient), system)
        // explicit commit when stream ended
        .thenApply(
            done -> {
              try {
                return solrClient.commit(collectionName);
              } catch (Exception e) {
                throw new IllegalStateException(e);
              }
            });

Writing Java beans

Firstly, create a POJO.

Scala
import org.apache.solr.client.solrj.beans.Field

import scala.annotation.meta.field
case class BookBean(@(Field @field) title: String)
Java
class BookBean {
  @Field("title")
  public String title;

  public BookBean(String title) {
    this.title = title;
  }
}

Use SolrSink.beans, SolrFlow.beans or SolrFlow.beansWithPassThrough to stream POJOs to Solr.

Scala
val copyCollection = SolrSource
  .fromTupleStream(stream)
  .map { (tuple: Tuple) =>
    val title = tuple.getString("title")
    WriteMessage.createUpsertMessage(BookBean(title))
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.beans[BookBean](collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
Java
CompletionStage<UpdateResponse> copyCollection =
    SolrSource.fromTupleStream(stream)
        .map(
            tuple -> {
              String title = tuple.getString("title");
              return WriteMessage.createUpsertMessage(new BookBean(title));
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.beans(
                collectionName, SolrUpdateSettings.create(), solrClient, BookBean.class),
            system)
        // explicit commit when stream ended
        .thenApply(
            done -> {
              try {
                return solrClient.commit(collectionName);
              } catch (Exception e) {
                throw new IllegalStateException(e);
              }
            });

Writing arbitrary classes via custom binding

Use SolrSink.typeds, SolrFlow.typeds or SolrFlow.typedsWithPassThrough to stream messages with custom binding to Solr.

Scala
val copyCollection = SolrSource
  .fromTupleStream(stream)
  .map { (tuple: Tuple) =>
    val book: Book = tupleToBook(tuple)
    WriteMessage.createUpsertMessage(book)
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink
      .typeds[Book](
        collectionName,
        SolrUpdateSettings(),
        binder = bookToDoc
      )
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
Java
CompletionStage<UpdateResponse> copyCollection =
    SolrSource.fromTupleStream(stream)
        .map(tuple -> WriteMessage.createUpsertMessage(tupleToBook.apply(tuple)))
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.typeds(
                collectionName, SolrUpdateSettings.create(), bookToDoc, solrClient, Book.class),
            system)
        // explicit commit when stream ended
        .thenApply(
            done -> {
              try {
                return solrClient.commit(collectionName);
              } catch (Exception e) {
                throw new IllegalStateException(e);
              }
            });

Using a flow with custom binding

You can also build flow stages with SolrFlow.

Scala
val copyCollection = SolrSource
  .fromTupleStream(stream)
  .map { (tuple: Tuple) =>
    val book: Book = tupleToBook(tuple)
    WriteMessage.createUpsertMessage(book)
  }
  .groupedWithin(5, 10.millis)
  .via(
    SolrFlow
      .typeds[Book](
        collectionName,
        SolrUpdateSettings(),
        binder = bookToDoc
      )
  )
  .runWith(Sink.seq)
  // explicit commit when stream ended
  .map { seq =>
    solrClient.commit(collectionName)
    seq
  }(commitExecutionContext)
Java
CompletionStage<UpdateResponse> copyCollection =
    SolrSource.fromTupleStream(stream)
        .map(tuple -> WriteMessage.createUpsertMessage(tupleToBook.apply(tuple)))
        .groupedWithin(5, Duration.ofMillis(10))
        .via(
            SolrFlow.typeds(
                collectionName, SolrUpdateSettings.create(), bookToDoc, solrClient, Book.class))
        .runWith(Sink.ignore(), system)
        // explicit commit when stream ended
        .thenApply(
            done -> {
              try {
                return solrClient.commit(collectionName);
              } catch (Exception e) {
                throw new IllegalStateException(e);
              }
            });

Passing data through SolrFlow

All flow types (documents, beans, typeds) exist with pass-through support: Use SolrFlow.documentsWithPassThrough, SolrFlow.beansWithPassThrough or SolrFlow.typedsWithPassThrough.

When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to Solr. This scenario uses implicit committing via the commitWithin setting.

Scala
// Note: This code mimics Alpakka Kafka APIs
val copyCollection = kafkaConsumerSource
  .map { (kafkaMessage: CommittableMessage) =>
    val book = kafkaMessage.book
    // Transform message so that we can write to solr
    WriteMessage.createUpsertMessage(book).withPassThrough(kafkaMessage.committableOffset)
  }
  .groupedWithin(5, 10.millis)
  .via( // write to Solr
    SolrFlow.typedsWithPassThrough[Book, CommittableOffset](
      collectionName,
      // use implicit commits to Solr
      SolrUpdateSettings().withCommitWithin(5),
      binder = bookToDoc
    )
  ) // check status and collect Kafka offsets
  .map { messageResults =>
    val offsets = messageResults.map { result =>
      if (result.status != 0)
        throw new Exception("Failed to write message to Solr")
      result.passThrough
    }
    CommittableOffsetBatch(offsets)
  }
  .mapAsync(1)(_.commitScaladsl())
  .runWith(Sink.ignore)
Java
// Note: This code mimics Alpakka Kafka APIs
CompletionStage<Done> completion =
    kafkaConsumerSource // Assume we get this from Kafka
        .map(
            kafkaMessage -> {
              Book book = kafkaMessage.book;
              // Transform message so that we can write to Solr
              return WriteMessage.createUpsertMessage(book)
                  .withPassThrough(kafkaMessage.committableOffset);
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .via(
            SolrFlow.typedsWithPassThrough(
                collectionName,
                // use implicit commits to Solr
                SolrUpdateSettings.create().withCommitWithin(5),
                bookToDoc,
                solrClient,
                Book.class))
        .map(
            messageResults ->
                messageResults.stream()
                    .map(
                        result -> {
                          if (result.status() != 0) {
                            throw new RuntimeException("Failed to write message to Solr");
                          }
                          return result.passThrough();
                        })
                    .collect(Collectors.toList()))
        .map(ConsumerMessage::createCommittableOffsetBatch)
        .mapAsync(1, CommittableOffsetBatch::commitJavadsl)
        .runWith(Sink.ignore(), system);

Excluding messages

Failing to deserialize a Kafka message is a particular case of conditional message processing. It is also likely that there is no document to write to Solr when a message fails to deserialize, yet the Solr flow will not pass the corresponding committable offset through without sending a request to Solr.

Use WriteMessage.createPassThrough to exclude such a message without making any change to Solr inside a flow.

Scala
// Note: This code mimics Alpakka Kafka APIs
val copyCollection = kafkaConsumerSource
  .map { (offset: CommittableOffset) =>
    // Transform message so that we can write to solr
    WriteMessage.createPassThrough(offset).withSource(new SolrInputDocument())
  }
  .groupedWithin(5, 10.millis)
  .via( // write to Solr
    SolrFlow.documentsWithPassThrough[CommittableOffset](
      collectionName,
      // use implicit commits to Solr
      SolrUpdateSettings().withCommitWithin(5)
    )
  ) // check status and collect Kafka offsets
  .map { messageResults =>
    val offsets = messageResults.map { result =>
      if (result.status != 0)
        throw new Exception("Failed to write message to Solr")
      result.passThrough
    }
    CommittableOffsetBatch(offsets)
  }
  .mapAsync(1)(_.commitScaladsl())
  .runWith(Sink.ignore)
Java
// Note: This code mimics Alpakka Kafka APIs
CompletionStage<Done> completion =
    kafkaConsumerSource // Assume we get this from Kafka
        .map(
            kafkaMessage -> {
              // Transform message so that we can write to Solr
              return WriteMessage.createPassThrough(kafkaMessage)
                  .withSource(new SolrInputDocument());
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .via(
            SolrFlow.documentsWithPassThrough(
                collectionName,
                // use implicit commits to Solr
                SolrUpdateSettings.create().withCommitWithin(5),
                solrClient))
        .map(
            messageResults ->
                messageResults.stream()
                    .map(
                        result -> {
                          if (result.status() != 0) {
                            throw new RuntimeException("Failed to write message to Solr");
                          }
                          return result.passThrough();
                        })
                    .collect(Collectors.toList()))
        .map(ConsumerMessage::createCommittableOffsetBatch)
        .mapAsync(1, CommittableOffsetBatch::commitJavadsl)
        .runWith(Sink.ignore(), system);

Update documents

With WriteMessage.createUpdateMessage documents can be updated atomically. All flow and sink types (documents, beans, typeds) support atomic updates.

Scala
val updateCollection = SolrSource
  .fromTupleStream(stream2)
  .map { (tuple: Tuple) =>
    val id = tuple.getFields.get("title").toString
    val comment = tuple.getFields.get("comment").toString
    WriteMessage.createUpdateMessage[SolrInputDocument](
      idField = "title",
      idValue = id,
      updates = Map(
        "comment" ->
        Map("set" -> (comment + " It is a good book!!!"))
      )
    )
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.documents(collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
Java
CompletionStage<UpdateResponse> updateCollection =
    SolrSource.fromTupleStream(stream2)
        .map(
            t -> {
              String id = t.fields.get("title").toString();
              String comment = t.fields.get("comment").toString();
              Map<String, Object> m2 = new HashMap<>();
              m2.put("set", (comment + " It's is a good book!!!"));
              Map<String, Map<String, Object>> updates = new HashMap<>();
              updates.put("comment", m2);
              return WriteMessage.<SolrInputDocument>createUpdateMessage("title", id, updates);
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.documents(collectionName, SolrUpdateSettings.create(), solrClient), system)
        // explicit commit when stream ended
        .thenApply(
            done -> {
              try {
                return solrClient.commit(collectionName);
              } catch (Exception e) {
                throw new IllegalStateException(e);
              }
            });

If a collection contains a router field, use WriteMessage.createUpdateMessage(...).withRoutingFieldValue(...) to set the router field.
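
A minimal sketch of an atomic update carrying a routing field value; the field names and values are assumptions:

Scala
val routedUpdate = WriteMessage
  .createUpdateMessage[SolrInputDocument](
    idField = "title",
    idValue = "Akka in Action",
    updates = Map("comment" -> Map("set" -> "Updated via routed atomic update"))
  )
  .withRoutingFieldValue("router-value")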

Delete documents by ids

With WriteMessage.createDeleteMessage(id) documents may be deleted by ID. All flow and sink types (documents, beans, typeds) support deleting.

Scala
val deleteDocuments = SolrSource
  .fromTupleStream(stream2)
  .map { (tuple: Tuple) =>
    val id = tuple.getFields.get("title").toString
    WriteMessage.createDeleteMessage[SolrInputDocument](id)
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.documents(collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
Java
CompletionStage<UpdateResponse> deleteDocuments =
    SolrSource.fromTupleStream(stream2)
        .map(
            t -> {
              String id = tupleToBook.apply(t).title;
              return WriteMessage.<SolrInputDocument>createDeleteMessage(id);
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.documents(collectionName, SolrUpdateSettings.create(), solrClient), system)
        // explicit commit when stream ended
        .thenApply(
            done -> {
              try {
                return solrClient.commit(collectionName);
              } catch (Exception e) {
                throw new IllegalStateException(e);
              }
            });

Delete documents by query

With WriteMessage.createDeleteByQueryMessage(query) documents matching a query may be deleted. All flow and sink types (documents, beans, typeds) support deleting.

Scala
val deleteByQuery = SolrSource
  .fromTupleStream(stream2)
  .map { (tuple: Tuple) =>
    val title = tuple.getFields.get("title").toString
    WriteMessage.createDeleteByQueryMessage[SolrInputDocument](
      s"""title:"$title" """
    )
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.documents(collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
Java
CompletionStage<UpdateResponse> deleteByQuery =
    SolrSource.fromTupleStream(stream2)
        .map(
            t -> {
              String id = t.fields.get("title").toString();
              return WriteMessage.<SolrInputDocument>createDeleteByQueryMessage(
                  "title:\"" + id + "\"");
            })
        .groupedWithin(5, Duration.ofMillis(10))
        .runWith(
            SolrSink.documents(collectionName, SolrUpdateSettings.create(), solrClient), system)
        // explicit commit when stream ended
        .thenApply(
            done -> {
              try {
                return solrClient.commit(collectionName);
              } catch (Exception e) {
                throw new IllegalStateException(e);
              }
            });