Pravega
Pravega provides a new storage abstraction - a stream - for continuous and unbounded data. A Pravega stream is an elastic set of durable and append-only segments, each segment being an unbounded sequence of bytes. Streams provide exactly-once semantics, and atomicity for groups of events using transactions.
| Project Info: Alpakka Pravega | |
|---|---|
| Artifact | com.lightbend.akka akka-stream-alpakka-pravega 2.0.2 |
| JDK versions | Adopt OpenJDK 8, Adopt OpenJDK 11 |
| Scala versions | 2.12.11, 2.13.3 |
| JPMS module name | akka.stream.alpakka.pravega |
| License | |
| Readiness level | Since 2.0, 2020-02-14 |
| Home page | https://doc.akka.io/docs/alpakka/current |
| API documentation | |
| Forums | |
| Release notes | In the documentation |
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-pravega" % "2.0.2"
- Maven
    <properties>
      <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependency>
      <groupId>com.lightbend.akka</groupId>
      <artifactId>akka-stream-alpakka-pravega_${scala.binary.version}</artifactId>
      <version>2.0.2</version>
    </dependency>
- Gradle
    versions += [
      ScalaBinary: "2.12"
    ]
    dependencies {
      implementation group: 'com.lightbend.akka', name: "akka-stream-alpakka-pravega_${versions.ScalaBinary}", version: '2.0.2'
    }
The Direct dependencies list below shows the libraries this module depends on directly, and the Dependency tree shows all libraries it depends on transitively.
- Direct dependencies

| Organization | Artifact | Version |
|---|---|---|
| com.typesafe.akka | akka-stream_2.12 | 2.5.31 |
| io.pravega | pravega-client | 0.7.1 |
| org.scala-lang | scala-library | 2.12.11 |

- Dependency tree (each transitive library is listed once under the direct dependency that brings it in)
    com.typesafe.akka akka-stream_2.12 2.5.31
        com.typesafe.akka akka-actor_2.12 2.5.31
        com.typesafe config 1.3.3
        org.scala-lang.modules scala-java8-compat_2.12 0.8.0
        org.scala-lang scala-library 2.12.11
        com.typesafe.akka akka-protobuf_2.12 2.5.31
        com.typesafe ssl-config-core_2.12 0.3.8
        org.scala-lang.modules scala-parser-combinators_2.12 1.1.2
        org.reactivestreams reactive-streams 1.0.2
    io.pravega pravega-client 0.7.1
        com.google.guava guava 28.2-jre
        com.google.code.findbugs jsr305 3.0.2
        com.google.errorprone error_prone_annotations 2.3.4
        com.google.guava failureaccess 1.0.1
        com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava
        com.google.j2objc j2objc-annotations 1.3
        org.checkerframework checker-qual 2.10.0
        io.netty netty-transport-native-epoll 4.1.30.Final
        io.netty netty-buffer 4.1.30.Final
        io.netty netty-common 4.1.30.Final
        io.netty netty-transport-native-unix-common 4.1.30.Final
        io.netty netty-transport 4.1.30.Final
        io.netty netty-resolver 4.1.30.Final
        io.pravega pravega-common 0.7.1
        commons-io commons-io 2.6
        org.slf4j slf4j-api 1.7.30
        io.pravega pravega-shared-authplugin 0.7.1
        io.pravega pravega-shared-controller-api 0.7.1
        io.grpc grpc-auth 1.17.1
        com.google.auth google-auth-library-credentials 0.9.0
        io.grpc grpc-core 1.17.1
        com.google.code.gson gson 2.7
        io.grpc grpc-context 1.17.1
        io.opencensus opencensus-api 0.17.0
        io.opencensus opencensus-contrib-grpc-metrics 0.17.0
        org.codehaus.mojo animal-sniffer-annotations 1.17
        io.grpc grpc-netty 1.17.1
        io.netty netty-codec-http2 4.1.30.Final
        io.netty netty-codec-http 4.1.30.Final
        io.netty netty-codec 4.1.30.Final
        io.netty netty-handler 4.1.30.Final
        io.netty netty-handler-proxy 4.1.30.Final
        io.netty netty-codec-socks 4.1.30.Final
        io.grpc grpc-protobuf 1.17.1
        com.google.api.grpc proto-google-common-protos 1.0.0
        com.google.protobuf protobuf-java 3.5.1
        io.grpc grpc-protobuf-lite 1.17.1
        io.grpc grpc-stub 1.17.1
        io.netty netty-tcnative-boringssl-static 2.0.17.Final
        org.apache.commons commons-lang3 3.7
        io.pravega pravega-shared-protocol 0.7.1
    org.scala-lang scala-library 2.12.11
Concepts
Pravega stores streams of events, and streams are organized using scopes. A Pravega stream comprises one or more parallel segments, and the set of parallel segments can change over time with auto-scaling. Pravega is designed to operate at scale and can accommodate a large number of segments and streams.
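Auto-scaling can be pictured with a simplified model in which each segment owns a slice of a routing-key space [0.0, 1.0) and scaling up replaces one segment with two successors covering the same slice. The sketch below is a hypothetical illustration only: `Segment`, `ScalingSketch` and the midpoint split are assumptions, not Pravega's API or its actual scaling policy. It checks that the active segments still tile the whole key space after a split.

```scala
// Simplified model (assumption, not Pravega's implementation): each segment
// owns a half-open slice [low, high) of the routing-key space [0.0, 1.0).
final case class Segment(id: Int, low: Double, high: Double) {
  def owns(point: Double): Boolean = point >= low && point < high
}

object ScalingSketch {
  // Replaces one active segment with two successors that split its key range
  // at the midpoint, mimicking a scale-up triggered by auto-scaling.
  def split(active: Vector[Segment], id: Int, nextId: Int): Vector[Segment] = {
    val target = active.find(_.id == id).getOrElse(
      throw new IllegalArgumentException(s"no active segment with id $id"))
    val mid = (target.low + target.high) / 2
    val successors = Vector(
      Segment(nextId, target.low, mid),
      Segment(nextId + 1, mid, target.high))
    active.filterNot(_.id == id) ++ successors
  }

  // The active segments must cover [0.0, 1.0) with no gaps and no overlaps.
  def coversKeySpace(active: Vector[Segment]): Boolean = {
    val sorted = active.sortBy(_.low)
    sorted.headOption.exists(_.low == 0.0) &&
      sorted.lastOption.exists(_.high == 1.0) &&
      sorted.zip(sorted.drop(1)).forall { case (a, b) => a.high == b.low }
  }
}
```

For example, splitting segment 1 of `Vector(Segment(0, 0.0, 0.5), Segment(1, 0.5, 1.0))` yields segments 0, 2 and 3, where 2 owns [0.5, 0.75) and 3 owns [0.75, 1.0).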
Pravega has an API to write and read events. An application that ingests data writes events to a stream, and an application that consumes data reads events from a stream. In addition to the event API, Pravega has other APIs that let an application read and write bytes rather than events, and read the events of a stream out of order (e.g., for batch processing).
Pravega stores stream data durably, and applications can access the stream data using the same API both when tailing the stream and when processing past data. The system is architected so that the underlying storage is elastic and it is able to accommodate unbounded streams.
When writing an event, Pravega accepts a routing key parameter, and it guarantees order per key even in the presence of auto-scaling.
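Per-key ordering follows from routing: every event with the same routing key is sent to the same segment, and a segment is append-only, so those events keep their write order. The sketch below models this under stated assumptions: `String.hashCode` stands in for Pravega's real hash function, the key space is split into equal ranges, and the predecessor/successor handling that preserves order across scaling is not modelled. `RoutingSketch` is a hypothetical name.

```scala
// Simplified routing model (assumption): a routing key maps to a point in
// [0.0, 1.0) and the event goes to the segment owning that point.
// String.hashCode is a stand-in, not the hash Pravega actually uses.
object RoutingSketch {
  def keyPoint(routingKey: String): Double =
    (routingKey.hashCode & Int.MaxValue).toDouble / (Int.MaxValue.toDouble + 1)

  // Segment index for a stream whose key space is split into `segments` equal ranges.
  def segmentFor(routingKey: String, segments: Int): Int =
    (keyPoint(routingKey) * segments).toInt

  // Routes (key, payload) pairs in arrival order; each segment appends in order,
  // so all payloads that share a key stay in their original relative order.
  def route(events: Seq[(String, String)], segments: Int): Map[Int, Vector[String]] =
    events.foldLeft(Map.empty[Int, Vector[String]]) { case (acc, (key, payload)) =>
      val s = segmentFor(key, segments)
      acc.updated(s, acc.getOrElse(s, Vector.empty[String]) :+ payload)
    }
}
```

Events with different keys may share a segment and interleave there, which is why the guarantee is stated per key rather than per stream.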
For more information about Pravega please visit the official documentation.
Configuration
The connector is configured with two categories of properties:
- Pravega internal properties, which are forwarded to the Pravega configuration builders:
  - ClientConfig: akka.alpakka.pravega.defaults.client-config
  - EventWriterConfig: akka.alpakka.pravega.writer.config
  - ReaderConfig: akka.alpakka.pravega.reader.config
- Alpakka connector properties (all others).
- reference.conf
    akka.alpakka.pravega {
      #
      # ClientConfig (Pravega internals)
      defaults.client-config {
        # ControllerURI The controller rpc URI. This can be of 2 types
        # 1. tcp://ip1:port1,ip2:port2,...
        #    This is used if the controller endpoints are static and can be directly accessed.
        # 2. pravega://ip1:port1,ip2:port2,...
        #    This is used to autodiscover the controller endpoints from an initial controller list.
        #controller-uri = "tcp://localhost:9090"

        # An optional property representing whether to enable TLS for client's communication with the Controller.
        # If this property and enable-tls-to-segment-store are not set, and the scheme used in controller-uri
        # is "tls" or "pravegas", TLS is automatically enabled for both client-to-Controller and
        # client-to-Segment Store communications.
        #enable-tls-to-controller = false

        # An optional property representing whether to enable TLS for client's communication with the Segment Store.
        # If this property and 'enable-tls-to-controller' are not set, and the scheme used in 'controller-uri'
        # is "tls" or "pravegas", TLS is automatically enabled for both client-to-Controller and
        # client-to-Segment Store communications.
        #enable-tls-to-segment-store = false

        # Maximum number of connections per Segment store to be used by connection pooling.
        #max-connections-per-segment-store=10

        # Path to an optional truststore. If this is null or empty, the default JVM trust store is used.
        # This is currently expected to be a signing certificate for the certification authority.
        #trust-store

        # If the flag 'isEnableTls' is set, this flag decides whether to enable host name validation or not.
        #validate-host-name=true
      }

      reader {
        client-config = ${akka.alpakka.pravega.defaults.client-config}
        # ReaderConfig (Pravega internals)
        config {
          #disable-time-windows = false
          #initial-allocation-delay = 0
        }
        timeout = 1 second
        #group-name="scala-group-name"
        # The reader-id must be unique across all instances of a reader group.
        # When a reader-id is not provided one is randomly generated each time a Reader Source is created.
        #reader-id="scala-reader-id"
      }

      writer {
        client-config = ${akka.alpakka.pravega.defaults.client-config}
        maximum-inflight-messages = 10
        # EventWriterConfig (Pravega internals)
        config {
          #automatically-note-time=false
          #backoff-multiple=10
          #enable-connection-pooling=false
          #initial-backoff-millis=1
          #retry-attempts=10
          #
          # The transaction timeout parameter corresponds to the lease renewal period.
          # In every period, the client must send at least one ping to keep the txn alive.
          # If the client fails to do so, then Pravega aborts the txn automatically. The client
          # sends pings internally and requires no application intervention, only that it sets
          # this parameter accordingly.
          #
          # This parameter is additionally used to determine the total amount of time that
          # a txn can remain open. Currently, we set the maximum amount of time for a
          # txn to remain open to be the minimum between 1 day and 1,000 times the value
          # of the lease renewal period. The 1,000 is hardcoded and has been chosen arbitrarily
          # to be a large enough value.
          #
          # The maximum allowed lease time by default is 120s, see:
          #
          # io.pravega.controller.util.Config.PROPERTY_TXN_MAX_LEASE
          #
          # The maximum allowed lease time is a configuration parameter of the controller
          # and can be changed accordingly. Note that being a controller-wide parameter,
          # it affects all transactions.
          #transaction-timeout-time=89999L
        }
      }
    }
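The transaction-timeout comments in the configuration above state two rules: a transaction can stay open for at most min(1 day, 1,000 x lease renewal period), and the lease itself must not exceed the controller's maximum, 120 s by default. The worked example below encodes those two rules, assuming `transaction-timeout-time` is expressed in milliseconds; `TxnTimeoutSketch` is a hypothetical helper, not part of the connector.

```scala
import scala.concurrent.duration._

// Encodes the rules stated in the reference.conf comments (illustration only).
object TxnTimeoutSketch {
  // Controller-wide default for the maximum lease, per the comments above.
  val DefaultMaxLease: FiniteDuration = 120.seconds

  // Maximum time a transaction may remain open: min(1 day, 1,000 x lease).
  def maxOpenTime(lease: FiniteDuration): FiniteDuration = {
    val byLease = lease * 1000
    if (byLease < 1.day) byLease else 1.day
  }

  // Whether the controller would accept this lease renewal period.
  def leaseAccepted(lease: FiniteDuration, maxLease: FiniteDuration = DefaultMaxLease): Boolean =
    lease <= maxLease
}
```

With the commented default of 89999 ms, 1,000 leases come to 89,999 s (about 25 hours), which exceeds one day (86,400 s), so the one-day cap applies; a 30 s lease would instead allow 30,000 s (about 8.3 hours).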
The Pravega connector can configure the Pravega client automatically from Lightbend configuration supplied in an application.conf, or the client can be configured programmatically with ReaderSettingsBuilder or WriterSettingsBuilder. See the following sections for examples.
ClientConfig
This configuration holds connection properties (endpoints, protocol) for all communication.
It can be overridden in an application.conf file at the following configuration paths:
- reader: akka.alpakka.pravega.reader.client-config
- writer: akka.alpakka.pravega.writer.client-config

It can also be customised programmatically, see below.
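The controller-uri comments in reference.conf describe how the URI scheme drives two decisions: static endpoints versus auto-discovery, and whether TLS is switched on when neither TLS flag is set. The sketch below encodes those rules as described; it is an illustration, not the Pravega client's code. Two points are assumptions: `pravegas` is treated as the TLS variant of the discovering `pravega` scheme, and when only one TLS flag is set explicitly the other falls back to `false`, matching the commented defaults. `ControllerUriSketch` is a hypothetical name.

```scala
import java.net.URI

// Models the ClientConfig rules described in the reference.conf comments
// (illustration only; not the Pravega client's implementation).
object ControllerUriSketch {
  final case class TlsDecision(toController: Boolean, toSegmentStore: Boolean)

  // pravega:// (and, by assumption, pravegas://) discovers controllers from an
  // initial list; tcp:// and tls:// use the listed endpoints directly.
  def usesDiscovery(uri: URI): Boolean =
    Set("pravega", "pravegas").contains(uri.getScheme)

  // When neither flag is set, a "tls" or "pravegas" scheme enables TLS for both
  // Controller and Segment Store traffic. Otherwise explicit flags win, and an
  // unset flag defaults to false (assumption).
  def tls(uri: URI, toController: Option[Boolean], toSegmentStore: Option[Boolean]): TlsDecision = {
    val secureScheme = Set("tls", "pravegas").contains(uri.getScheme)
    (toController, toSegmentStore) match {
      case (None, None) => TlsDecision(secureScheme, secureScheme)
      case (c, s)       => TlsDecision(c.getOrElse(false), s.getOrElse(false))
    }
  }
}
```

This is why the Scala reader example in the next section, which uses a `pravegas://` URI, still sets both TLS flags explicitly: once either flag is set, the scheme no longer decides on its own.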
EventReader configuration
A Pravega Source needs a ReaderSettings to operate. It can be built from configuration and customized programmatically:
- Scala
    import java.net.URI
    import scala.concurrent.duration._
    import io.pravega.client.stream.impl.UTF8StringSerializer

    // `system` is an existing ActorSystem
    val readerSettings = ReaderSettingsBuilder(system)
      .clientConfigBuilder(
        _.controllerURI(new URI("pravegas://localhost:9090")) // ClientConfig customization
          .enableTlsToController(true)
          .enableTlsToSegmentStore(true)
      )
      .readerConfigBuilder(_.disableTimeWindows(true)) // ReaderConfig customization
      .withTimeout(3.seconds)
      .withSerializer(new UTF8StringSerializer)
- Java
    import java.time.Duration;
    import io.pravega.client.stream.impl.UTF8StringSerializer;

    // `system` is an existing ActorSystem
    ReaderSettings<String> readerSettings =
        ReaderSettingsBuilder.create(system)
            .clientConfigBuilder(
                builder -> builder.enableTlsToController(true)) // ClientConfig customization
            .readerConfigBuilder(
                builder -> builder.disableTimeWindows(true)) // ReaderConfig customization
            .withTimeout(Duration.ofSeconds(3))
            .withSerializer(new UTF8StringSerializer());
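The group-name and reader-id settings in reference.conf refer to Pravega reader groups: the readers in a group share the stream's segments, each segment being read by a single reader, which is why reader ids must be unique within a group. The sketch below is a simplified, hypothetical model. Round-robin assignment and UUID-based ids are assumptions chosen for illustration; the real reader group rebalances segments dynamically, and the connector's id format is not documented here.

```scala
import java.util.UUID

// Simplified reader-group model (assumption: static round-robin assignment).
object ReaderGroupSketch {
  // Mirrors the documented behaviour of generating a random id when reader-id
  // is not configured; UUID is an illustrative choice.
  def readerId(configured: Option[String]): String =
    configured.getOrElse(UUID.randomUUID().toString)

  // Gives every segment to exactly one reader in the group.
  def assign(readerIds: Seq[String], segments: Seq[Int]): Map[String, Seq[Int]] = {
    require(readerIds.nonEmpty, "a reader group needs at least one reader")
    require(readerIds.distinct.size == readerIds.size, "reader ids must be unique within a group")
    val grouped = segments.zipWithIndex.groupBy { case (_, i) => readerIds(i % readerIds.size) }
    readerIds.map(id => id -> grouped.getOrElse(id, Seq.empty).map(_._1)).toMap
  }
}
```

With two readers and five segments, `assign(Seq("r1", "r2"), Seq(0, 1, 2, 3, 4))` gives `r1` segments 0, 2 and 4 and `r2` segments 1 and 3; a duplicated reader id is rejected.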
EventWriter configuration
A Pravega Flow or Sink needs a WriterSettings to operate. It can be built from configuration and customized programmatically. To use a routing key, provide a key extractor function for your message type:
- Scala
    import io.pravega.client.stream.impl.UTF8StringSerializer

    // `system` is an existing ActorSystem
    val writerSettings = WriterSettingsBuilder(system)
      .clientConfigBuilder(_.enableTlsToController(true)) // ClientConfig customization
      .eventWriterConfigBuilder(_.backoffMultiple(5)) // EventWriterConfig customization
      .withMaximumInflightMessages(5)
      .withKeyExtractor((str: String) => str.substring(0, 2))
      .withSerializer(new UTF8StringSerializer)
- Java
    import io.pravega.client.stream.impl.UTF8StringSerializer;

    // `system` is an existing ActorSystem
    WriterSettings<String> writerSettings =
        WriterSettingsBuilder.<String>create(system)
            .withKeyExtractor((String str) -> str.substring(0, 1))
            .withSerializer(new UTF8StringSerializer());
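A key extractor is an ordinary function from the message type to a String, as in the `withKeyExtractor` calls of the writer examples. One caveat: `str.substring(0, 2)` throws `StringIndexOutOfBoundsException` for messages shorter than two characters. The hypothetical extractors below show the failure and two alternatives; `KeyExtractorSketch` is an illustrative name, and which extractor fits depends on your message format.

```scala
// Hypothetical String => String key extractors (illustration only).
object KeyExtractorSketch {
  // Same shape as the Scala writer example: fails on messages shorter than 2 chars.
  val unsafePrefix: String => String = str => str.substring(0, 2)

  // take(2) returns whatever prefix exists instead of throwing.
  val safePrefix: String => String = str => str.take(2)

  // For messages shaped like "<key>_event_<n>", uses everything before the first '_'.
  val beforeUnderscore: String => String = str => str.takeWhile(_ != '_')
}
```

Any of these could be passed as `.withKeyExtractor(KeyExtractorSketch.safePrefix)`. Messages that yield the same key keep their relative order, so the extractor also defines the unit of ordering.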
ReaderSettingsBuilder and WriterSettingsBuilder produce ReaderSettings and WriterSettings respectively, once a Serializer is provided.
Writing to Pravega
Messages are written to Pravega through a Flow or a Sink:
- Scala
    // Uses the implicit WriterSettings in scope
    Source(1 to 100)
      .map(i => s"event_$i")
      .runWith(Pravega.sink("an_existing_scope", "an_existing_streamName"))

    // Passes WriterSettings with a key extractor explicitly
    Source(1 to 100)
      .map { i =>
        val routingKey = i % 10
        s"${routingKey}_event_$i"
      }
      .runWith(Pravega.sink("an_existing_scope", "an_existing_streamName")(writerSettingsWithRoutingKey))
- Java
    Sink<String, CompletionStage<Done>> sinkWithRouting =
        Pravega.sink("an_existing_scope", "an_existing_streamName", writerSettings);

    CompletionStage<Done> doneWithRouting =
        Source.from(Arrays.asList("One", "Two", "Three")).runWith(sinkWithRouting, materializer);
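The maximum-inflight-messages setting (`withMaximumInflightMessages` in the writer examples) bounds how many writes can be outstanding at once. The sketch below models that idea with a semaphore, assuming the limit applies to writes that have been sent but not yet acknowledged; it is not the connector's implementation, and `InflightSketch` and its `write` parameter are hypothetical.

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Model of a bounded in-flight window (assumption: the limit counts writes
// that are sent but not yet acknowledged). Not the connector's code.
object InflightSketch {
  // Writes every event through `write`, never letting more than `limit` writes
  // be pending at once; returns the highest number observed in flight.
  def writeAll(events: Seq[String], limit: Int)(write: String => Future[Unit])(
      implicit ec: ExecutionContext): Int = {
    require(limit > 0, "limit must be positive")
    val permits = new Semaphore(limit)
    val lock = new Object
    var inflight = 0
    var peak = 0
    val acks = events.toVector.map { event =>
      permits.acquire() // blocks the producer while `limit` writes are unacknowledged
      lock.synchronized { inflight += 1; peak = math.max(peak, inflight) }
      write(event).andThen { case _ =>
        lock.synchronized { inflight -= 1 }
        permits.release() // an acknowledgement frees a slot for the next write
      }
    }
    Await.result(Future.sequence(acks), 30.seconds)
    lock.synchronized(peak)
  }
}
```

In an Akka Streams stage the same bound would surface as backpressure on upstream rather than a blocked thread; the blocking semaphore here only keeps the sketch self-contained.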
Reading from Pravega
Messages are read from Pravega with a Source:
- Scala
    val fut = Pravega
      .source("an_existing_scope", "an_existing_streamName")
      .to(Sink.foreach { (event: PravegaEvent[String]) =>
        val message: String = event.message
        processMessage(message)
      })
      .run()
- Java
    CompletionStage<Done> fut =
        Pravega.<String>source("an_existing_scope", "an_existing_streamName", readerSettings)
            .to(Sink.foreach(e -> processMessage(e.message())))
            .run(materializer);
The source emits PravegaEvent elements, a thin wrapper that carries some Pravega metadata along with the message.
Support
In addition to the regular Alpakka community support channels, including Lightbend's discuss.lightbend.com, you can visit the #akka-streams-connector channel on the Pravega Slack for help with the Pravega integration itself.