Apache Kudu
The Kudu connector has not been updated in a long time and is now considered End of Life. It will be removed in the next release of Alpakka.
The Alpakka Kudu connector supports writing to Apache Kudu tables.
Apache Kudu is a free and open source column-oriented data store in the Apache Hadoop ecosystem.
Project Info: Alpakka Kudu
Artifact | com.lightbend.akka |
JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 |
Scala versions | 2.13.12, 3.3.4 |
JPMS module name | akka.stream.alpakka.kudu |
License | |
Readiness level | End-of-Life, it is not recommended to use this project any more. Since 9.0.0-M1, 2024-10-14 |
Home page | https://doc.akka.io/libraries/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka |
The Akka dependencies are available from Akka’s library repository. To access them, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
- Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
Additionally, add the dependencies as below.
- sbt
val AkkaVersion = "2.10.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-kudu" % "9.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
- Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-kudu_${scala.binary.version}</artifactId>
    <version>9.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  AkkaVersion: "2.10.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-kudu_${versions.ScalaBinary}:9.0.1"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
com.typesafe.akka | akka-stream_2.13 | 2.10.0 |
org.apache.kudu | kudu-client-tools | 1.7.1 |
org.scala-lang | scala-library | 2.13.12 |
- Dependency tree
com.typesafe.akka akka-stream_2.13 2.10.0 BUSL-1.1
    com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
        com.typesafe config 1.4.3 Apache-2.0
        org.scala-lang scala-library 2.13.12 Apache-2.0
    com.typesafe.akka akka-protobuf-v3_2.13 2.10.0 BUSL-1.1
    org.reactivestreams reactive-streams 1.0.4 MIT-0
    org.scala-lang scala-library 2.13.12 Apache-2.0
org.apache.kudu kudu-client-tools 1.7.1
    org.slf4j slf4j-api 1.7.25
org.scala-lang scala-library 2.13.12 Apache-2.0
To connect to Kudu you need to:
- Describe the Kudu Schema
- Define a converter function to map your data type to a PartialRow
- Specify Kudu CreateTableOptions
- Set up Alpakka’s KuduTableSettings
- Scala
// Kudu Schema
val cols = List(
  new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build,
  new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build
)
val schema = new Schema(cols.asJava)

// Converter function
case class Person(id: Int, name: String)
val kuduConverter: Person => PartialRow = { person =>
  val partialRow = schema.newPartialRow()
  partialRow.addInt(0, person.id)
  partialRow.addString(1, person.name)
  partialRow
}

// Kudu table options
val rangeKeys = List("key")
val createTableOptions = new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys.asJava)

// Alpakka settings
val kuduTableSettings = KuduTableSettings("test", schema, createTableOptions, kuduConverter)
- Java
// Kudu Schema
List<ColumnSchema> columns = new ArrayList<>(2);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
schema = new Schema(columns);

// Converter function
Function<Person, PartialRow> kuduConverter =
    person -> {
      PartialRow partialRow = schema.newPartialRow();
      partialRow.addInt(0, person.id);
      partialRow.addString(1, person.name);
      return partialRow;
    };

// Kudu table options
List<String> rangeKeys = Collections.singletonList("key");
CreateTableOptions createTableOptions =
    new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys);

// Alpakka settings
KuduTableSettings<Person> tableSettings =
    KuduTableSettings.create("tablenameSink", schema, createTableOptions, kuduConverter);
By default, the KuduClient is managed automatically by the connector. Settings for the client are read from the reference.conf file. A manually initialized client can be injected into the stream using KuduAttributes:
- Scala
val masterAddress = "localhost:7051"
val client = new KuduClient.KuduClientBuilder(masterAddress).build
system.registerOnTermination(client.shutdown())

val flow: Flow[Person, Person, NotUsed] =
  KuduTable
    .flow(kuduTableSettings.withTableName("Flow"))
    .withAttributes(KuduAttributes.client(client))
- Java
final String masterAddress = "localhost:7051";
final KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();
system.registerOnTermination(
    () -> {
      try {
        client.shutdown();
      } catch (KuduException e) {
        e.printStackTrace();
      }
    });

final Flow<Person, Person, NotUsed> flow =
    KuduTable.flow(tableSettings.withTableName("Flow"))
        .withAttributes(KuduAttributes.client(client));
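When the connector-managed client is used instead of an injected one, its settings come from configuration. As a minimal sketch, assuming the connector's reference.conf exposes the Kudu master address under the key alpakka.kudu.master-address (an assumption that should be checked against the reference.conf shipped with your Alpakka version), an override in application.conf might look like this:

```hocon
# application.conf
# Assumed key name: verify it in the connector's reference.conf before relying on it.
alpakka.kudu {
  # Address of the Kudu master the connector-managed client connects to
  master-address = "localhost:7051"
}
```

With such an override in place, no KuduAttributes would be needed on the flow or sink, since the default client would pick up the configured address.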
Writing to Kudu in a Flow
- Scala
val flow: Flow[Person, Person, NotUsed] =
  KuduTable.flow(kuduTableSettings.withTableName("Flow"))

val f = Source(11 to 20)
  .map(i => Person(i, s"zozo_$i"))
  .via(flow)
  .runWith(Sink.fold(0)((a, d) => a + d.id))
- Java
Flow<Person, Person, NotUsed> flow = KuduTable.flow(tableSettings.withTableName("Flow"));

CompletionStage<List<Person>> run =
    Source.from(Arrays.asList(200, 201, 202, 203, 204))
        .map((i) -> new Person(i, String.format("name_%d", i)))
        .via(flow)
        .toMat(Sink.seq(), Keep.right())
        .run(system);
Writing to Kudu with a Sink
- Scala
val sink: Sink[Person, Future[Done]] =
  KuduTable.sink(kuduTableSettings.withTableName("Sink"))

val f = Source(1 to 10)
  .map(i => Person(i, s"zozo_$i"))
  .runWith(sink)
- Java
final Sink<Person, CompletionStage<Done>> sink =
    KuduTable.sink(tableSettings.withTableName("Sink"));

CompletionStage<Done> o =
    Source.from(Arrays.asList(100, 101, 102, 103, 104))
        .map((i) -> new Person(i, String.format("name %d", i)))
        .runWith(sink, system);