RecordIO Framing

The codec parses a ByteString stream in the RecordIO format used by Apache Mesos into distinct frames.

For instance, the response body:

128\n
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}20\n
{"type":"HEARTBEAT"}

is parsed into frames:

{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}
{"type":"HEARTBEAT"}
Project Info: Alpakka Simple Codecs (RecordIO)
Artifact
com.lightbend.akka
akka-stream-alpakka-simple-codecs
1.0-M2
JDK versions
OpenJDK 8
Scala versions2.12.7, 2.11.12
JPMS module nameakka.stream.alpakka.simplecodecs
License
Readiness level
Community-driven
Since 0.5, 2017-01-13
Home pagehttps://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-simple-codecs" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-simple-codecs_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-simple-codecs_2.12', version: '1.0-M2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersionLicense
com.typesafe.akkaakka-stream_2.122.5.19Apache License, Version 2.0
org.scala-langscala-library2.12.7BSD 3-Clause
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Usage

The flow factory RecordIOFraming (APIAPI) provides a scanner factory method for a Flow[ByteString, ByteString, _]Flow<ByteString, ByteString, ?> which parses out RecordIO frames.

Scala
import akka.stream.alpakka.recordio.scaladsl.RecordIOFraming

val FirstRecordData =
  """{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}"""
val SecondRecordData = """{"type":"HEARTBEAT"}"""

val FirstRecordWithPrefix = s"121\n$FirstRecordData"
val SecondRecordWithPrefix = s"20\n$SecondRecordData"

val basicSource: Source[ByteString, NotUsed] =
  Source.single(ByteString(FirstRecordWithPrefix + SecondRecordWithPrefix))

  val result: Future[Seq[ByteString]] = basicSource
    .via(RecordIOFraming.scanner())
    .runWith(Sink.seq)
Java
String firstRecordData =
    "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
String secondRecordData = "{\"type\":\"HEARTBEAT\"}";

String firstRecordWithPrefix = "121\n" + firstRecordData;
String secondRecordWithPrefix = "20\n" + secondRecordData;

Source<ByteString, NotUsed> basicSource =
    Source.single(ByteString.fromString(firstRecordWithPrefix + secondRecordWithPrefix));

CompletionStage<List<ByteString>> result =
    basicSource.via(RecordIOFraming.scanner()).runWith(Sink.seq(), materializer);

We obtain:

Scala
val byteStrings = result.futureValue

byteStrings(0) shouldBe ByteString(FirstRecordData)
byteStrings(1) shouldBe ByteString(SecondRecordData)
Java
List<ByteString> byteStrings = result.toCompletableFuture().get(1, TimeUnit.SECONDS);

assertThat(byteStrings.get(0), is(ByteString.fromString(firstRecordData)));
assertThat(byteStrings.get(1), is(ByteString.fromString(secondRecordData)));

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> simpleCodecs/testOnly *.RecordIOFramingSpec
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.