AWS Kinesis
The AWS Kinesis connector provides flows for streaming data to and from Kinesis Data Streams and to Kinesis Firehose streams.
For more information about Kinesis, please visit the Kinesis documentation.
Another Kinesis connector which is based on the Kinesis Client Library is available.
The KCL Source can read from several shards and rebalance automatically when other workers are started or stopped. It also handles record sequence checkpoints.
Please read more about it at GitHub aserrallerios/kcl-akka-stream.
Another Kinesis connector which is based on the Kinesis Client Library is available.
This library combines the convenience of Akka Streams with KCL checkpoint management, failover, load-balancing, and re-sharding capabilities.
Please read more about it at GitHub StreetContxt/kcl-akka-stream.
Another Kinesis connector which is based on the Kinesis Client Library 2.x is available.
This library exposes an Akka Streams Source backed by the KCL for checkpoint management, failover, load-balancing, and re-sharding capabilities.
Please read more about it at GitHub 500px/kinesis-stream.
Project Info: Alpakka Kinesis

Artifact | com.lightbend.akka : akka-stream-alpakka-kinesis : 1.1.2
JDK versions | OpenJDK 8
Scala versions | 2.12.7, 2.11.12, 2.13.0
JPMS module name | akka.stream.alpakka.aws.kinesis
Readiness level | Since 0.10, 2017-06-30
Home page | https://doc.akka.io/docs/alpakka/current
Release notes | In the documentation
Issues | GitHub issues
Sources | https://github.com/akka/alpakka
Artifacts
- sbt

    libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-kinesis" % "1.1.2"

- Maven

    <dependency>
      <groupId>com.lightbend.akka</groupId>
      <artifactId>akka-stream-alpakka-kinesis_2.12</artifactId>
      <version>1.1.2</version>
    </dependency>

- Gradle

    dependencies {
      compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-kinesis_2.12', version: '1.1.2'
    }
The first table below lists the direct dependencies of this module; the dependency tree that follows lists all libraries it depends on transitively.
- Direct dependencies

Organization | Artifact | Version | License
com.amazonaws | aws-java-sdk-kinesis | 1.11.476 | Apache License, Version 2.0
com.typesafe.akka | akka-stream_2.12 | 2.5.23 | Apache License, Version 2.0
org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause

- Dependency tree

com.amazonaws aws-java-sdk-kinesis 1.11.476 (Apache License, Version 2.0)
    com.amazonaws aws-java-sdk-core 1.11.476 (Apache License, Version 2.0)
        com.fasterxml.jackson.core jackson-databind 2.6.7.2 (The Apache Software License, Version 2.0)
            com.fasterxml.jackson.core jackson-annotations 2.6.0 (The Apache Software License, Version 2.0)
            com.fasterxml.jackson.core jackson-core 2.6.7 (The Apache Software License, Version 2.0)
        com.fasterxml.jackson.dataformat jackson-dataformat-cbor 2.6.7 (The Apache Software License, Version 2.0)
            com.fasterxml.jackson.core jackson-core 2.6.7 (The Apache Software License, Version 2.0)
        commons-logging commons-logging 1.1.3 (The Apache Software License, Version 2.0)
        joda-time joda-time 2.8.1 (Apache 2)
        org.apache.httpcomponents httpclient 4.5.5 (Apache License, Version 2.0)
            commons-codec commons-codec 1.10 (Apache License, Version 2.0)
            commons-logging commons-logging 1.1.3 (The Apache Software License, Version 2.0)
            org.apache.httpcomponents httpcore 4.4.9 (Apache License, Version 2.0)
        software.amazon.ion ion-java 1.0.2 (The Apache License, Version 2.0)
    com.amazonaws jmespath-java 1.11.476 (Apache License, Version 2.0)
        com.fasterxml.jackson.core jackson-databind 2.6.7.2 (The Apache Software License, Version 2.0)
            com.fasterxml.jackson.core jackson-annotations 2.6.0 (The Apache Software License, Version 2.0)
            com.fasterxml.jackson.core jackson-core 2.6.7 (The Apache Software License, Version 2.0)
com.typesafe.akka akka-stream_2.12 2.5.23 (Apache License, Version 2.0)
    com.typesafe.akka akka-actor_2.12 2.5.23 (Apache License, Version 2.0)
        com.typesafe config 1.3.3 (Apache License, Version 2.0)
        org.scala-lang.modules scala-java8-compat_2.12 0.8.0 (BSD 3-clause)
            org.scala-lang scala-library 2.12.7 (BSD 3-Clause)
        org.scala-lang scala-library 2.12.7 (BSD 3-Clause)
    com.typesafe.akka akka-protobuf_2.12 2.5.23 (Apache License, Version 2.0)
        org.scala-lang scala-library 2.12.7 (BSD 3-Clause)
    com.typesafe ssl-config-core_2.12 0.3.7 (Apache-2.0)
        com.typesafe config 1.3.3 (Apache License, Version 2.0)
        org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 (BSD 3-clause)
            org.scala-lang scala-library 2.12.7 (BSD 3-Clause)
        org.scala-lang scala-library 2.12.7 (BSD 3-Clause)
    org.reactivestreams reactive-streams 1.0.2 (CC0)
    org.scala-lang scala-library 2.12.7 (BSD 3-Clause)
org.scala-lang scala-library 2.12.7 (BSD 3-Clause)
Kinesis Data Streams
Create the Kinesis client
Sources and Flows provided by this connector need an AmazonKinesisAsync instance to consume messages from a shard.
The AmazonKinesisAsync instance you supply is thread-safe and can be shared amongst multiple GraphStages. As a result, individual GraphStages will not automatically shut down the supplied client when they complete. It is recommended to shut the client instance down on Actor system termination.
- Scala

    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: Materializer = ActorMaterializer()
    implicit val amazonKinesisAsync: com.amazonaws.services.kinesis.AmazonKinesisAsync =
      AmazonKinesisAsyncClientBuilder.defaultClient()
    system.registerOnTermination(amazonKinesisAsync.shutdown())

- Java

    final ActorSystem system = ActorSystem.create();
    final ActorMaterializer materializer = ActorMaterializer.create(system);
    final com.amazonaws.services.kinesis.AmazonKinesisAsync amazonKinesisAsync =
        AmazonKinesisAsyncClientBuilder.defaultClient();
    system.registerOnTermination(amazonKinesisAsync::shutdown);
Kinesis as Source
The KinesisSource creates one GraphStage per shard. Reading from a shard requires an instance of ShardSettings.
- Scala

    val settings = ShardSettings(streamName = "myStreamName", shardId = "shard-id")
      .withRefreshInterval(1.second)
      .withLimit(500)
      .withShardIteratorType(ShardIteratorType.TRIM_HORIZON)

- Java

    final ShardSettings settings =
        ShardSettings.create("streamName", "shard-id")
            .withRefreshInterval(Duration.ofSeconds(1))
            .withLimit(500)
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
You have the choice of reading from a single shard, or reading from multiple shards. In the case of multiple shards, the results of running a separate GraphStage for each shard are merged together.

The GraphStage associated with a shard will remain open until the graph is stopped, or until a GetRecords result returns an empty shard iterator indicating that the shard has been closed. This means that if you wish to continue processing records after a merge or reshard, you will need to recreate the source with the results of a new DescribeStream request, which can be done by simply creating a new KinesisSource. You can read more about adapting to a reshard in the AWS documentation.
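As a minimal sketch of that recovery step (the rebuildSource helper is an assumption for illustration, not part of the connector API; it uses the basicMerge variant shown further below together with the client created earlier, and ignores DescribeStream pagination):

    import scala.collection.JavaConverters._
    import akka.NotUsed
    import akka.stream.scaladsl.Source
    import com.amazonaws.services.kinesis.model.Record

    // Hypothetical helper: list the stream's current shards via DescribeStream
    // and build a fresh merged source covering all of them.
    def rebuildSource(streamName: String): Source[Record, NotUsed] = {
      val shardSettings = amazonKinesisAsync
        .describeStream(streamName)
        .getStreamDescription
        .getShards
        .asScala
        .toList
        .map(shard => ShardSettings(streamName, shard.getShardId))
      KinesisSource.basicMerge(shardSettings, amazonKinesisAsync)
    }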
For a single shard, you simply provide the settings for that shard.
- Scala

    val source: Source[com.amazonaws.services.kinesis.model.Record, NotUsed] =
      KinesisSource.basic(settings, amazonKinesisAsync)

- Java

    final Source<com.amazonaws.services.kinesis.model.Record, NotUsed> source =
        KinesisSource.basic(settings, amazonKinesisAsync);
You can merge multiple shards by providing a list of settings.
- Scala

    val mergeSettings = List(
      ShardSettings("myStreamName", "shard-id-1"),
      ShardSettings("myStreamName", "shard-id-2")
    )
    val mergedSource: Source[Record, NotUsed] =
      KinesisSource.basicMerge(mergeSettings, amazonKinesisAsync)

- Java

    final List<ShardSettings> mergeSettings =
        Arrays.asList(
            ShardSettings.create("streamName", "shard-id-1"),
            ShardSettings.create("streamName", "shard-id-2"));
    final Source<Record, NotUsed> two = KinesisSource.basicMerge(mergeSettings, amazonKinesisAsync);
The constructed Source will return Record objects by calling GetRecords at the specified interval and according to downstream demand.
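For example, the records can then be consumed like any other source; this is a sketch (the UTF-8 decoding and printing are assumptions about the payload, not connector behavior) using the mergedSource from the Scala tab above:

    import java.nio.charset.StandardCharsets
    import akka.stream.scaladsl.Sink

    // Sketch: decode each record's payload as UTF-8 and print it.
    // Requires the implicit materializer from the client-creation example.
    val done = mergedSource
      .map(record => StandardCharsets.UTF_8.decode(record.getData).toString)
      .runWith(Sink.foreach(println))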
Kinesis Put via Flow or as Sink
The KinesisFlow (or KinesisSink) publishes messages into a Kinesis stream using each message's partition key and body. It batches records dynamically, can perform several requests in parallel, and retries failed records. These features are necessary to achieve the best possible write throughput to the stream. The Flow emits the result of publishing each record.
Batching has a drawback: message order cannot be guaranteed, as some records within a single batch may fail to be published. This also means that the order of the Flow's output may not match the order of its input.
More information can be found in the AWS documentation and the AWS API reference.
In order to correlate the results with the original message, an optional user context object of arbitrary type can be associated with every message and will be returned with the corresponding result. This allows keeping track of which messages have been successfully sent to Kinesis even if the message order gets mixed up.
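A minimal sketch of such correlation (the IDs and payloads here are invented for illustration) could look like this, using the withUserContext variant shown below and the implicit client and materializer from the earlier example:

    import java.nio.ByteBuffer
    import akka.stream.scaladsl.{Sink, Source}
    import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry

    // Sketch: attach a caller-supplied ID to each entry; the flow returns the
    // same ID paired with the publication result, whatever order results arrive in.
    val entriesWithIds = List("id-1", "id-2").map { id =>
      val entry = new PutRecordsRequestEntry()
        .withPartitionKey(id)
        .withData(ByteBuffer.wrap(s"payload for $id".getBytes("UTF-8")))
      (entry, id)
    }

    Source(entriesWithIds)
      .via(KinesisFlow.withUserContext("myStreamName"))
      .runWith(Sink.foreach { case (result, id) => println(s"$id -> ${result.getSequenceNumber}") })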
Publishing to a Kinesis stream requires an instance of KinesisFlowSettings, although a default instance with sane values and a method that derives settings from the number of shards in the stream are also available:
- Scala

    val flowSettings = KinesisFlowSettings
      .create()
      .withParallelism(1)
      .withMaxBatchSize(500)
      .withMaxRecordsPerSecond(1000)
      .withMaxBytesPerSecond(1000000)
      .withMaxRetries(5)
      .withBackoffStrategy(KinesisFlowSettings.Exponential)
      .withRetryInitialTimeout(100.millis)

    val defaultFlowSettings = KinesisFlowSettings.Defaults

    val fourShardFlowSettings = KinesisFlowSettings.byNumberOfShards(4)

- Java

    final KinesisFlowSettings flowSettings =
        KinesisFlowSettings.create()
            .withParallelism(1)
            .withMaxBatchSize(500)
            .withMaxRecordsPerSecond(1_000)
            .withMaxBytesPerSecond(1_000_000)
            .withMaxRetries(5)
            .withBackoffStrategyExponential()
            .withRetryInitialTimeout(Duration.ofMillis(100));

    final KinesisFlowSettings defaultFlowSettings = KinesisFlowSettings.create();

    final KinesisFlowSettings fourShardFlowSettings = KinesisFlowSettings.byNumberOfShards(4);
Note that the throughput settings maxRecordsPerSecond and maxBytesPerSecond are vital to minimize server errors (like ProvisionedThroughputExceededException) and retries, and thus to achieve a higher publication rate.
The Flow/Sink can now be created.
- Scala

    val flow1: Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] =
      KinesisFlow("myStreamName")

    val flow2: Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] =
      KinesisFlow("myStreamName", flowSettings)

    val flow3: Flow[(PutRecordsRequestEntry, String), (PutRecordsResultEntry, String), NotUsed] =
      KinesisFlow.withUserContext("myStreamName")

    val flow4: Flow[(PutRecordsRequestEntry, String), (PutRecordsResultEntry, String), NotUsed] =
      KinesisFlow.withUserContext("myStreamName", flowSettings)

    val flow5: Flow[(String, ByteString), PutRecordsResultEntry, NotUsed] =
      KinesisFlow.byPartitionAndBytes("myStreamName")

    val flow6: Flow[(String, ByteBuffer), PutRecordsResultEntry, NotUsed] =
      KinesisFlow.byPartitionAndData("myStreamName")

    val sink1: Sink[PutRecordsRequestEntry, NotUsed] = KinesisSink("myStreamName")
    val sink2: Sink[PutRecordsRequestEntry, NotUsed] = KinesisSink("myStreamName", flowSettings)
    val sink3: Sink[(String, ByteString), NotUsed] = KinesisSink.byPartitionAndBytes("myStreamName")
    val sink4: Sink[(String, ByteBuffer), NotUsed] = KinesisSink.byPartitionAndData("myStreamName")

- Java

    final Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> flow =
        KinesisFlow.create("streamName", flowSettings, amazonKinesisAsync);

    final Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> defaultSettingsFlow =
        KinesisFlow.create("streamName", amazonKinesisAsync);

    final Flow<Pair<PutRecordsRequestEntry, String>, Pair<PutRecordsResultEntry, String>, NotUsed>
        flowWithStringContext =
            KinesisFlow.withUserContext("streamName", flowSettings, amazonKinesisAsync);

    final Flow<Pair<PutRecordsRequestEntry, String>, Pair<PutRecordsResultEntry, String>, NotUsed>
        defaultSettingsFlowWithStringContext =
            KinesisFlow.withUserContext("streamName", amazonKinesisAsync);

    final Sink<PutRecordsRequestEntry, NotUsed> sink =
        KinesisSink.create("streamName", flowSettings, amazonKinesisAsync);

    final Sink<PutRecordsRequestEntry, NotUsed> defaultSettingsSink =
        KinesisSink.create("streamName", amazonKinesisAsync);
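As a usage sketch (the payloads are invented for illustration), entries can then be streamed through one of the sinks defined above, for example sink1 from the Scala tab:

    import java.nio.ByteBuffer
    import akka.stream.scaladsl.Source
    import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry

    // Sketch: publish three entries through the plain-entry sink defined above.
    Source(1 to 3)
      .map { i =>
        new PutRecordsRequestEntry()
          .withPartitionKey(s"partition-key-$i")
          .withData(ByteBuffer.wrap(s"data-$i".getBytes("UTF-8")))
      }
      .runWith(sink1)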
Kinesis Firehose Streams
Create the Kinesis Firehose client
Flows provided by this connector need an AmazonKinesisFirehoseAsync instance to publish messages.
The AmazonKinesisFirehoseAsync instance you supply is thread-safe and can be shared amongst multiple GraphStages. As a result, individual GraphStages will not automatically shut down the supplied client when they complete. It is recommended to shut the client instance down on Actor system termination.
- Scala

    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: Materializer = ActorMaterializer()
    implicit val amazonKinesisFirehoseAsync: com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync =
      AmazonKinesisFirehoseAsyncClientBuilder.defaultClient()
    system.registerOnTermination(amazonKinesisFirehoseAsync.shutdown())

- Java

    final ActorSystem system = ActorSystem.create();
    final ActorMaterializer materializer = ActorMaterializer.create(system);
    final com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync amazonKinesisFirehoseAsync =
        AmazonKinesisFirehoseAsyncClientBuilder.defaultClient();
    system.registerOnTermination(amazonKinesisFirehoseAsync::shutdown);
Kinesis Firehose Put via Flow or as Sink
The KinesisFirehoseFlow (or KinesisFirehoseSink) publishes messages into a Kinesis Firehose stream using the message body. It batches records dynamically, can perform several requests in parallel, and retries failed records. These features are necessary to achieve the best possible write throughput to the stream. The Flow emits the result of publishing each record.
Batching has a drawback: message order cannot be guaranteed, as some records within a single batch may fail to be published. This also means that the order of the Flow's output may not match the order of its input.
More information can be found in the AWS API reference.
Publishing to a Kinesis Firehose stream requires an instance of KinesisFirehoseFlowSettings, although a default instance with sane values is available:
- Scala

    val flowSettings = KinesisFirehoseFlowSettings
      .create()
      .withParallelism(1)
      .withMaxBatchSize(500)
      .withMaxRecordsPerSecond(5000)
      .withMaxBytesPerSecond(4000000)
      .withMaxRetries(5)
      .withBackoffStrategy(KinesisFirehoseFlowSettings.Exponential)
      .withRetryInitialTimeout(100.millis)

    val defaultFlowSettings = KinesisFirehoseFlowSettings.Defaults

- Java

    final KinesisFirehoseFlowSettings flowSettings =
        KinesisFirehoseFlowSettings.create()
            .withParallelism(1)
            .withMaxBatchSize(500)
            .withMaxRecordsPerSecond(1_000)
            .withMaxBytesPerSecond(1_000_000)
            .withMaxRetries(5)
            .withBackoffStrategyExponential()
            .withRetryInitialTimeout(Duration.ofMillis(100L));

    final KinesisFirehoseFlowSettings defaultFlowSettings = KinesisFirehoseFlowSettings.create();
Note that the throughput settings maxRecordsPerSecond and maxBytesPerSecond are vital to minimize server errors (like ProvisionedThroughputExceededException) and retries, and thus to achieve a higher publication rate.
The Flow/Sink can now be created.
- Scala

    val flow1: Flow[Record, PutRecordBatchResponseEntry, NotUsed] =
      KinesisFirehoseFlow("myStreamName")

    val flow2: Flow[Record, PutRecordBatchResponseEntry, NotUsed] =
      KinesisFirehoseFlow("myStreamName", flowSettings)

    val sink1: Sink[Record, NotUsed] = KinesisFirehoseSink("myStreamName")
    val sink2: Sink[Record, NotUsed] = KinesisFirehoseSink("myStreamName", flowSettings)

- Java

    final Flow<Record, PutRecordBatchResponseEntry, NotUsed> flow =
        KinesisFirehoseFlow.apply("streamName", flowSettings, amazonKinesisFirehoseAsync);

    final Flow<Record, PutRecordBatchResponseEntry, NotUsed> defaultSettingsFlow =
        KinesisFirehoseFlow.apply("streamName", amazonKinesisFirehoseAsync);

    final Sink<Record, NotUsed> sink =
        KinesisFirehoseSink.apply("streamName", flowSettings, amazonKinesisFirehoseAsync);

    final Sink<Record, NotUsed> defaultSettingsSink =
        KinesisFirehoseSink.apply("streamName", amazonKinesisFirehoseAsync);
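Finally, a usage sketch (the payload is invented); note that Record here is com.amazonaws.services.kinesisfirehose.model.Record, not the Kinesis Data Streams record type used earlier:

    import java.nio.ByteBuffer
    import akka.stream.scaladsl.Source
    import com.amazonaws.services.kinesisfirehose.model.Record

    // Sketch: publish a single record through the sink defined above.
    Source
      .single(new Record().withData(ByteBuffer.wrap("some data".getBytes("UTF-8"))))
      .runWith(sink1)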