Pravega

Pravega provides a new storage abstraction - a stream - for continuous and unbounded data. A Pravega stream is an elastic set of durable and append-only segments, each segment being an unbounded sequence of bytes. Streams provide exactly-once semantics, and atomicity for groups of events using transactions.

Project Info: Alpakka Pravega
Artifact: com.lightbend.akka / akka-stream-alpakka-pravega / 2.0.2
JDK versions: Adopt OpenJDK 8, Adopt OpenJDK 11
Scala versions: 2.12.11, 2.13.3
JPMS module name: akka.stream.alpakka.pravega
License
Readiness level: Since 2.0, 2020-02-14
Home page: https://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notes: In the documentation
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-pravega" % "2.0.2"
Maven
<properties>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-pravega_${scala.binary.version}</artifactId>
  <version>2.0.2</version>
</dependency>
Gradle
versions += [
  ScalaBinary: "2.12"
]
dependencies {
  compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-pravega_${versions.ScalaBinary}", version: '2.0.2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.typesafe.akka    akka-stream_2.12    2.5.31
io.pravega    pravega-client    0.7.1
org.scala-lang    scala-library    2.12.11
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.31
    com.typesafe.akka    akka-actor_2.12    2.5.31
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0
            org.scala-lang    scala-library    2.12.11
        org.scala-lang    scala-library    2.12.11
    com.typesafe.akka    akka-protobuf_2.12    2.5.31
        org.scala-lang    scala-library    2.12.11
    com.typesafe    ssl-config-core_2.12    0.3.8
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2
            org.scala-lang    scala-library    2.12.11
        org.scala-lang    scala-library    2.12.11
    org.reactivestreams    reactive-streams    1.0.2
    org.scala-lang    scala-library    2.12.11
io.pravega    pravega-client    0.7.1
    com.google.guava    guava    28.2-jre
        com.google.code.findbugs    jsr305    3.0.2
        com.google.errorprone    error_prone_annotations    2.3.4
        com.google.guava    failureaccess    1.0.1
        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
        com.google.j2objc    j2objc-annotations    1.3
        org.checkerframework    checker-qual    2.10.0
    io.netty    netty-transport-native-epoll    4.1.30.Final
        io.netty    netty-buffer    4.1.30.Final
            io.netty    netty-common    4.1.30.Final
        io.netty    netty-common    4.1.30.Final
        io.netty    netty-transport-native-unix-common    4.1.30.Final
            io.netty    netty-common    4.1.30.Final
            io.netty    netty-transport    4.1.30.Final
                io.netty    netty-buffer    4.1.30.Final
                    io.netty    netty-common    4.1.30.Final
                io.netty    netty-resolver    4.1.30.Final
                    io.netty    netty-common    4.1.30.Final
        io.netty    netty-transport    4.1.30.Final
            io.netty    netty-buffer    4.1.30.Final
                io.netty    netty-common    4.1.30.Final
            io.netty    netty-resolver    4.1.30.Final
                io.netty    netty-common    4.1.30.Final
    io.pravega    pravega-common    0.7.1
        com.google.guava    guava    28.2-jre
            com.google.code.findbugs    jsr305    3.0.2
            com.google.errorprone    error_prone_annotations    2.3.4
            com.google.guava    failureaccess    1.0.1
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
            com.google.j2objc    j2objc-annotations    1.3
            org.checkerframework    checker-qual    2.10.0
        commons-io    commons-io    2.6
        org.slf4j    slf4j-api    1.7.30
    io.pravega    pravega-shared-authplugin    0.7.1
        org.slf4j    slf4j-api    1.7.30
    io.pravega    pravega-shared-controller-api    0.7.1
        commons-io    commons-io    2.6
        io.grpc    grpc-auth    1.17.1
            com.google.auth    google-auth-library-credentials    0.9.0
            io.grpc    grpc-core    1.17.1
                com.google.code.findbugs    jsr305    3.0.2
                com.google.code.gson    gson    2.7
                com.google.errorprone    error_prone_annotations    2.3.4
                com.google.guava    guava    28.2-jre
                    com.google.code.findbugs    jsr305    3.0.2
                    com.google.errorprone    error_prone_annotations    2.3.4
                    com.google.guava    failureaccess    1.0.1
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                    com.google.j2objc    j2objc-annotations    1.3
                    org.checkerframework    checker-qual    2.10.0
                io.grpc    grpc-context    1.17.1
                io.opencensus    opencensus-api    0.17.0
                io.opencensus    opencensus-contrib-grpc-metrics    0.17.0
                    io.opencensus    opencensus-api    0.17.0
                org.codehaus.mojo    animal-sniffer-annotations    1.17
        io.grpc    grpc-netty    1.17.1
            io.grpc    grpc-core    1.17.1
                com.google.code.findbugs    jsr305    3.0.2
                com.google.code.gson    gson    2.7
                com.google.errorprone    error_prone_annotations    2.3.4
                com.google.guava    guava    28.2-jre
                    com.google.code.findbugs    jsr305    3.0.2
                    com.google.errorprone    error_prone_annotations    2.3.4
                    com.google.guava    failureaccess    1.0.1
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                    com.google.j2objc    j2objc-annotations    1.3
                    org.checkerframework    checker-qual    2.10.0
                io.grpc    grpc-context    1.17.1
                io.opencensus    opencensus-api    0.17.0
                io.opencensus    opencensus-contrib-grpc-metrics    0.17.0
                    io.opencensus    opencensus-api    0.17.0
                org.codehaus.mojo    animal-sniffer-annotations    1.17
            io.netty    netty-codec-http2    4.1.30.Final
                io.netty    netty-codec-http    4.1.30.Final
                    io.netty    netty-codec    4.1.30.Final
                        io.netty    netty-transport    4.1.30.Final
                            io.netty    netty-buffer    4.1.30.Final
                                io.netty    netty-common    4.1.30.Final
                            io.netty    netty-resolver    4.1.30.Final
                                io.netty    netty-common    4.1.30.Final
                io.netty    netty-handler    4.1.30.Final
                    io.netty    netty-buffer    4.1.30.Final
                        io.netty    netty-common    4.1.30.Final
                    io.netty    netty-codec    4.1.30.Final
                        io.netty    netty-transport    4.1.30.Final
                            io.netty    netty-buffer    4.1.30.Final
                                io.netty    netty-common    4.1.30.Final
                            io.netty    netty-resolver    4.1.30.Final
                                io.netty    netty-common    4.1.30.Final
                    io.netty    netty-transport    4.1.30.Final
                        io.netty    netty-buffer    4.1.30.Final
                            io.netty    netty-common    4.1.30.Final
                        io.netty    netty-resolver    4.1.30.Final
                            io.netty    netty-common    4.1.30.Final
            io.netty    netty-handler-proxy    4.1.30.Final
                io.netty    netty-codec-http    4.1.30.Final
                    io.netty    netty-codec    4.1.30.Final
                        io.netty    netty-transport    4.1.30.Final
                            io.netty    netty-buffer    4.1.30.Final
                                io.netty    netty-common    4.1.30.Final
                            io.netty    netty-resolver    4.1.30.Final
                                io.netty    netty-common    4.1.30.Final
                io.netty    netty-codec-socks    4.1.30.Final
                    io.netty    netty-codec    4.1.30.Final
                        io.netty    netty-transport    4.1.30.Final
                            io.netty    netty-buffer    4.1.30.Final
                                io.netty    netty-common    4.1.30.Final
                            io.netty    netty-resolver    4.1.30.Final
                                io.netty    netty-common    4.1.30.Final
                io.netty    netty-transport    4.1.30.Final
                    io.netty    netty-buffer    4.1.30.Final
                        io.netty    netty-common    4.1.30.Final
                    io.netty    netty-resolver    4.1.30.Final
                        io.netty    netty-common    4.1.30.Final
        io.grpc    grpc-protobuf    1.17.1
            com.google.api.grpc    proto-google-common-protos    1.0.0
            com.google.guava    guava    28.2-jre
                com.google.code.findbugs    jsr305    3.0.2
                com.google.errorprone    error_prone_annotations    2.3.4
                com.google.guava    failureaccess    1.0.1
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                com.google.j2objc    j2objc-annotations    1.3
                org.checkerframework    checker-qual    2.10.0
            com.google.protobuf    protobuf-java    3.5.1
            io.grpc    grpc-core    1.17.1
                com.google.code.findbugs    jsr305    3.0.2
                com.google.code.gson    gson    2.7
                com.google.errorprone    error_prone_annotations    2.3.4
                com.google.guava    guava    28.2-jre
                    com.google.code.findbugs    jsr305    3.0.2
                    com.google.errorprone    error_prone_annotations    2.3.4
                    com.google.guava    failureaccess    1.0.1
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                    com.google.j2objc    j2objc-annotations    1.3
                    org.checkerframework    checker-qual    2.10.0
                io.grpc    grpc-context    1.17.1
                io.opencensus    opencensus-api    0.17.0
                io.opencensus    opencensus-contrib-grpc-metrics    0.17.0
                    io.opencensus    opencensus-api    0.17.0
                org.codehaus.mojo    animal-sniffer-annotations    1.17
            io.grpc    grpc-protobuf-lite    1.17.1
                com.google.guava    guava    28.2-jre
                    com.google.code.findbugs    jsr305    3.0.2
                    com.google.errorprone    error_prone_annotations    2.3.4
                    com.google.guava    failureaccess    1.0.1
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                    com.google.j2objc    j2objc-annotations    1.3
                    org.checkerframework    checker-qual    2.10.0
                io.grpc    grpc-core    1.17.1
                    com.google.code.findbugs    jsr305    3.0.2
                    com.google.code.gson    gson    2.7
                    com.google.errorprone    error_prone_annotations    2.3.4
                    com.google.guava    guava    28.2-jre
                        com.google.code.findbugs    jsr305    3.0.2
                        com.google.errorprone    error_prone_annotations    2.3.4
                        com.google.guava    failureaccess    1.0.1
                        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                        com.google.j2objc    j2objc-annotations    1.3
                        org.checkerframework    checker-qual    2.10.0
                    io.grpc    grpc-context    1.17.1
                    io.opencensus    opencensus-api    0.17.0
                    io.opencensus    opencensus-contrib-grpc-metrics    0.17.0
                        io.opencensus    opencensus-api    0.17.0
                    org.codehaus.mojo    animal-sniffer-annotations    1.17
        io.grpc    grpc-stub    1.17.1
            io.grpc    grpc-core    1.17.1
                com.google.code.findbugs    jsr305    3.0.2
                com.google.code.gson    gson    2.7
                com.google.errorprone    error_prone_annotations    2.3.4
                com.google.guava    guava    28.2-jre
                    com.google.code.findbugs    jsr305    3.0.2
                    com.google.errorprone    error_prone_annotations    2.3.4
                    com.google.guava    failureaccess    1.0.1
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                    com.google.j2objc    j2objc-annotations    1.3
                    org.checkerframework    checker-qual    2.10.0
                io.grpc    grpc-context    1.17.1
                io.opencensus    opencensus-api    0.17.0
                io.opencensus    opencensus-contrib-grpc-metrics    0.17.0
                    io.opencensus    opencensus-api    0.17.0
                org.codehaus.mojo    animal-sniffer-annotations    1.17
        io.netty    netty-tcnative-boringssl-static    2.0.17.Final
        io.pravega    pravega-common    0.7.1
            com.google.guava    guava    28.2-jre
                com.google.code.findbugs    jsr305    3.0.2
                com.google.errorprone    error_prone_annotations    2.3.4
                com.google.guava    failureaccess    1.0.1
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                com.google.j2objc    j2objc-annotations    1.3
                org.checkerframework    checker-qual    2.10.0
            commons-io    commons-io    2.6
            org.slf4j    slf4j-api    1.7.30
        org.apache.commons    commons-lang3    3.7
        org.slf4j    slf4j-api    1.7.30
    io.pravega    pravega-shared-protocol    0.7.1
        com.google.guava    guava    28.2-jre
            com.google.code.findbugs    jsr305    3.0.2
            com.google.errorprone    error_prone_annotations    2.3.4
            com.google.guava    failureaccess    1.0.1
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
            com.google.j2objc    j2objc-annotations    1.3
            org.checkerframework    checker-qual    2.10.0
        io.netty    netty-handler    4.1.30.Final
            io.netty    netty-buffer    4.1.30.Final
                io.netty    netty-common    4.1.30.Final
            io.netty    netty-codec    4.1.30.Final
                io.netty    netty-transport    4.1.30.Final
                    io.netty    netty-buffer    4.1.30.Final
                        io.netty    netty-common    4.1.30.Final
                    io.netty    netty-resolver    4.1.30.Final
                        io.netty    netty-common    4.1.30.Final
            io.netty    netty-transport    4.1.30.Final
                io.netty    netty-buffer    4.1.30.Final
                    io.netty    netty-common    4.1.30.Final
                io.netty    netty-resolver    4.1.30.Final
                    io.netty    netty-common    4.1.30.Final
        io.netty    netty-transport    4.1.30.Final
            io.netty    netty-buffer    4.1.30.Final
                io.netty    netty-common    4.1.30.Final
            io.netty    netty-resolver    4.1.30.Final
                io.netty    netty-common    4.1.30.Final
        io.pravega    pravega-common    0.7.1
            com.google.guava    guava    28.2-jre
                com.google.code.findbugs    jsr305    3.0.2
                com.google.errorprone    error_prone_annotations    2.3.4
                com.google.guava    failureaccess    1.0.1
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                com.google.j2objc    j2objc-annotations    1.3
                org.checkerframework    checker-qual    2.10.0
            commons-io    commons-io    2.6
            org.slf4j    slf4j-api    1.7.30
        org.slf4j    slf4j-api    1.7.30
    org.slf4j    slf4j-api    1.7.30
org.scala-lang    scala-library    2.12.11

Concepts

Pravega stores streams of events, and streams are organized using scopes. A Pravega stream comprises one or more parallel segments, and the set of parallel segments can change over time with auto-scaling. Pravega is designed to operate at scale and can accommodate a large number of segments and streams.

Pravega has an API to write and read events. An application that ingests data writes events to a stream, and an application that consumes data reads events from a stream. In addition to the events API, Pravega has other APIs that let an application read and write bytes rather than events, and read the events of a stream out of order (e.g., when batch processing).

Pravega stores stream data durably, and applications can access the stream data using the same API both when tailing the stream and when processing past data. The system is architected so that the underlying storage is elastic and it is able to accommodate unbounded streams.

When writing an event, Pravega accepts a routing key parameter, and it guarantees order per key even in the presence of auto-scaling.
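
To illustrate why a routing key can yield per-key ordering, here is a minimal, self-contained sketch. It is not Pravega's implementation: the class name RoutingKeySketch, the fixed segment count, and the use of String.hashCode are assumptions made for illustration, and auto-scaling is ignored. The only point is that events with the same key always land in the same append-only segment, so their relative order is preserved.

```java
import java.util.ArrayList;
import java.util.List;

public class RoutingKeySketch {
    // Hypothetical fixed number of parallel segments; a real stream can scale.
    static final int SEGMENTS = 4;

    // Map a routing key to a segment index (illustrative hashing only).
    static int segmentFor(String routingKey) {
        return Math.floorMod(routingKey.hashCode(), SEGMENTS);
    }

    // Append each {routingKey, event} pair to the segment chosen by its key.
    static List<List<String>> write(String[][] keyedEvents) {
        List<List<String>> segments = new ArrayList<>();
        for (int i = 0; i < SEGMENTS; i++) {
            segments.add(new ArrayList<>());
        }
        for (String[] keyedEvent : keyedEvents) {
            segments.get(segmentFor(keyedEvent[0])).add(keyedEvent[1]);
        }
        return segments;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"sensor-a", "a1"}, {"sensor-b", "b1"}, {"sensor-a", "a2"}, {"sensor-b", "b2"}
        };
        List<List<String>> segments = write(events);
        for (int i = 0; i < segments.size(); i++) {
            System.out.println("segment " + i + ": " + segments.get(i));
        }
    }
}
```

Within any one segment, events that share a key appear in the order they were written; no order is implied between events with different keys.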

For more information about Pravega please visit the official documentation.

Configuration

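Two categories of properties can, or in some cases must, be provided to configure the connector:

  • Pravega internals properties, which are forwarded to the Pravega configuration builders (the ClientConfig, ReaderConfig, and EventWriterConfig sections marked "Pravega internals" in reference.conf).
  • Alpakka connector properties (all others).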

reference.conf
akka.alpakka.pravega {
  #
  # ClientConfig (Pravega internals)
  defaults.client-config {
    # ControllerURI The controller rpc URI. This can be of 2 types
    #  1. tcp://ip1:port1,ip2:port2,...
    #   This is used if the controller endpoints are static and can be directly accessed.
    #  2. pravega://ip1:port1,ip2:port2,...
    #   This is used to auto-discover the controller endpoints from an initial controller list.
    #controller-uri = "tcp://localhost:9090"

    # An optional property representing whether to enable TLS for client's communication with the Controller.
    # If this property and enable-tls-to-segment-store are not set, and the scheme used in controller-uri
    #  is "tls" or "pravegas", TLS is automatically enabled for both client-to-Controller and
    # client-to-Segment Store communications.
    #enable-tls-to-controller = false

    # An optional property representing whether to enable TLS for client's communication with the Segment Store.
    # If this property and 'enable-tls-to-controller' are not set, and the scheme used in 'controller-uri'
    #   is "tls" or "pravegas", TLS is automatically enabled for both client-to-Controller and
    # client-to-Segment Store communications.
    #enable-tls-to-segment-store = false

    # Maximum number of connections per Segment store to be used by connection pooling.
    #max-connections-per-segment-store=10

    # Path to an optional truststore. If this is null or empty, the default JVM trust store is used.
    # This is currently expected to be a signing certificate for the certification authority.
    #trust-store

    # If the flag 'isEnableTls'  is set, this flag decides whether to enable host name validation or not.
    #validate-host-name=true
  }
  reader {
    client-config = ${akka.alpakka.pravega.defaults.client-config}
    # ReaderConfig (Pravega internals)
    config {
      #disable-time-windows = false
      #initial-allocation-delay = 0
    }
    timeout = 1 second
    #group-name="scala-group-name"
    # The reader-id must be unique across all instances of a reader group.
    # When a reader-id is not provided one is randomly generated each time a Reader Source is created.
    #reader-id="scala-reader-id"
  }
  writer {
    client-config = ${akka.alpakka.pravega.defaults.client-config}
    maximum-inflight-messages = 10
    # EventWriterConfig (Pravega internals)
    config {
      #automatically-note-time=false
      #backoff-multiple=10
      #enable-connection-pooling=false
      #initial-backoff-millis=1
      #retry-attempts=10
      #
      # The transaction timeout parameter corresponds to the lease renewal period.
      # In every period, the client must send at least one ping to keep the txn alive.
      # If the client fails to do so, then Pravega aborts the txn automatically. The client
      # sends pings internally and requires no application intervention, only that it sets
      # this parameter accordingly.
      #
      # This parameter is additionally used to determine the total amount of time that
      # a txn can remain open. Currently, we set the maximum amount of time for a
      # txn to remain open to be the minimum between 1 day and 1,000 times the value
      # of the lease renewal period. The 1,000 is hardcoded and has been chosen arbitrarily
      # to be a large enough value.
      #
      # The maximum allowed lease time by default is 120s, see:
      #
      # io.pravega.controller.util.Config.PROPERTY_TXN_MAX_LEASE
      #
      # The maximum allowed lease time is a configuration parameter of the controller
      # and can be changed accordingly. Note that being a controller-wide parameter,
      # it affects all transactions.
      #transaction-timeout-time=89999L
    }
  }

}
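
The comment on transaction-timeout-time above describes an upper bound on how long a transaction can stay open: the minimum of 1 day and 1,000 times the lease renewal period. The following sketch works through that arithmetic. The class and method names are hypothetical, and it assumes the setting is expressed in milliseconds; it only restates the rule from the comment and does not call Pravega.

```java
import java.time.Duration;

public class TxnTimeoutSketch {
    // Upper bound on how long a transaction may stay open, as described in the
    // reference.conf comment: min(1 day, 1,000 x lease renewal period).
    static Duration maxOpenTime(Duration leaseRenewalPeriod) {
        Duration scaled = leaseRenewalPeriod.multipliedBy(1000);
        Duration oneDay = Duration.ofDays(1);
        return scaled.compareTo(oneDay) < 0 ? scaled : oneDay;
    }

    public static void main(String[] args) {
        // 30 s lease: 1,000 x 30 s = 30,000 s, which is below one day (86,400 s).
        System.out.println(maxOpenTime(Duration.ofSeconds(30)));
        // 89,999 ms lease (the commented-out value above): 89,999 s exceeds one day, so it is capped.
        System.out.println(maxOpenTime(Duration.ofMillis(89999)));
    }
}
```

Separately from this bound, the lease itself is limited by the controller's maximum lease time (120s by default, according to the same comment).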

The Pravega connector can configure the Pravega client automatically from Lightbend configuration supplied in an application.conf, or the client can be configured programmatically with ReaderSettingsBuilder or WriterSettingsBuilder. See the following sections for examples.

ClientConfig

This configuration holds connection properties (endpoints, protocol) for all communication.

It can be overridden in an application.conf file at the following configuration paths:

  • reader: akka.alpakka.pravega.reader.client-config
  • writer: akka.alpakka.pravega.writer.client-config
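
As an illustration, an application.conf override at these two paths might look like the fragment below. The keys are taken from reference.conf; the values are placeholders chosen for the example, not recommendations.

```
akka.alpakka.pravega {
  reader.client-config {
    # Placeholder endpoint; replace with your controller address.
    controller-uri = "tcp://localhost:9090"
  }
  writer.client-config {
    controller-uri = "tcp://localhost:9090"
    max-connections-per-segment-store = 10
  }
}
```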

It can be customised programmatically, see below.

EventReader configuration

A Pravega Source needs a ReaderSettings to operate. It can be built from configuration and customized programmatically:

Scala
val readerSettings = ReaderSettingsBuilder(system)
  .clientConfigBuilder(
    _.controllerURI(new URI("pravegas://localhost:9090")) // ClientConfig customization
      .enableTlsToController(true)
      .enableTlsToSegmentStore(true)
  )
  .readerConfigBuilder(_.disableTimeWindows(true)) //ReaderConfig customization
  .withTimeout(3.seconds)
  .withSerializer(new UTF8StringSerializer)
Java
ReaderSettings<String> readerSettings =
    ReaderSettingsBuilder.create(system)
        .clientConfigBuilder(
            builder -> builder.enableTlsToController(true)) // ClientConfig customization
        .readerConfigBuilder(
            builder -> builder.disableTimeWindows(true)) // ReaderConfig customization
        .withTimeout(Duration.ofSeconds(3))
        .withSerializer(new UTF8StringSerializer());

EventWriter configuration

A Pravega Flow or Sink needs a WriterSettings to operate. It can be built from configuration and customized programmatically.

If you want to use a routing key, provide a key extractor function for your message type:

Scala
val writerSettings = WriterSettingsBuilder(system)
  .clientConfigBuilder(_.enableTlsToController(true)) // ClientConfig customization
  .eventWriterConfigBuilder(_.backoffMultiple(5)) //EventWriterConfig customization
  .withMaximumInflightMessages(5)
  .withKeyExtractor((str: String) => str.substring(0, 2))
  .withSerializer(new UTF8StringSerializer)
Java
WriterSettings<String> writerSettings =
    WriterSettingsBuilder.<String>create(system)
        .withKeyExtractor((String str) -> str.substring(0, 1))
        .withSerializer(new UTF8StringSerializer());

ReaderSettingsBuilder and WriterSettingsBuilder produce ReaderSettings and WriterSettings, respectively, once a Serializer is provided.
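
One detail worth noting about the key extractor examples above: String.substring(0, 2) throws a StringIndexOutOfBoundsException for messages shorter than two characters. A minimal sketch of a more defensive extractor follows. The class KeyExtractorSketch and the helper prefixKey are hypothetical names, and the sketch uses java.util.function.Function only to stay self-contained; it is not the connector's own function type.

```java
import java.util.function.Function;

public class KeyExtractorSketch {
    // Hypothetical extractor: use the first `prefixLength` characters as the routing key,
    // falling back to the whole string when it is shorter than the prefix.
    static Function<String, String> prefixKey(int prefixLength) {
        return str -> str.substring(0, Math.min(prefixLength, str.length()));
    }

    public static void main(String[] args) {
        Function<String, String> extractor = prefixKey(2);
        for (String message : new String[] {"eu-order-1", "us-order-7", "x"}) {
            System.out.println(message + " -> " + extractor.apply(message));
        }
    }
}
```

The same lambda body could be used directly in a withKeyExtractor call like the ones shown above.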

Writing to Pravega

Messages are written to Pravega through a Flow or Sink, for example:

Scala
Source(1 to 100).map(i => s"event_$i").runWith(Pravega.sink("an_existing_scope", "an_existing_streamName"))

Source(1 to 100)
  .map { i =>
    val routingKey = i % 10
    s"${routingKey}_event_$i"
  }
  .runWith(Pravega.sink("an_existing_scope", "an_existing_streamName")(writerSettingsWithRoutingKey))
Java
Sink<String, CompletionStage<Done>> sinkWithRouting =
    Pravega.sink("an_existing_scope", "an_existing_streamName", writerSettings);

CompletionStage<Done> doneWithRouting =
    Source.from(Arrays.asList("One", "Two", "Three")).runWith(sinkWithRouting, materializer);

Reading from Pravega

Messages are read from Pravega through a Source:

Scala

val fut = Pravega
  .source("an_existing_scope", "an_existing_streamName")
  .to(Sink.foreach { event: PravegaEvent[String] =>
    val message: String = event.message
    processMessage(message)
  })
  .run()
Java
CompletionStage<Done> fut =
    Pravega.<String>source("an_existing_scope", "an_existing_streamName", readerSettings)
        .to(Sink.foreach(e -> processMessage(e.message())))
        .run(materializer);

It produces a stream of PravegaEvent, a thin wrapper that includes some Pravega metadata along with the message.

Support

In addition to our regular Alpakka community support on Gitter (akka/akka) and Lightbend’s discuss.lightbend.com, you can also visit the #akka-streams-connector channel on the Pravega Slack for assistance with the Pravega integration itself.
