Flow.asFlowWithContext
Extracts context data from the elements of a Flow
so that it can be turned into a FlowWithContext
which can propagate that context per element along a stream.
Signature
Flow.asFlowWithContext
Flow.asFlowWithContext
Description
See Context Propagation for a general overview of context propagation.
Extracts context data from the elements of a Flow
Flow
so that it can be turned into a FlowWithContext
FlowWithContext
which can propagate that context per element along a stream. The first function passed into asFlowWithContext
must turn each incoming pair of element and context value into an element of this Flow
Flow
. The second function passed into asFlowWithContext
must turn each outgoing element of this Flow
Flow
into an outgoing context value.
See also:
- Context Propagation
Source.asSourceWithContext
Turns aSource
into aSourceWithContext
which can propagate a context per element along a stream.
Example
Elements from this flow have a correlation number, but the flow structure should focus on the text message in the elements. The first converter in asFlowWithContext
applies to the end of the “with context” flow to turn it into a regular flow again. The second converter function chooses the second value in the tuplepair as the context. Another map
operator makes the first value the stream elements in the FlowWithContext
.
- Scala
-
source
import akka.NotUsed import akka.stream.scaladsl.Flow import akka.stream.scaladsl.FlowWithContext // a regular flow with pairs as elements val flow: Flow[(String, Int), (String, Int), NotUsed] = // ??? // Declare the "flow with context" // ingoing: String and Integer // outgoing: String and Integer val flowWithContext: FlowWithContext[String, Int, String, Int, NotUsed] = // convert the flow of pairs into a "flow with context" flow .asFlowWithContext[String, Int, Int]( // at the end of this flow: put the elements and the context back into a tuple collapseContext = Tuple2.apply)( // pick the second element of the incoming pair as context extractContext = _._2) .map(_._1) // keep the first pair element as stream element val mapped = flowWithContext // regular operators apply to the element without seeing the context .map(_.reverse) // running the flow with some sample data and asserting the outcome import akka.stream.scaladsl.Source import akka.stream.scaladsl.Sink import scala.collection.immutable val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3) val source = Source(values).asSourceWithContext(_._2).map(_._1) val result = source.via(mapped).runWith(Sink.seq) result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3)
- Java
-
source
import akka.NotUsed; import akka.japi.Pair; import akka.stream.javadsl.*; // a regular flow with pairs as elements Flow<Pair<String, Integer>, Pair<String, Integer>, NotUsed> flow = // ... // Declare the "flow with context" // ingoing: String and Integer // outgoing: String and Integer FlowWithContext<String, Integer, String, Integer, NotUsed> flowWithContext = // convert the flow of pairs into a "flow with context" flow.<String, Integer, Integer>asFlowWithContext( // at the end of this flow: put the elements and the context back into a pair Pair::create, // pick the second element of the incoming pair as context Pair::second) // keep the first pair element as stream element .map(Pair::first); FlowWithContext<String, Integer, String, Integer, NotUsed> mapped = flowWithContext // regular operators apply to the element without seeing the context .map(s -> s.replace('e', 'y')); // running the flow with some sample data and asserting the outcome Collection<Pair<String, Integer>> values = Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3)); SourceWithContext<String, Integer, NotUsed> source = Source.from(values).asSourceWithContext(Pair::second).map(Pair::first); CompletionStage<List<Pair<String, Integer>>> result = source.via(mapped).runWith(Sink.seq(), system); List<Pair<String, Integer>> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS); assertThat( list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3)));