mapWithResource
Map elements with the help of a resource that can be opened, used to transform each element (in a blocking way), and closed.
Signature
Flow.mapWithResource
1. create: Open or create the resource.
2. f: Transform each input element with the help of the resource.
3. close: Close the resource. Invoked at the end of the stream or if the stream fails, optionally outputting a last element.
Description
Transform each stream element with the help of a resource. The functions are by default called on Akka’s dispatcher for blocking IO to avoid interfering with other stream operations. See Blocking Needs Careful Management for an explanation of why this is important.
The resource creation function is invoked once when the stream is materialized, and the returned resource is passed to the mapping function for every element, starting with the first. The mapping function returns a mapped element to emit downstream. The returned T MUST NOT be null, because null is not a legal stream element according to the Reactive Streams specification.
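As a rough illustration of the lifecycle described above, the following sketch counts how often the creation function runs while the same resource instance is reused for every element. The names `ResourceCreatedOnceSketch` and `Counter` are hypothetical and exist only for this example; it assumes an Akka version that provides `mapWithResource`.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object ResourceCreatedOnceSketch {
  // A stand-in "resource": mutable state that is only touched by the stream operator.
  final class Counter { var seen: Int = 0 }

  def run(): (Int, Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("resource-created-once")
    val created = new AtomicInteger(0)
    try {
      val elements = Source(List("a", "b", "c"))
        .mapWithResource(() => { created.incrementAndGet(); new Counter })(
          // The same Counter instance is passed in for every element.
          (counter, elem) => { counter.seen += 1; s"${counter.seen}:$elem" },
          // Nothing extra to emit when the stream ends.
          _ => None)
        .runWith(Sink.seq)
      (created.get, Await.result(elements, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The running index in each output element comes from state held in the resource, which is only possible because the resource is created once and shared across all elements.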
The close function is called when upstream or downstream completes normally or exceptionally, and it is called only once.
- If upstream completes or fails, the optional value returned by close is emitted downstream if defined.
- If downstream cancels or fails, the optional value returned by close is ignored.
- If the stream shuts down abruptly, the optional value returned by close is ignored.
You can do some clean-up here.
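The first case in the list above (upstream completes and close returns a defined value) can be sketched as follows. The `StringBuilder` stands in for a real resource, and `CloseEmitsLastElementSketch` is a hypothetical name used only for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object CloseEmitsLastElementSketch {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("close-emits-last-element")
    try {
      val elements = Source(List("a", "b", "c"))
        .mapWithResource(() => new StringBuilder)(
          // Remember every input in the resource and emit an upper-cased copy.
          (sb, elem) => { sb.append(elem); elem.toUpperCase },
          // Upstream has completed: emit what was accumulated as a final element.
          sb => Some(sb.toString))
        .runWith(Sink.seq)
      Await.result(elements, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Returning `None` from the close function instead would end the stream after the three mapped elements without a trailing summary element.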
Early completion can be achieved by combining this operator with the Flow.takeWhile operator.
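One possible way to combine the two, sketched under the assumption that the mapping function signals "stop" by returning an empty `Option`: `takeWhile` then completes the stream at the first empty value, which cancels the `mapWithResource` stage and triggers the close function. The `"stop"` marker, the `Promise`, and the name `EarlyCompletionSketch` are all hypothetical choices for this illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object EarlyCompletionSketch {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("early-completion")
    // Completed by the close function so that we can observe that it ran.
    val closed = Promise[Unit]()
    try {
      val elements = Source(List("a", "b", "stop", "c"))
        .mapWithResource(() => new StringBuilder)(
          (sb, elem) =>
            // An empty Option marks the point where the stream should end.
            if (elem == "stop") Option.empty[String]
            else { sb.append(elem); Some(elem) },
          // Downstream cancelled: any value returned here would be ignored.
          _ => { closed.trySuccess(()); None })
        .takeWhile(_.isDefined)
        .collect { case Some(value) => value }
        .runWith(Sink.seq)
      val result = Await.result(elements, 5.seconds)
      // Wait for the close function as well; it may run slightly after the sink completes.
      Await.result(closed.future, 5.seconds)
      result
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Elements after the marker (`"c"` here) are never passed to the mapping function, because the stream has already been cancelled by then.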
See also unfoldResource, unfoldResourceAsync.
You can configure the default dispatcher for this operator by changing the akka.stream.materializer.blocking-io-dispatcher setting, or set it for a given stream by using ActorAttributes.
Examples
Imagine we have a database API which may potentially block when we perform a query, and the database connection can be reused for each query.
- Scala
import java.net.URL

trait DBDriver {
  def create(url: URL, userName: String, password: String): Connection
}

trait Connection {
  def close(): Unit
}

trait Database {
  // blocking query
  def doQuery(connection: Connection, query: String): QueryResult = ???
}

trait QueryResult {
  def hasMore: Boolean
  // potentially blocking retrieval of each element
  def next(): DataBaseRecord
  // potentially blocking retrieval of all elements
  def toList(): List[DataBaseRecord]
}

trait DataBaseRecord
Let’s see how we make use of the API above safely through mapWithResource:
- Scala
// some database for JVM
// (dbDriver, url, userName and password are assumed to be defined elsewhere)
val db: Database = ???
Source(
  List(
    "SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;",
    "SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;"))
  .mapWithResource(() => dbDriver.create(url, userName, password))(
    (connection, query) => db.doQuery(connection, query).toList(),
    conn => {
      conn.close()
      None
    })
  .mapConcat(identity)
  .runForeach(println)
In this example we retrieve data with two queries over the same shared connection and transform the results into individual records with mapConcat(identity). Once the stream is done, the connection is closed.
Reactive Streams semantics
emits the mapping function returns an element and downstream is ready to consume it
backpressures downstream backpressures
completes upstream completes
cancels downstream cancels