RecordIO Framing

The codec parses a ByteString stream in the RecordIO format used by Apache Mesos into distinct frames.

For instance, the response body:

121\n
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}20\n
{"type":"HEARTBEAT"}

is parsed into frames:

{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}
{"type":"HEARTBEAT"}
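
The framing rule illustrated above can be sketched in plain Java without any Akka dependency: each record is its length in bytes written as decimal text, a newline, and then exactly that many bytes. The class and method names below (RecordIoSketch, encode, decode) are hypothetical and exist only for illustration; this is not how Alpakka implements its scanner, just a minimal sketch of the wire format that assumes the whole body is already in memory.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Alpakka: it only illustrates the RecordIO layout.
class RecordIoSketch {

    // Encodes one record as "<length in bytes>\n<payload>".
    static byte[] encode(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        byte[] prefix = (body.length + "\n").getBytes(StandardCharsets.US_ASCII);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(prefix, 0, prefix.length);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }

    // Splits a complete, in-memory RecordIO body into its frames.
    static List<String> decode(byte[] input) {
        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (pos < input.length) {
            // Scan forward to the newline that terminates the length prefix.
            int newline = pos;
            while (newline < input.length && input[newline] != '\n') {
                newline++;
            }
            if (newline == input.length) {
                throw new IllegalArgumentException("Length prefix is not terminated by a newline");
            }
            int length = Integer.parseInt(
                new String(input, pos, newline - pos, StandardCharsets.US_ASCII));
            int start = newline + 1;
            if (length < 0 || length > input.length - start) {
                throw new IllegalArgumentException("Record is shorter than its declared length");
            }
            // The next `length` bytes are the frame; the following record starts right after.
            frames.add(new String(input, start, length, StandardCharsets.UTF_8));
            pos = start + length;
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode(
            "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}");
        byte[] second = encode("{\"type\":\"HEARTBEAT\"}");
        byte[] body = new byte[first.length + second.length];
        System.arraycopy(first, 0, body, 0, first.length);
        System.arraycopy(second, 0, body, first.length, second.length);
        // Prints one frame per line, without the length prefixes.
        for (String frame : decode(body)) {
            System.out.println(frame);
        }
    }
}
```

Because the length counts bytes rather than characters, the sketch measures the UTF-8 encoded payload, not the String length.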

Project Info: Alpakka Simple Codecs (RecordIO)
Artifact             com.lightbend.akka    akka-stream-alpakka-simple-codecs    2.0.2
JDK versions         Adopt OpenJDK 8, Adopt OpenJDK 11
Scala versions       2.12.11, 2.11.12, 2.13.3
JPMS module name     akka.stream.alpakka.simplecodecs
License
Readiness level      Since 0.5, 2017-01-13
Home page            https://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notes        In the documentation
Issues               GitHub issues
Sources              https://github.com/akka/alpakka

Artifacts

sbt
val AkkaVersion = "2.5.31"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-simple-codecs" % "2.0.2",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.5.31</akka.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-simple-codecs_${scala.binary.version}</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_${scala.binary.version}</artifactId>
  <version>${akka.version}</version>
</dependency>
Gradle
versions += [
  AkkaVersion: "2.5.31",
  ScalaBinary: "2.12"
]
dependencies {
  compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-simple-codecs_${versions.ScalaBinary}", version: '2.0.2'
  compile group: 'com.typesafe.akka', name: "akka-stream_${versions.ScalaBinary}", version: versions.AkkaVersion
}

The table below lists the direct dependencies of this module, and the dependency tree after it shows all libraries it depends on transitively.

Direct dependencies
Organization         Artifact            Version
com.typesafe.akka    akka-stream_2.12    2.5.31
org.scala-lang       scala-library       2.12.11
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.31
    com.typesafe.akka    akka-actor_2.12    2.5.31
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0
            org.scala-lang    scala-library    2.12.11
        org.scala-lang    scala-library    2.12.11
    com.typesafe.akka    akka-protobuf_2.12    2.5.31
        org.scala-lang    scala-library    2.12.11
    com.typesafe    ssl-config-core_2.12    0.3.8
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2
            org.scala-lang    scala-library    2.12.11
        org.scala-lang    scala-library    2.12.11
    org.reactivestreams    reactive-streams    1.0.2
    org.scala-lang    scala-library    2.12.11
org.scala-lang    scala-library    2.12.11

Usage

RecordIOFraming provides a scanner factory method that returns a Flow[ByteString, ByteString, _] (Scala) or Flow<ByteString, ByteString, ?> (Java), which parses out RecordIO frames.

Scala
import akka.NotUsed
import akka.stream.alpakka.recordio.scaladsl.RecordIOFraming
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

val FirstRecordData =
  """{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}"""
val SecondRecordData = """{"type":"HEARTBEAT"}"""

// Each record is prefixed with its length in bytes, followed by a newline.
val FirstRecordWithPrefix = s"121\n$FirstRecordData"
val SecondRecordWithPrefix = s"20\n$SecondRecordData"

val basicSource: Source[ByteString, NotUsed] =
  Source.single(ByteString(FirstRecordWithPrefix + SecondRecordWithPrefix))

// runWith needs an implicit Materializer in scope.
val result: Future[Seq[ByteString]] = basicSource
  .via(RecordIOFraming.scanner())
  .runWith(Sink.seq)
Java
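import akka.NotUsed;
import akka.stream.alpakka.recordio.javadsl.RecordIOFraming;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.util.List;
import java.util.concurrent.CompletionStage;

String firstRecordData =
    "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
String secondRecordData = "{\"type\":\"HEARTBEAT\"}";

// Each record is prefixed with its length in bytes, followed by a newline.
String firstRecordWithPrefix = "121\n" + firstRecordData;
String secondRecordWithPrefix = "20\n" + secondRecordData;

Source<ByteString, NotUsed> basicSource =
    Source.single(ByteString.fromString(firstRecordWithPrefix + secondRecordWithPrefix));

// materializer is a Materializer created elsewhere, e.g. in the test setup.
CompletionStage<List<ByteString>> result =
    basicSource.via(RecordIOFraming.scanner()).runWith(Sink.seq(), materializer);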

We obtain:

Scala
val byteStrings = result.futureValue

byteStrings(0) shouldBe ByteString(FirstRecordData)
byteStrings(1) shouldBe ByteString(SecondRecordData)
Java
List<ByteString> byteStrings = result.toCompletableFuture().get(1, TimeUnit.SECONDS);

assertThat(byteStrings.get(0), is(ByteString.fromString(firstRecordData)));
assertThat(byteStrings.get(1), is(ByteString.fromString(secondRecordData)));
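
The example above feeds both records in a single ByteString, but in a real response the bytes usually arrive in arbitrary chunks, so a framing stage has to buffer incomplete records between chunks. The following plain-Java sketch shows one way such buffering could work. ChunkedRecordIoSketch and its offer method are hypothetical names introduced for illustration; this does not reproduce Alpakka's implementation, and when using the library the scanner flow does this work for you.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of chunk-aware RecordIO framing, not Alpakka code.
class ChunkedRecordIoSketch {
    // Bytes received so far that do not yet form a complete record.
    private byte[] pending = new byte[0];

    // Accepts the next chunk and returns every frame that is now complete.
    List<String> offer(byte[] chunk) {
        // Append the new chunk to whatever was left over from earlier chunks.
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (true) {
            int newline = pos;
            while (newline < joined.length && joined[newline] != '\n') {
                newline++;
            }
            if (newline == joined.length) {
                break; // the length prefix itself is still incomplete
            }
            int length = Integer.parseInt(
                new String(joined, pos, newline - pos, StandardCharsets.US_ASCII));
            if (length < 0) {
                throw new IllegalArgumentException("Negative record length: " + length);
            }
            int start = newline + 1;
            if (joined.length - start < length) {
                break; // the record body has not fully arrived yet
            }
            frames.add(new String(joined, start, length, StandardCharsets.UTF_8));
            pos = start + length;
        }
        // Keep the unconsumed tail for the next call.
        pending = Arrays.copyOfRange(joined, pos, joined.length);
        return frames;
    }

    public static void main(String[] args) {
        ChunkedRecordIoSketch decoder = new ChunkedRecordIoSketch();
        // Two HEARTBEAT records split at arbitrary points across three chunks.
        String[] chunks = {"20\n{\"type\":", "\"HEARTBEAT\"}20\n{\"ty", "pe\":\"HEARTBEAT\"}"};
        for (String chunk : chunks) {
            List<String> frames = decoder.offer(chunk.getBytes(StandardCharsets.UTF_8));
            System.out.println(frames.size() + " frame(s) completed: " + frames);
        }
    }
}
```

A production decoder would also cap the accepted record length so that a corrupt prefix cannot force unbounded buffering; the sketch leaves that out for brevity.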

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> simpleCodecs/testOnly *.RecordIOFramingSpec