Source.asSourceWithContext

Extracts context data from the elements of a Source so that it can be turned into a SourceWithContext which can propagate that context per element along a stream.

Source operators

Signature

Source.asSourceWithContext

Description

See Context Propagation for a general overview of context propagation.

Extracts context data from the elements of a Source so that it can be turned into a SourceWithContext, which can propagate that context per element along a stream. The function passed into asSourceWithContext must turn elements into contexts, one context for every element.
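
To make the "one context for every element" rule concrete, here is a minimal plain-Java model of the idea. It is a sketch, not Akka code: the class `ContextModel` and the helpers `withContext` and `mapElements` are hypothetical names invented for this illustration, `Map.Entry` stands in for Akka's `Pair`, and lists stand in for a running stream.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model; none of these names are part of Akka.
public class ContextModel {

    // Models asSourceWithContext: applies extract once per element and
    // keeps each element side by side with the context derived from it.
    public static <T, C> List<Map.Entry<T, C>> withContext(
            List<T> elements, Function<T, C> extract) {
        List<Map.Entry<T, C>> out = new ArrayList<>();
        for (T element : elements) {
            out.add(new SimpleImmutableEntry<T, C>(element, extract.apply(element)));
        }
        return out;
    }

    // Models map on a SourceWithContext: f sees only the element,
    // while the context is carried along unchanged.
    public static <T, U, C> List<Map.Entry<U, C>> mapElements(
            List<Map.Entry<T, C>> in, Function<T, U> f) {
        List<Map.Entry<U, C>> out = new ArrayList<>();
        for (Map.Entry<T, C> e : in) {
            out.add(new SimpleImmutableEntry<U, C>(f.apply(e.getKey()), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // text messages paired with their correlation numbers
        List<Map.Entry<String, Integer>> values = Arrays.asList(
            new SimpleImmutableEntry<String, Integer>("eins", 1),
            new SimpleImmutableEntry<String, Integer>("zwei", 2),
            new SimpleImmutableEntry<String, Integer>("drei", 3));

        // pick the second value of each pair as the context
        List<Map.Entry<Map.Entry<String, Integer>, Integer>> paired =
            withContext(values, Map.Entry::getValue);

        // keep the first value of each pair as the element
        List<Map.Entry<String, Integer>> texts = mapElements(paired, Map.Entry::getKey);

        // an ordinary element transformation never touches the context
        List<Map.Entry<String, Integer>> mapped =
            mapElements(texts, s -> s.replace('e', 'y'));

        System.out.println(mapped); // prints [yins=1, zwyi=2, dryi=3]
    }
}
```

The model is eager and list-based only to keep it short; the actual operator works element by element on a running stream.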

Example

Elements from this source carry a correlation number, but the flow structure should focus on the text message in each element. asSourceWithContext picks the second value of the tuple (Scala) or pair (Java) as the context. A subsequent map operator then keeps the first value as the stream element of the SourceWithContext.

Scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.SourceWithContext
import scala.collection.immutable

// values with their contexts as tuples
val values: immutable.Seq[(String, Int)] = immutable.Seq("eins" -> 1, "zwei" -> 2, "drei" -> 3)

// a regular source with the tuples as elements
val source: Source[(String, Int), NotUsed] = Source(values)

// split the tuple into stream elements and their context
val sourceWithContext: SourceWithContext[String, Int, NotUsed] =
  source
    .asSourceWithContext(_._2) // pick the second tuple element as context
    .map(_._1) // keep the first tuple element as stream element

val mapped: SourceWithContext[String, Int, NotUsed] = sourceWithContext
  // regular operators apply to the element without seeing the context
  .map(s => s.reverse)

// running the source and asserting the outcome
import akka.stream.scaladsl.Sink
val result = mapped.runWith(Sink.seq)
result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3)
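
Java
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.javadsl.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;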

// values with their contexts as pairs
Collection<Pair<String, Integer>> values =
    Arrays.asList(Pair.create("eins", 1), Pair.create("zwei", 2), Pair.create("drei", 3));

// a regular source with pairs as elements
Source<Pair<String, Integer>, NotUsed> source = Source.from(values);

// split the pair into stream elements and their context
SourceWithContext<String, Integer, NotUsed> sourceWithContext =
    source
        .asSourceWithContext(Pair::second) // pick the second pair element as context
        .map(Pair::first); // keep the first pair element as stream element

SourceWithContext<String, Integer, NotUsed> mapped =
    sourceWithContext
        // regular operators apply to the element without seeing the context
        .map(s -> s.replace('e', 'y'));

// running the source and asserting the outcome
CompletionStage<List<Pair<String, Integer>>> result = mapped.runWith(Sink.seq(), system);
List<Pair<String, Integer>> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertThat(
    list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3)));