JSON

JSON Framing

Use Akka Stream JsonFraming to split a stream of ByteString elements into ByteString snippets of valid JSON objects.

See JsonFraming

Akka documentation

JsonFraming.objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed]

Returns a Flow that implements a “brace counting” based framing stage for emitting valid JSON chunks.

Typical examples of data that one may want to frame using this stage include:

Very large arrays:

[{"id": 1}, {"id": 2}, [...], {"id": 999}]

Multiple concatenated JSON objects (with or without commas between them):

{"id": 1}, {"id": 2}, [...], {"id": 999}

The framing works independently of formatting: it still emits valid JSON elements even if two elements are separated by multiple newlines or other whitespace characters. It is likewise insensitive to each JSON object's internal formatting, which does not affect the emitted frame.
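To make the "brace counting" idea concrete, here is a minimal plain-Java sketch. It is not Akka's implementation: the class name BraceCountingFramer and its frame method are hypothetical, it works on a complete String rather than on a stream of ByteString chunks, and it does not enforce a maximum object length. It only shows how tracking brace depth, while ignoring braces inside string literals, is enough to cut out each top-level object regardless of the separators between objects.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of brace-counting framing (hypothetical helper, not part of Akka).
public class BraceCountingFramer {

    // Returns every top-level JSON object found in the input, in order.
    public static List<String> frame(String input) {
        List<String> frames = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;            // number of currently open '{'
        boolean inString = false; // true while inside a JSON string literal
        boolean escaped = false;  // true directly after a backslash inside a string

        for (char c : input.toCharArray()) {
            if (depth == 0 && c != '{') {
                // Between objects: skip whitespace, commas and array brackets.
                continue;
            }
            current.append(c);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) {
                    // The outermost brace closed: one complete object is ready.
                    frames.add(current.toString());
                    current.setLength(0);
                }
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        String input = "[{\"id\": 1},\n\n {\"id\": 2, \"note\": \"a } in a string\"}]";
        for (String f : frame(input)) {
            System.out.println(f);
        }
    }
}
```

Because everything outside an open object is skipped, the same loop handles both the array form and the concatenated form shown above. In a real stream, prefer JsonFraming.objectScanner, which also bounds the size of a single object through maximumObjectLength.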

Streaming of nested structures

The method above is great for a stream of “flat” JSON objects (an array or just a stream of objects) but doesn’t work for the many use-cases that involve a nested structure. A common example is the response of a database, which might look more like this:

{
  "size": 100,
  "rows": [
    {"id": 1, "doc": {}},
    {"id": 2, "doc": {}},
    ...
  ]
}

The JSON reading module offers a flow that streams specific parts of that JSON structure. In this particular example, only the rows array is of interest to the application; more specifically, only the doc inside each element of the array.

Project Info: Alpakka JSON Streaming
Artifact    com.lightbend.akka    akka-stream-alpakka-json-streaming    9.0.0
JDK versions    Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions    2.13.12
JPMS module name    akka.stream.alpakka.json.streaming
License
Readiness level    Since 0.19, 2018-05-09
Home page    https://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notes    GitHub releases
Issues    GitHub issues
Sources    https://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-json-streaming" % "9.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-json-streaming_${scala.binary.version}</artifactId>
    <version>9.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-json-streaming_${versions.ScalaBinary}:9.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows the direct dependencies of this module, and the dependency tree lists all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.fasterxml.jackson.core    jackson-core    2.17.2
com.fasterxml.jackson.core    jackson-databind    2.17.2
com.github.jsurfer    jsurfer-jackson    1.6.5
com.typesafe.akka    akka-stream_2.13    2.10.0
org.scala-lang    scala-library    2.13.12
Dependency tree
com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
com.fasterxml.jackson.core    jackson-databind    2.17.2    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-annotations    2.17.2    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
com.github.jsurfer    jsurfer-jackson    1.6.5
    com.fasterxml.jackson.core    jackson-databind    2.17.2    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-annotations    2.17.2    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
    com.github.jsurfer    jsurfer-core    1.6.5
        org.antlr    antlr4-runtime    4.13.1
com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.scala-lang    scala-library    2.13.12    Apache-2.0

Example

To define which parts of the structure you want to stream, the module supports JsonPath notation. For example:

  • Stream all elements of the nested rows array: $.rows[*]
  • Stream the value of doc of each element in the array: $.rows[*].doc

To extract the information needed, run a stream through the JsonReader.select flow.

Scala
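import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
// Emits the value of "doc" for every element of the "rows" array.
val results = Source
  .single(ByteString.fromString(baseDocument))
  .via(JsonReader.select("$.rows[*].doc"))
  .runWith(Sink.seq)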
Java
final CompletionStage<List<ByteString>> resultStage =
    Source.single(doc).via(JsonReader.select("$.rows[*].doc")).runWith(Sink.seq(), system);