Apache Kudu

End of life

The Kudu connector has not been updated for a long time and is now considered End of Life. It will be removed in the next release of Alpakka.

The Alpakka Kudu connector supports writing to Apache Kudu tables.

Apache Kudu is a free and open source column-oriented data store in the Apache Hadoop ecosystem.

Project Info: Alpakka Kudu
Artifact: com.lightbend.akka : akka-stream-alpakka-kudu : 9.0.0-M1+1-4049dca2-SNAPSHOT
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.12, 3.3.3
JPMS module name: akka.stream.alpakka.kudu
License
Readiness level: End-of-Life, it is not recommended to use this project any more. Since 9.0.0-M1, 2024-10-14
Home page: https://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.0-M1"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-kudu" % "9.0.0-M1+1-4049dca2-SNAPSHOT",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.10.0-M1</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-kudu_${scala.binary.version}</artifactId>
    <version>9.0.0-M1+1-4049dca2-SNAPSHOT</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.0-M1",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-kudu_${versions.ScalaBinary}:9.0.0-M1+1-4049dca2-SNAPSHOT"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.typesafe.akka    akka-stream_2.13    2.10.0-M1
org.apache.kudu    kudu-client-tools    1.7.1
org.scala-lang    scala-library    2.13.12
Dependency tree
com.typesafe.akka    akka-stream_2.13    2.10.0-M1    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.0-M1    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0-M1    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.apache.kudu    kudu-client-tools    1.7.1
    org.slf4j    slf4j-api    1.7.25
org.scala-lang    scala-library    2.13.12    Apache-2.0

Configuration

To connect to Kudu you need to:

  1. Describe the Kudu Schema
  2. Define a converter function to map your data type to a PartialRow
  3. Specify Kudu CreateTableOptions
  4. Set up Alpakka’s KuduTableSettings
Scala
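import akka.stream.alpakka.kudu.KuduTableSettings
import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.kudu.client.{CreateTableOptions, PartialRow}
import scala.jdk.CollectionConverters._

// Kudu Schema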
val cols = List(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build,
                new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build)
val schema = new Schema(cols.asJava)

// Converter function
case class Person(id: Int, name: String)
val kuduConverter: Person => PartialRow = { person =>
  val partialRow = schema.newPartialRow()
  partialRow.addInt(0, person.id)
  partialRow.addString(1, person.name)
  partialRow
}

// Kudu table options
val rangeKeys = List("key")
val createTableOptions = new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys.asJava)

// Alpakka settings
val kuduTableSettings = KuduTableSettings("test", schema, createTableOptions, kuduConverter)
Java
// Kudu Schema
List<ColumnSchema> columns = new ArrayList<>(2);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
Schema schema = new Schema(columns);

// Converter function
Function<Person, PartialRow> kuduConverter =
    person -> {
      PartialRow partialRow = schema.newPartialRow();
      partialRow.addInt(0, person.id);
      partialRow.addString(1, person.name);
      return partialRow;
    };

// Kudu table options
List<String> rangeKeys = Collections.singletonList("key");
CreateTableOptions createTableOptions =
    new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys);

// Alpakka settings
KuduTableSettings<Person> tableSettings =
    KuduTableSettings.create("tablenameSink", schema, createTableOptions, kuduConverter);
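
The Java snippets on this page use a Person type that is not shown. A minimal sketch, assuming a plain immutable class whose public fields match the person.id and person.name accesses in the converter above (the class body here is an illustration, not the connector's own code), could look like this:

```java
// Hypothetical value type assumed by the Java snippets on this page.
final class Person {
  // Written to the INT32 "key" column (index 0) by the converter.
  public final int id;
  // Written to the STRING "value" column (index 1) by the converter.
  public final String name;

  Person(int id, String name) {
    this.id = id;
    this.name = name;
  }
}
```

Keeping the fields final means a Person cannot change while it travels through the stream.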

By default, the KuduClient is managed automatically by the connector. Settings for the client are read from the reference.conf file. A manually initialized client can be injected into the stream using KuduAttributes.

Scala
val masterAddress = "localhost:7051"
val client = new KuduClient.KuduClientBuilder(masterAddress).build
system.registerOnTermination(client.shutdown())

val flow: Flow[Person, Person, NotUsed] =
  KuduTable
    .flow(kuduTableSettings.withTableName("Flow"))
    .withAttributes(KuduAttributes.client(client))
Java
final String masterAddress = "localhost:7051";
final KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();
system.registerOnTermination(
    () -> {
      try {
        client.shutdown();
      } catch (KuduException e) {
        e.printStackTrace();
      }
    });

final Flow<Person, Person, NotUsed> flow =
    KuduTable.flow(tableSettings.withTableName("Flow"))
        .withAttributes(KuduAttributes.client(client));

Writing to Kudu in a Flow

Scala
val flow: Flow[Person, Person, NotUsed] =
  KuduTable.flow(kuduTableSettings.withTableName("Flow"))

val f = Source(11 to 20)
  .map(i => Person(i, s"zozo_$i"))
  .via(flow)
  .runWith(Sink.fold(0)((a, d) => a + d.id))
Java
Flow<Person, Person, NotUsed> flow = KuduTable.flow(tableSettings.withTableName("Flow"));

CompletionStage<List<Person>> run =
    Source.from(Arrays.asList(200, 201, 202, 203, 204))
        .map((i) -> new Person(i, String.format("name_%d", i)))
        .via(flow)
        .toMat(Sink.seq(), Keep.right())
        .run(system);

Writing to Kudu with a Sink

Scala
val sink: Sink[Person, Future[Done]] =
  KuduTable.sink(kuduTableSettings.withTableName("Sink"))

val f = Source(1 to 10)
  .map(i => Person(i, s"zozo_$i"))
  .runWith(sink)
Java
final Sink<Person, CompletionStage<Done>> sink =
    KuduTable.sink(tableSettings.withTableName("Sink"));

CompletionStage<Done> o =
    Source.from(Arrays.asList(100, 101, 102, 103, 104))
        .map((i) -> new Person(i, String.format("name %d", i)))
        .runWith(sink, system);
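
Both the flow and the sink examples above materialize a CompletionStage that completes once the stream has finished. In a test or a short-lived program it can be convenient to block until that happens. The helper below is a sketch using only the JDK; the class name StreamResults and the method await are hypothetical and not part of Alpakka.

```java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the connector.
final class StreamResults {
  private StreamResults() {}

  // Blocks the calling thread until the stage completes or the timeout elapses.
  // A failed stage surfaces as an ExecutionException wrapping the cause.
  static <T> T await(CompletionStage<T> stage, long timeoutSeconds)
      throws InterruptedException, ExecutionException, TimeoutException {
    return stage.toCompletableFuture().get(timeoutSeconds, TimeUnit.SECONDS);
  }
}
```

With the sink example above, this could be called as StreamResults.await(o, 10). Blocking like this is best kept to tests and main methods. Inside an application, composing on the CompletionStage avoids tying up a thread.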