Apache Kudu
The Alpakka Kudu connector supports writing to Apache Kudu tables.
Apache Kudu is a free and open source column-oriented data store in the Apache Hadoop ecosystem.
Project Info: Alpakka Kudu | |
---|---|
Artifact | com.lightbend.akka akka-stream-alpakka-kudu 6.0.0+3-654dc53a-SNAPSHOT |
JDK versions | Adopt OpenJDK 8, Adopt OpenJDK 11 |
Scala versions | 2.13.10, 2.12.17 |
JPMS module name | akka.stream.alpakka.kudu |
License | |
Readiness level | Since 0.19, 2018-05-09 |
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
    val AkkaVersion = "2.8.1"
    libraryDependencies ++= Seq(
      "com.lightbend.akka" %% "akka-stream-alpakka-kudu" % "6.0.0+3-654dc53a-SNAPSHOT",
      "com.typesafe.akka" %% "akka-stream" % AkkaVersion
    )
- Maven
    <properties>
      <akka.version>2.8.1</akka.version>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>com.lightbend.akka</groupId>
        <artifactId>akka-stream-alpakka-kudu_${scala.binary.version}</artifactId>
        <version>6.0.0+3-654dc53a-SNAPSHOT</version>
      </dependency>
      <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-stream_${scala.binary.version}</artifactId>
        <version>${akka.version}</version>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      AkkaVersion: "2.8.1",
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation "com.lightbend.akka:akka-stream-alpakka-kudu_${versions.ScalaBinary}:6.0.0+3-654dc53a-SNAPSHOT"
      implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
    }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.typesafe.akka | akka-stream_2.13 | 2.8.1 |
org.apache.kudu | kudu-client-tools | 1.7.1 |
org.scala-lang | scala-library | 2.13.10 |
- Dependency tree
    com.typesafe.akka akka-stream_2.13 2.8.1 BUSL-1.1
        com.typesafe.akka akka-actor_2.13 2.8.1 BUSL-1.1
            com.typesafe config 1.4.2 Apache-2.0
            org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0
                org.scala-lang scala-library 2.13.10 Apache-2.0
            org.scala-lang scala-library 2.13.10 Apache-2.0
        com.typesafe.akka akka-protobuf-v3_2.13 2.8.1 BUSL-1.1
        com.typesafe ssl-config-core_2.13 0.6.1 Apache-2.0
            com.typesafe config 1.4.2 Apache-2.0
            org.scala-lang scala-library 2.13.10 Apache-2.0
        org.reactivestreams reactive-streams 1.0.4 MIT-0
        org.scala-lang scala-library 2.13.10 Apache-2.0
    org.apache.kudu kudu-client-tools 1.7.1
        org.slf4j slf4j-api 1.7.25
    org.scala-lang scala-library 2.13.10 Apache-2.0
Configuration
To connect to Kudu you need to:
- Describe the Kudu Schema
- Define a converter function to map your data type to a PartialRow
- Specify the Kudu CreateTableOptions
- Set up Alpakka’s KuduTableSettings
- Scala
    import akka.stream.alpakka.kudu.KuduTableSettings
    import org.apache.kudu.{ColumnSchema, Schema, Type}
    import org.apache.kudu.client.{CreateTableOptions, PartialRow}
    // Scala 2.13; on Scala 2.12 use scala.collection.JavaConverters._
    import scala.jdk.CollectionConverters._

    // Kudu Schema
    val cols = List(
      new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build,
      new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build
    )
    val schema = new Schema(cols.asJava)

    // Converter function
    case class Person(id: Int, name: String)
    val kuduConverter: Person => PartialRow = { person =>
      val partialRow = schema.newPartialRow()
      partialRow.addInt(0, person.id)
      partialRow.addString(1, person.name)
      partialRow
    }

    // Kudu table options
    val rangeKeys = List("key")
    val createTableOptions =
      new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys.asJava)

    // Alpakka settings
    val kuduTableSettings = KuduTableSettings("test", schema, createTableOptions, kuduConverter)
- Java
    // Kudu Schema
    List<ColumnSchema> columns = new ArrayList<>(2);
    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
    columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
    Schema schema = new Schema(columns);

    // Converter function
    Function<Person, PartialRow> kuduConverter =
        person -> {
          PartialRow partialRow = schema.newPartialRow();
          partialRow.addInt(0, person.id);
          partialRow.addString(1, person.name);
          return partialRow;
        };

    // Kudu table options
    List<String> rangeKeys = Collections.singletonList("key");
    CreateTableOptions createTableOptions =
        new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys);

    // Alpakka settings
    KuduTableSettings<Person> tableSettings =
        KuduTableSettings.create("tablenameSink", schema, createTableOptions, kuduConverter);
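The converters above address columns by position. As a possible variation, the following minimal sketch addresses columns by name and maps an optional field to a nullable column. The `Customer` type, its `email` column, and the `CustomerConverter` object are hypothetical and exist only for illustration; the sketch assumes Scala 2.13 (for `scala.jdk.CollectionConverters`) and the Kudu Java client on the classpath.

```scala
import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.kudu.client.PartialRow

import scala.jdk.CollectionConverters._

// Hypothetical record type, used only for this illustration
final case class Customer(id: Int, name: String, email: Option[String])

object CustomerConverter {
  // Three columns: an INT32 key, a required string, and a nullable string
  val schema: Schema = new Schema(
    List(
      new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build(),
      new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build(),
      new ColumnSchema.ColumnSchemaBuilder("email", Type.STRING).nullable(true).build()
    ).asJava
  )

  // Fills a PartialRow by column name instead of by column index
  val converter: Customer => PartialRow = { customer =>
    val row = schema.newPartialRow()
    row.addInt("id", customer.id)
    row.addString("name", customer.name)
    customer.email match {
      case Some(address) => row.addString("email", address)
      case None          => row.setNull("email")
    }
    row
  }
}
```

Addressing columns by name keeps the converter valid if the column order in the schema changes, at the cost of a name lookup per field.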
By default, the connector manages the KuduClient automatically and reads the client settings from the reference.conf file. To use a manually initialized client instead, inject it into the stream with KuduAttributes.
- Scala
    val masterAddress = "localhost:7051"
    val client = new KuduClient.KuduClientBuilder(masterAddress).build
    system.registerOnTermination(client.shutdown())

    val flow: Flow[Person, Person, NotUsed] =
      KuduTable
        .flow(kuduTableSettings.withTableName("Flow"))
        .withAttributes(KuduAttributes.client(client))
- Java
    final String masterAddress = "localhost:7051";
    final KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();
    system.registerOnTermination(
        () -> {
          try {
            client.shutdown();
          } catch (KuduException e) {
            e.printStackTrace();
          }
        });

    final Flow<Person, Person, NotUsed> flow =
        KuduTable.flow(tableSettings.withTableName("Flow"))
            .withAttributes(KuduAttributes.client(client));
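For the automatically managed client, the settings come from configuration rather than from code. The sketch below shows one way such a setting might be overridden when the actor system is created. It assumes the connector reads the master address from the key `alpakka.kudu.master-address`; check the connector's reference.conf before relying on that name. The host `kudu-master.example.com` and the object name are hypothetical.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object KuduClientConfigExample {
  // Assumption: the connector's reference.conf defines this key.
  val overrides: Config = ConfigFactory.parseString(
    """
      |alpakka.kudu {
      |  master-address = "kudu-master.example.com:7051"
      |}
      |""".stripMargin
  )

  // Values set here win; everything else falls back to the loaded defaults
  val config: Config = overrides.withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("kudu-example", config)
    println(system.settings.config.getString("alpakka.kudu.master-address"))
    system.terminate()
  }
}
```

In a deployed application the same override would more commonly live in application.conf, which `ConfigFactory.load()` picks up without extra code.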
Writing to Kudu in a Flow
- Scala
    val flow: Flow[Person, Person, NotUsed] =
      KuduTable.flow(kuduTableSettings.withTableName("Flow"))

    val f = Source(11 to 20)
      .map(i => Person(i, s"zozo_$i"))
      .via(flow)
      .runWith(Sink.fold(0)((a, d) => a + d.id))
- Java
    Flow<Person, Person, NotUsed> flow = KuduTable.flow(tableSettings.withTableName("Flow"));

    CompletionStage<List<Person>> run =
        Source.from(Arrays.asList(200, 201, 202, 203, 204))
            .map((i) -> new Person(i, String.format("name_%d", i)))
            .via(flow)
            .toMat(Sink.seq(), Keep.right())
            .run(system);
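The Scala example above folds the elements coming out of the flow into the sum of their ids. To make that materialized value concrete without a running Kudu cluster, the following sketch replaces the Kudu flow with a plain pass-through flow of the same type. It assumes the Kudu flow emits each element downstream unchanged, as its `Flow[Person, Person, NotUsed]` signature suggests; `standInFlow`, `sumOfIds`, and the object name are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FlowResultSketch {
  final case class Person(id: Int, name: String)

  // Stand-in for KuduTable.flow(...): passes elements through without writing anything
  val standInFlow: Flow[Person, Person, NotUsed] = Flow[Person]

  // Same pipeline shape as the example above, folding the emitted ids into a sum
  def sumOfIds()(implicit system: ActorSystem): Future[Int] =
    Source(11 to 20)
      .map(i => Person(i, s"zozo_$i"))
      .via(standInFlow)
      .runWith(Sink.fold[Int, Person](0)((acc, person) => acc + person.id))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("flow-sketch")
    println(Await.result(sumOfIds(), 5.seconds)) // prints 155 (11 + 12 + ... + 20)
    system.terminate()
  }
}
```

Because the flow passes elements on, any downstream stage can follow it, for example one that counts or logs the processed records.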
Writing to Kudu with a Sink
- Scala
    val sink: Sink[Person, Future[Done]] =
      KuduTable.sink(kuduTableSettings.withTableName("Sink"))

    val f = Source(1 to 10)
      .map(i => Person(i, s"zozo_$i"))
      .runWith(sink)
- Java
    final Sink<Person, CompletionStage<Done>> sink =
        KuduTable.sink(tableSettings.withTableName("Sink"));

    CompletionStage<Done> o =
        Source.from(Arrays.asList(100, 101, 102, 103, 104))
            .map((i) -> new Person(i, String.format("name %d", i)))
            .runWith(sink, system);
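The snippets on this page can be combined into a single program. The following is a minimal end-to-end sketch, not a definitive implementation: it assumes a Kudu master is reachable at the address the connector is configured with, and Scala 2.13 for `scala.jdk.CollectionConverters`. The object name `KuduSinkApp` and the printed messages are hypothetical.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.kudu.KuduTableSettings
import akka.stream.alpakka.kudu.scaladsl.KuduTable
import akka.stream.scaladsl.Source
import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.kudu.client.{CreateTableOptions, PartialRow}

import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}

object KuduSinkApp {
  final case class Person(id: Int, name: String)

  // Kudu schema: an INT32 key column and a string value column
  val schema: Schema = new Schema(
    List(
      new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
      new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build()
    ).asJava
  )

  // Converter from the domain type to a Kudu row
  val converter: Person => PartialRow = { person =>
    val row = schema.newPartialRow()
    row.addInt(0, person.id)
    row.addString(1, person.name)
    row
  }

  val settings: KuduTableSettings[Person] = KuduTableSettings(
    "Sink",
    schema,
    new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(List("key").asJava),
    converter
  )

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("kudu-sink-app")
    import system.dispatcher

    val done: Future[Done] =
      Source(1 to 10)
        .map(i => Person(i, s"zozo_$i"))
        .runWith(KuduTable.sink(settings))

    // Report the outcome, then shut the actor system down in either case
    done.onComplete { result =>
      result match {
        case Success(_)     => println("Stream completed")
        case Failure(cause) => println(s"Stream failed: ${cause.getMessage}")
      }
      system.terminate()
    }
  }
}
```

Terminating the actor system inside `onComplete` lets the JVM exit once the stream has finished, whether it succeeded or failed.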