Integration Patterns

Many Enterprise Integration Patterns can be implemented with Akka Streams (see Akka Streams documentation).

Splitter

You can achieve a Splitter, as described in the EIP, using the out-of-the-box Akka Streams DSL.
Simple Splitter

Let’s say that we have a stream containing strings. Each string contains a few numbers separated by “-”. We want to turn this into a stream that contains only the individual numbers.

Scala
//Sample Source
val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4"))

val ret = source
  .map(s => s.split("-").toList)
  .mapConcat(identity)
  //Sub-streams logic
  .map(s => s.toInt)
  .runWith(Sink.seq)

//Verify results

ret.futureValue should be(Vector(1, 2, 3, 2, 3, 3, 4))
Java
// Sample Source
Source<String, NotUsed> source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4"));

CompletionStage<List<Integer>> ret =
    source
        .map(s -> Arrays.asList(s.split("-")))
        .mapConcat(f -> f)
        // Sub-streams logic
        .map(s -> Integer.valueOf(s))
        .runWith(Sink.seq(), materializer);

// Verify results
List<Integer> list = ret.toCompletableFuture().get();
assert list.equals(Arrays.asList(1, 2, 3, 2, 3, 3, 4));

Splitter + Aggregator

Sometimes it is useful to split a message and aggregate its “sub-messages” into a new message (a combination of the Splitter and Aggregator patterns).

Let’s say that now we want to create a new stream containing the sums of the numbers in each original string.

Scala
//Sample Source
val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4"))

val result = source
  .map(s => s.split("-").toList)
  //split all messages into sub-streams
  .splitWhen(a => true)
  //now split each collection
  .mapConcat(identity)
  //Sub-streams logic
  .map(s => s.toInt)
  //aggregate each sub-stream
  .reduce((a, b) => a + b)
  //and merge back the result into the original stream
  .mergeSubstreams
  .runWith(Sink.seq)

//Verify results
result.futureValue should be(Vector(6, 5, 7))
Java
// Sample Source
Source<String, NotUsed> source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4"));

CompletionStage<List<Integer>> ret =
    source
        .map(s -> Arrays.asList(s.split("-")))
        // split all messages into sub-streams
        .splitWhen(a -> true)
        // now split each collection
        .mapConcat(f -> f)
        // Sub-streams logic
        .map(s -> Integer.valueOf(s))
        // aggregate each sub-stream
        .reduce((a, b) -> a + b)
        // and merge back the result into the original stream
        .mergeSubstreams()
        .runWith(Sink.seq(), materializer);

// Verify results
List<Integer> list = ret.toCompletableFuture().get();
assert list.equals(Arrays.asList(6, 5, 7));

While in real life this solution is overkill for such a simple problem (you could just do everything in a single map), more complex scenarios, in particular those involving I/O, benefit from the fact that you can parallelize sub-streams and get back-pressure for “free”.
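As a minimal sketch of that idea (reusing the same source as above; the lookup function is a hypothetical stand-in for any asynchronous I/O call, not part of any API), mapAsync runs several calls concurrently inside the sub-streams while back-pressure still propagates upstream:

Scala
import scala.concurrent.Future

// Hypothetical I/O-bound operation, e.g. a database or HTTP call
def lookup(n: Int): Future[Int] = Future.successful(n)

val parallelResult = source
  .map(s => s.split("-").toList)
  //split all messages into sub-streams
  .splitWhen(a => true)
  //now split each collection
  .mapConcat(identity)
  .map(s => s.toInt)
  //up to 4 lookups in flight; back-pressure applies as usual
  .mapAsync(4)(lookup)
  //aggregate each sub-stream
  .reduce((a, b) => a + b)
  //and merge back the result into the original stream
  .mergeSubstreams
  .runWith(Sink.seq)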

PassThrough

Use PassThroughFlow when you have a message that should be used in a flow that transforms it, but you also want to keep the original message for a subsequent flow. For example, when consuming messages from Kafka (CommittableMessage), a message can be used inside a flow (transformed, saved to a database, …) and afterwards you need the original message again to commit the offset.

It can be used whenever you have two flows:

  • Flow1 that takes a message A and returns B
  • Flow2 that takes a message A and returns C

If you want to execute Flow1 first and then Flow2, you need a way to maintain/pass through message A.

                    a=>transform=>a1
                   /                 \
                  /                   \
a=>(a, a)=>unzip -                     zip=>(a1, a)=> a
                  \                   /
                   \                 /
                    --------a--------
Scala
object PassThroughFlow {
  def apply[A, T](processingFlow: Flow[A, T, NotUsed]): Graph[FlowShape[A, (T, A)], NotUsed] =
    apply[A, T, (T, A)](processingFlow, Keep.both)

  def apply[A, T, O](processingFlow: Flow[A, T, NotUsed], output: (T, A) => O): Graph[FlowShape[A, O], NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      {
        import GraphDSL.Implicits._

        val broadcast = builder.add(Broadcast[A](2))
        val zip = builder.add(ZipWith[T, A, O]((left, right) => output(left, right)))

        // format: off
        broadcast.out(0) ~> processingFlow ~> zip.in0
        broadcast.out(1)         ~>           zip.in1
        // format: on

        FlowShape(broadcast.in, zip.out)
      }
    })
}

A sample usage:

Scala
//Sample Source
val source = Source(List(1, 2, 3))

// Pass through this flow maintaining the original message
val passThroughMe =
  Flow[Int]
    .map(_ * 10)

val ret = source
  .via(PassThroughFlow(passThroughMe))
  .runWith(Sink.seq)

//Verify results
ret.futureValue should be(Vector((10, 1), (20, 2), (30, 3)))

Using Keep you can choose what the return value should be:

  • PassThroughFlow(passThroughMe, Keep.right): to only output the original message
  • PassThroughFlow(passThroughMe, Keep.both): to output both values as a Tuple
  • Keep.left / Keep.none: not very useful in this use case, since the original message would not be passed through

You can also write your own output function to combine the two outputs in different ways; a sketch follows the next example.

Scala
//Sample Source
val source = Source(List(1, 2, 3))

// Pass through this flow maintaining the original message
val passThroughMe =
  Flow[Int]
    .map(_ * 10)

val ret = source
  .via(PassThroughFlow(passThroughMe, Keep.right))
  .runWith(Sink.seq)

//Verify results
ret.futureValue should be(Vector(1, 2, 3))
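
And a minimal sketch of a custom output function (using the same source and passThroughMe as above), combining the original message and the transformed value into a string:

Scala
val combined = source
  .via(PassThroughFlow(passThroughMe, (transformed: Int, original: Int) => s"$original -> $transformed"))
  .runWith(Sink.seq)

//Verify results
combined.futureValue should be(Vector("1 -> 10", "2 -> 20", "3 -> 30"))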

This pattern is useful when integrating Alpakka connectors together. Here is an example with Kafka:

Scala
val writeFlow = Flow[ConsumerMessage.CommittableMessage[String, Array[Byte]]].map(_ => ???)

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)

Consumer
  .committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .via(PassThroughFlow(writeFlow, Keep.right))
  .map(_.committableOffset)
  .groupedWithin(10, 5.seconds)
  .map(CommittableOffsetBatch(_))
  .mapAsync(3)(_.commitScaladsl())
  .toMat(Sink.ignore)(Keep.both)
  .mapMaterializedValue(DrainingControl.apply)
  .run()