Apache Solr
The Apache Solr connector has not been updated for a long time and is now considered End of Life. It will be removed in the next release of Alpakka.
Solr (pronounced “solar”) is an open source enterprise search platform, written in Java, from the Apache Lucene project. Its major features include full-text search, hit highlighting, faceted search, real-time indexing, dynamic clustering, database integration, NoSQL features and rich document (e.g., Word, PDF) handling. Providing distributed search and index replication, Solr is designed for scalability and fault tolerance. Solr is widely used for enterprise search and analytics use cases and has an active development community and regular releases.
Alpakka Solr provides Akka Stream sources and sinks for Apache Solr.
For more information about Solr please visit the Solr documentation.
Project Info: Alpakka Solr | |
---|---|
Artifact | com.lightbend.akka, akka-stream-alpakka-solr, 9.0.1 |
JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 |
Scala versions | 2.13.12, 3.3.4 |
JPMS module name | akka.stream.alpakka.solr |
License | |
Readiness level | End-of-Life; it is not recommended to use this project any more. Since 9.0.0-M1, 2024-10-14 |
Home page | https://doc.akka.io/libraries/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
Additionally, add the dependencies as below.
sbt:
val AkkaVersion = "2.10.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-solr" % "9.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
The table below shows the direct dependencies of this module.
Organization | Artifact | Version |
---|---|---|
com.typesafe.akka | akka-stream_2.13 | 2.10.0 |
org.apache.solr | solr-solrj | 8.11.3 |
org.scala-lang | scala-library | 2.13.12 |
Set up a Solr client
Sources, flows and sinks provided by this connector need a prepared SolrClient (e.g. CloudSolrClient) to access Solr.
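Scala:
import java.util.{Arrays, Optional}
import org.apache.solr.client.solrj.impl.CloudSolrClient

final val zookeeperPort = 9984
final val zookeeperHost = s"127.0.0.1:$zookeeperPort/solr"
// Solr flows and sinks pick up this client implicitly
implicit val solrClient: CloudSolrClient =
  new CloudSolrClient.Builder(Arrays.asList(zookeeperHost), Optional.empty()).build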
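CloudSolrClient connects through ZooKeeper, which suits SolrCloud. For a single standalone Solr node, another SolrClient implementation should work as well. A minimal sketch, assuming SolrJ 8.x on the classpath and a hypothetical node at http://localhost:8983/solr; the helper withClient is also hypothetical and closes the client afterwards, since a SolrClient holds a connection pool:

```scala
import org.apache.solr.client.solrj.SolrClient
import org.apache.solr.client.solrj.impl.HttpSolrClient

object StandaloneClientSketch {
  // Hypothetical base URL of a standalone (non-SolrCloud) Solr node.
  val solrBaseUrl = "http://localhost:8983/solr"

  // Building the client does not contact Solr; requests are only sent once it is used.
  def buildClient(): HttpSolrClient =
    new HttpSolrClient.Builder(solrBaseUrl).build()

  // SolrClient is Closeable: run `use`, then release the client's resources.
  def withClient[A](use: SolrClient => A): A = {
    val client = buildClient()
    try use(client)
    finally client.close()
  }
}
```

To use such a client with the connector, it would be declared as an implicit SolrClient, like solrClient above, and kept open for as long as the streams that use it are running.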
Reading from Solr
Create a Solr TupleStream (e.g. via CloudSolrStream) and use SolrSource.fromTupleStream to create a source.
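Scala:
import akka.stream.alpakka.solr.scaladsl.SolrSource
import org.apache.solr.client.solrj.io.SolrClientCache
import org.apache.solr.client.solrj.io.stream.{CloudSolrStream, StreamContext, TupleStream}
import org.apache.solr.client.solrj.io.stream.expr.{StreamExpressionParser, StreamFactory}

// `collection` names the Solr collection to read from
val factory = new StreamFactory().withCollectionZkHost(collection, zookeeperHost)
val solrClientCache = new SolrClientCache()
val streamContext = new StreamContext()
streamContext.setSolrClientCache(solrClientCache)

val expression =
  StreamExpressionParser.parse(s"""search($collection, q=*:*, fl="title,comment", sort="title asc")""")
val stream: TupleStream = new CloudSolrStream(expression, factory)
stream.setStreamContext(streamContext)

val source = SolrSource
  .fromTupleStream(stream)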
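The streaming expression handed to CloudSolrStream is plain text, so it can be parsed and inspected before any stream is started. A minimal sketch using SolrJ's StreamExpressionParser, assuming SolrJ 8.x on the classpath; it checks the function name and the named parameters of a search(...) expression like the one above (the collection name books is hypothetical):

```scala
import org.apache.solr.client.solrj.io.stream.expr.{StreamExpression, StreamExpressionNamedParameter, StreamExpressionParser}
import scala.jdk.CollectionConverters._

object SearchExpressionSketch {
  // Parses a search(...) expression like the one above; `collection` is supplied by the caller.
  def searchExpression(collection: String): StreamExpression =
    StreamExpressionParser.parse(
      s"""search($collection, q=*:*, fl="title,comment", sort="title asc")"""
    )

  // Names of the named parameters, in the order they appear in the expression.
  def namedParameters(expr: StreamExpression): List[String] =
    expr.getParameters.asScala.toList.collect {
      case p: StreamExpressionNamedParameter => p.getName
    }
}
```

Inspecting the parsed form this way can help when expressions are assembled from configuration rather than written inline.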
Writing to Solr
Alpakka Solr batches updates by sending all updates of the same operation type to Solr at once. The batches are taken from the elements of a single collection sent to a Solr flow or sink; such a collection may still contain updates of different types. If a stream doesn't have natural batches of updates, use the groupedWithin operator to create count- or time-based batches.
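To make the batching idea concrete, the sketch below models how one collection of mixed operations could be split into per-operation requests, using hypothetical Op types. It assumes that consecutive messages of the same operation type are sent together, so upserts and deletes keep their relative order; it is an illustration of the idea, not the connector's actual code:

```scala
object BatchingModel {
  // Hypothetical stand-ins for write operations; these are not the connector's own types.
  sealed trait Op
  final case class Upsert(id: String) extends Op
  final case class Delete(id: String) extends Op

  // Two messages belong to the same request if they are the same kind of operation.
  private def sameKind(a: Op, b: Op): Boolean = a.getClass == b.getClass

  // Splits one batch into runs of consecutive same-kind operations. Each run models
  // one request to Solr, and the relative order of upserts and deletes is preserved.
  def requests(batch: List[Op]): List[List[Op]] = batch match {
    case Nil => Nil
    case head :: _ =>
      val (run, rest) = batch.span(op => sameKind(op, head))
      run :: requests(rest)
  }
}
```

In a real stream, an operator such as groupedWithin(5, 10.millis) produces the batch that a split like this would operate on.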
Alpakka Solr offers three styles for writing to Apache Solr:
- Using SolrInputDocument (via SolrSink.documents, SolrFlow.documents and SolrFlow.documentsWithPassThrough)
- Annotated Java Bean classes supported by Solr’s DocumentObjectBinder (via SolrSink.beans, SolrFlow.beans and SolrFlow.beansWithPassThrough)
- Typed streams with document binders to translate to SolrInputDocument (via SolrSink.typeds, SolrFlow.typeds and SolrFlow.typedsWithPassThrough)
In all variations the data is wrapped into WriteMessages.
Committing and configuration for updates
Data sent to Solr is not searchable until it has been committed to the index. These are the major options for handling commits:
- The Solr installation can be configured to use auto-commit.
- Specify commit-within in SolrUpdateSettings so that Solr commits every write made through Alpakka Solr within the given time.
- Use explicit committing via the SolrClient.commit methods on stream completion, as most examples show. As commit is a blocking operation, choose an appropriate execution context (preferably not system.dispatcher).
Configuration of Solr committing is described in UpdateHandlers in SolrConfig.
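The examples below pass a commitExecutionContext to the final map without defining it. One way to provide it, sketched under the assumption that a small fixed thread pool is acceptable, is a dedicated ExecutionContext. Here blockingCommit is a hypothetical stand-in for solrClient.commit(collectionName), so the sketch runs without Solr:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, blocking}

object CommitExecutionSketch {
  // Dedicated pool for blocking commits, separate from the actor system's dispatcher.
  // The pool size of 2 is an arbitrary assumption.
  val commitExecutionContext: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  // Hypothetical stand-in for solrClient.commit(collectionName), which blocks on network I/O.
  def blockingCommit(collection: String): String = {
    Thread.sleep(20)
    s"committed $collection"
  }

  // Once the stream's completion future succeeds, commit on the dedicated pool.
  def commitAfter(streamDone: Future[Any], collection: String): Future[String] =
    streamDone.map(_ => blocking(blockingCommit(collection)))(commitExecutionContext)
}
```

Wrapping the call in scala.concurrent.blocking is only a hint to pools that support it; the dedicated pool is what keeps the commit off system.dispatcher. The pool should be shut down when the application stops.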
Available settings
Parameter | Default | Description |
---|---|---|
commitWithin | -1 | Max time (in ms) before a commit will happen, -1 for explicit committing |
Scala:
import akka.stream.alpakka.solr.SolrUpdateSettings

val settings = SolrUpdateSettings()
  .withCommitWithin(-1)
Writing SolrInputDocuments
Use SolrSink.documents to stream SolrInputDocuments to Solr.
Defining mappings
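Scala:
import org.apache.solr.client.solrj.io.Tuple
import org.apache.solr.common.SolrInputDocument

case class Book(title: String, comment: String = "", routerOpt: Option[String] = None)

// Book -> SolrInputDocument, used when writing
val bookToDoc: Book => SolrInputDocument = { b =>
  val doc = new SolrInputDocument
  doc.setField("title", b.title)
  doc.setField("comment", b.comment)
  // only set the router field when a routing value is present
  b.routerOpt.foreach { router => doc.setField("router", router) }
  doc
}

// Solr stream Tuple -> Book, used when reading
val tupleToBook: Tuple => Book = { t =>
  val title = t.getString("title")
  Book(title, t.getString("comment"))
}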
Use SolrSink.documents, SolrFlow.documents or SolrFlow.documentsWithPassThrough to stream SolrInputDocuments to Solr.
A SolrClient must be provided implicitly to SolrSink and SolrFlow.
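Scala:
import scala.concurrent.duration._
import akka.stream.alpakka.solr.{SolrUpdateSettings, WriteMessage}
import akka.stream.alpakka.solr.scaladsl.{SolrSink, SolrSource}
import org.apache.solr.client.solrj.io.Tuple
import org.apache.solr.common.SolrInputDocument

// needs an implicit ActorSystem (materializer) and the implicit solrClient in scope
val copyCollection = SolrSource
  .fromTupleStream(stream)
  .map { (tuple: Tuple) =>
    val book: Book = tupleToBook(tuple)
    val doc: SolrInputDocument = bookToDoc(book)
    WriteMessage.createUpsertMessage(doc)
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.documents(collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)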
Writing Java beans
First, create a POJO. In Scala, a case class whose fields carry Solr's @Field annotation serves this purpose.
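Scala:
import org.apache.solr.client.solrj.beans.Field
import scala.annotation.meta.field

// @(Field @field) places Solr's @Field annotation on the underlying field,
// where DocumentObjectBinder looks for it
case class BookBean(@(Field @field) title: String)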
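To see what Solr's DocumentObjectBinder makes of an annotated case class, a bean can be converted directly, without a Solr server. A minimal sketch, assuming SolrJ 8.x on the classpath; the extra comment field is hypothetical:

```scala
import org.apache.solr.client.solrj.beans.{DocumentObjectBinder, Field}
import org.apache.solr.common.SolrInputDocument
import scala.annotation.meta.field

object BeanBindingSketch {
  // A bean like BookBean above; the extra `comment` field is hypothetical.
  final case class BookBean(@(Field @field) title: String, @(Field @field) comment: String)

  private val binder = new DocumentObjectBinder()

  // Produces the SolrInputDocument that Solr's binder derives from the bean; no server is involved.
  def toDocument(bean: BookBean): SolrInputDocument =
    binder.toSolrInputDocument(bean)
}
```

For BookBean("Solr in Action", "a classic"), the resulting document should hold title and comment fields with those values. Without the @field meta-annotation, Scala keeps @Field on the constructor parameter only, and the binder would then most likely find no annotated fields.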
Use SolrSink.beans, SolrFlow.beans or SolrFlow.beansWithPassThrough to stream POJOs to Solr.
Scala:
val copyCollection = SolrSource
  .fromTupleStream(stream)
  .map { (tuple: Tuple) =>
    val title = tuple.getString("title")
    WriteMessage.createUpsertMessage(BookBean(title))
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.beans[BookBean](collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
Writing arbitrary classes via custom binding
Use SolrSink.typeds, SolrFlow.typeds or SolrFlow.typedsWithPassThrough to stream messages with custom binding to Solr.
Scala:
val copyCollection = SolrSource
  .fromTupleStream(stream)
  .map { (tuple: Tuple) =>
    val book: Book = tupleToBook(tuple)
    WriteMessage.createUpsertMessage(book)
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink
      .typeds[Book](
        collectionName,
        SolrUpdateSettings(),
        binder = bookToDoc
      )
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
Using a flow with custom binding
You can also build flow stages with SolrFlow. A flow emits the write results of each batch, so the stream can continue after writing to Solr.
Scala:
import akka.stream.alpakka.solr.scaladsl.SolrFlow
import akka.stream.scaladsl.Sink

val copyCollection = SolrSource
  .fromTupleStream(stream)
  .map { (tuple: Tuple) =>
    val book: Book = tupleToBook(tuple)
    WriteMessage.createUpsertMessage(book)
  }
  .groupedWithin(5, 10.millis)
  .via(
    SolrFlow
      .typeds[Book](
        collectionName,
        SolrUpdateSettings(),
        binder = bookToDoc
      )
  )
  .runWith(Sink.seq)
  // explicit commit when stream ended
  .map { seq =>
    solrClient.commit(collectionName)
    seq
  }(commitExecutionContext)
Passing data through SolrFlow
All flow types (documents, beans, typeds) exist with pass-through support: use SolrFlow.documentsWithPassThrough, SolrFlow.beansWithPassThrough or SolrFlow.typedsWithPassThrough.
When streaming documents from Kafka, you might want to commit the Kafka offsets only after the documents have been written to Solr. This scenario relies on implicit committing to Solr via the commitWithin setting.
Scala:
// Note: This code mimics Alpakka Kafka APIs
val copyCollection = kafkaConsumerSource
  .map { (kafkaMessage: CommittableMessage) =>
    val book = kafkaMessage.book
    // Transform message so that we can write to solr
    WriteMessage.createUpsertMessage(book).withPassThrough(kafkaMessage.committableOffset)
  }
  .groupedWithin(5, 10.millis)
  .via( // write to Solr
    SolrFlow.typedsWithPassThrough[Book, CommittableOffset](
      collectionName,
      // use implicit commits to Solr
      SolrUpdateSettings().withCommitWithin(5),
      binder = bookToDoc
    )
  )
  // check status and collect Kafka offsets
  .map { messageResults =>
    val offsets = messageResults.map { result =>
      if (result.status != 0)
        throw new Exception("Failed to write message to Solr")
      result.passThrough
    }
    CommittableOffsetBatch(offsets)
  }
  .mapAsync(1)(_.commitScaladsl())
  .runWith(Sink.ignore)
Excluding messages
Failure to deserialize a Kafka message is a particular case of conditional message processing. When a message fails to deserialize, there is likely nothing to write to Solr, yet the Solr flow only passes a committable offset through as part of a write message.
Use WriteMessage.createPassThrough to wrap such an offset in a message that passes through the flow without making any change in Solr.
Scala:
// Note: This code mimics Alpakka Kafka APIs
val copyCollection = kafkaConsumerSource
  .map { (offset: CommittableOffset) =>
    // Nothing to write: wrap the offset in a pass-through message;
    // withSource only adapts the message type to SolrInputDocument
    WriteMessage.createPassThrough(offset).withSource(new SolrInputDocument())
  }
  .groupedWithin(5, 10.millis)
  .via( // write to Solr
    SolrFlow.documentsWithPassThrough[CommittableOffset](
      collectionName,
      // use implicit commits to Solr
      SolrUpdateSettings().withCommitWithin(5)
    )
  )
  // check status and collect Kafka offsets
  .map { messageResults =>
    val offsets = messageResults.map { result =>
      if (result.status != 0)
        throw new Exception("Failed to write message to Solr")
      result.passThrough
    }
    CommittableOffsetBatch(offsets)
  }
  .mapAsync(1)(_.commitScaladsl())
  .runWith(Sink.ignore)
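In practice, decodable and undecodable records usually arrive in the same stream. A sketch of that conditional step, assuming decoding yields an Either and using hypothetical Offset and Book types in place of the Alpakka Kafka ones: decoded records become upserts carrying their offset, while failures become pass-through messages. Both branches produce a WriteMessage[SolrInputDocument, Offset], so they could share one SolrFlow.documentsWithPassThrough stage:

```scala
import akka.stream.alpakka.solr.WriteMessage
import org.apache.solr.common.SolrInputDocument

object ExcludeUndecodableSketch {
  // Hypothetical stand-ins for a Kafka committable offset and a decoded record.
  final case class Offset(partition: Int, offset: Long)
  final case class Book(title: String)

  private def bookToDoc(book: Book): SolrInputDocument = {
    val doc = new SolrInputDocument()
    doc.setField("title", book.title)
    doc
  }

  // Decoded records become upserts carrying their offset; undecodable ones become
  // pass-through messages, so their offset still flows through without a write to Solr.
  def toWriteMessage(decoded: Either[String, Book], offset: Offset): WriteMessage[SolrInputDocument, Offset] =
    decoded match {
      case Right(book) =>
        WriteMessage.createUpsertMessage(bookToDoc(book)).withPassThrough(offset)
      case Left(_) =>
        // withSource only adapts the message type for SolrFlow.documentsWithPassThrough
        WriteMessage.createPassThrough(offset).withSource(new SolrInputDocument())
    }
}
```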
Update documents
With WriteMessage.createUpdateMessage, documents can be updated atomically. All flow and sink types (documents, beans, typeds) support atomic updates.
Scala:
val updateCollection = SolrSource
  .fromTupleStream(stream2)
  .map { (tuple: Tuple) =>
    val id = tuple.getFields.get("title").toString
    val comment = tuple.getFields.get("comment").toString
    WriteMessage.createUpdateMessage[SolrInputDocument](
      idField = "title",
      idValue = id,
      updates = Map(
        "comment" ->
          Map("set" -> (comment + " It is a good book!!!"))
      )
    )
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.documents(collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
If a collection contains a router field, use WriteMessage.createUpdateMessage(...).withRoutingFieldValue(..) to set the router field.
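The updates map holds one entry per field, keyed by an atomic-update modifier. A sketch combining several standard Solr modifiers (set, add, inc) with a routing value; the field names tags and views and the router value are hypothetical, and such fields would have to exist in the schema (inc needs a numeric field, add a multi-valued one):

```scala
import akka.stream.alpakka.solr.WriteMessage
import org.apache.solr.common.SolrInputDocument

object AtomicUpdateSketch {
  // One entry per field, keyed by a Solr atomic-update modifier.
  // The field names tags and views are hypothetical.
  def updatesFor(comment: String, newTag: String): Map[String, Map[String, Any]] =
    Map(
      "comment" -> Map("set" -> comment), // replace the current value
      "tags" -> Map("add" -> newTag),     // append to a multi-valued field
      "views" -> Map("inc" -> 1)          // increment a numeric field
    )

  // Atomic update of the document whose "title" is `title`, sent with a (hypothetical) routing value.
  def updateMessage(title: String, comment: String, newTag: String, router: String) =
    WriteMessage
      .createUpdateMessage[SolrInputDocument](
        idField = "title",
        idValue = title,
        updates = updatesFor(comment, newTag)
      )
      .withRoutingFieldValue(router)
}
```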
Delete documents by ids
With WriteMessage.createDeleteMessage(id), documents may be deleted by ID. All flow and sink types (documents, beans, typeds) support deleting.
Scala:
val deleteDocuments = SolrSource
  .fromTupleStream(stream2)
  .map { (tuple: Tuple) =>
    val id = tuple.getFields.get("title").toString
    WriteMessage.createDeleteMessage[SolrInputDocument](id)
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.documents(collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
Delete documents by query
With WriteMessage.createDeleteByQueryMessage(query), documents matching a query may be deleted. All flow and sink types (documents, beans, typeds) support deleting.
Scala:
val deleteByQuery = SolrSource
  .fromTupleStream(stream2)
  .map { (tuple: Tuple) =>
    val title = tuple.getFields.get("title").toString
    WriteMessage.createDeleteByQueryMessage[SolrInputDocument](
      s"""title:"$title" """
    )
  }
  .groupedWithin(5, 10.millis)
  .runWith(
    SolrSink.documents(collectionName, SolrUpdateSettings())
  )
  // explicit commit when stream ended
  .map { _ =>
    solrClient.commit(collectionName)
  }(commitExecutionContext)
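A delete-by-query that interpolates a value into a quoted phrase breaks if the value itself contains a double quote. A sketch that escapes the value with SolrJ's ClientUtils.escapeQueryChars instead, assuming the same title field and assuming it is an exact-match string field (as its use for sorting suggests):

```scala
import akka.stream.alpakka.solr.WriteMessage
import org.apache.solr.client.solrj.util.ClientUtils
import org.apache.solr.common.SolrInputDocument

object DeleteByQuerySketch {
  // Backslash-escapes Solr query syntax (whitespace, quotes, colons, wildcards, ...) in the title,
  // so a user-supplied value cannot break the query or widen the match.
  def titleQuery(title: String): String =
    s"title:${ClientUtils.escapeQueryChars(title)}"

  // Delete-by-query message for documents whose title equals `title`.
  def deleteByTitle(title: String) =
    WriteMessage.createDeleteByQueryMessage[SolrInputDocument](titleQuery(title))
}
```

Because spaces are backslash-escaped, the phrase quotes are no longer needed: titleQuery("Akka in Action") gives title:Akka\ in\ Action.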