RecordIO Framing
The codec parses a ByteString stream in the RecordIO format used by Apache Mesos into distinct frames.
For instance, the response body:
128\n
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}20\n
{"type":"HEARTBEAT"}
is parsed into frames:
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}
{"type":"HEARTBEAT"}
Project Info: Alpakka Simple Codecs (RecordIO) | |
---|---|
Artifact | com.lightbend.akka
akka-stream-alpakka-simple-codecs
8.0.0
|
JDK versions | Eclipse Temurin JDK 11 Eclipse Temurin JDK 17 |
Scala versions | 2.13.12, 3.3.3 |
JPMS module name | akka.stream.alpakka.simplecodecs |
License | |
Readiness level |
Since 0.5, 2017-01-13
|
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project> ... <repositories> <repository> <id>akka-repository</id> <name>Akka library repository</name> <url>https://repo.akka.io/maven</url> </repository> </repositories> </project>
- Gradle
repositories { mavenCentral() maven { url "https://repo.akka.io/maven" } }
Additionally, add the dependencies as below.
- sbt
val AkkaVersion = "2.9.3" libraryDependencies ++= Seq( "com.lightbend.akka" %% "akka-stream-alpakka-simple-codecs" % "8.0.0", "com.typesafe.akka" %% "akka-stream" % AkkaVersion )
- Maven
<properties> <akka.version>2.9.3</akka.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-simple-codecs_${scala.binary.version}</artifactId> <version>8.0.0</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_${scala.binary.version}</artifactId> <version>${akka.version}</version> </dependency> </dependencies>
- Gradle
def versions = [ AkkaVersion: "2.9.3", ScalaBinary: "2.13" ] dependencies { implementation "com.lightbend.akka:akka-stream-alpakka-simple-codecs_${versions.ScalaBinary}:8.0.0" implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version com.typesafe.akka akka-stream_2.13 2.9.3 org.scala-lang scala-library 2.13.12 - Dependency tree
com.typesafe.akka akka-stream_2.13 2.9.3 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.3 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.3 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0
Usage
The flow factory RecordIOFraming
RecordIOFraming
provides a scanner
factory method for a Flow[ByteString, ByteString, _]
Flow<ByteString, ByteString, ?>
which parses out RecordIO frames.
- Scala
-
source
import akka.stream.alpakka.recordio.scaladsl.RecordIOFraming val FirstRecordData = """{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}""" val SecondRecordData = """{"type":"HEARTBEAT"}""" val FirstRecordWithPrefix = s"121\n$FirstRecordData" val SecondRecordWithPrefix = s"20\n$SecondRecordData" val basicSource: Source[ByteString, NotUsed] = Source.single(ByteString(FirstRecordWithPrefix + SecondRecordWithPrefix)) val result: Future[Seq[ByteString]] = basicSource .via(RecordIOFraming.scanner()) .runWith(Sink.seq)
- Java
-
source
String firstRecordData = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}"; String secondRecordData = "{\"type\":\"HEARTBEAT\"}"; String firstRecordWithPrefix = "121\n" + firstRecordData; String secondRecordWithPrefix = "20\n" + secondRecordData; Source<ByteString, NotUsed> basicSource = Source.single(ByteString.fromString(firstRecordWithPrefix + secondRecordWithPrefix)); CompletionStage<List<ByteString>> result = basicSource.via(RecordIOFraming.scanner()).runWith(Sink.seq(), system);
We obtain:
- Scala
-
source
val byteStrings = result.futureValue byteStrings(0) shouldBe ByteString(FirstRecordData) byteStrings(1) shouldBe ByteString(SecondRecordData)
- Java
-
source
List<ByteString> byteStrings = result.toCompletableFuture().get(1, TimeUnit.SECONDS); assertThat(byteStrings.get(0), is(ByteString.fromString(firstRecordData))); assertThat(byteStrings.get(1), is(ByteString.fromString(secondRecordData)));
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
-
sbt > simpleCodecs/testOnly *.RecordIOFramingSpec