Apache Kudu

The Alpakka Kudu connector supports writing to Apache Kudu tables.

Apache Kudu is a free and open source column-oriented data store in the Apache Hadoop ecosystem.

Project Info: Alpakka Kudu
Artifact: com.lightbend.akka, akka-stream-alpakka-kudu, 7.0.2
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.12, 3.3.1
JPMS module name: akka.stream.alpakka.kudu
Readiness level: Since 0.19, 2018-05-09
Home page: https://doc.akka.io/docs/alpakka/current
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them, configure the URL for this repository in your build.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.9.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-kudu" % "7.0.2",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.9.0</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-kudu_${scala.binary.version}</artifactId>
    <version>7.0.2</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.9.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-kudu_${versions.ScalaBinary}:7.0.2"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The first table below shows the direct dependencies of this module; the dependency tree after it shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.typesafe.akka    akka-stream_2.13    2.9.0
org.apache.kudu    kudu-client-tools    1.7.1
org.scala-lang    scala-library    2.13.12
Dependency tree
com.typesafe.akka    akka-stream_2.13    2.9.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.9.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.13    1.0.2    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.9.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.apache.kudu    kudu-client-tools    1.7.1
    org.slf4j    slf4j-api    1.7.25
org.scala-lang    scala-library    2.13.12    Apache-2.0

Configuration

To connect to Kudu you need to:

  1. Describe the Kudu Schema
  2. Define a converter function to map your data type to a PartialRow
  3. Specify Kudu CreateTableOptions
  4. Set up Alpakka’s KuduTableSettings
Scala
// Kudu Schema
val cols = List(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build,
                new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build)
val schema = new Schema(cols.asJava)

// Converter function
case class Person(id: Int, name: String)
val kuduConverter: Person => PartialRow = { person =>
  val partialRow = schema.newPartialRow()
  partialRow.addInt(0, person.id)
  partialRow.addString(1, person.name)
  partialRow
}

// Kudu table options
val rangeKeys = List("key")
val createTableOptions = new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys.asJava)

// Alpakka settings
val kuduTableSettings = KuduTableSettings("test", schema, createTableOptions, kuduConverter)
Java
// Kudu Schema
List<ColumnSchema> columns = new ArrayList<>(2);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build());
schema = new Schema(columns);

// Converter function
Function<Person, PartialRow> kuduConverter =
    person -> {
      PartialRow partialRow = schema.newPartialRow();
      partialRow.addInt(0, person.id);
      partialRow.addString(1, person.name);
      return partialRow;
    };

// Kudu table options
List<String> rangeKeys = Collections.singletonList("key");
CreateTableOptions createTableOptions =
    new CreateTableOptions().setNumReplicas(1).setRangePartitionColumns(rangeKeys);

// Alpakka settings
KuduTableSettings<Person> tableSettings =
    KuduTableSettings.create("tablenameSink", schema, createTableOptions, kuduConverter);

By default, the KuduClient is managed automatically by the connector, and its settings are read from the reference.conf file. A manually initialized client can be injected into the stream using KuduAttributes.

Scala
val masterAddress = "localhost:7051"
val client = new KuduClient.KuduClientBuilder(masterAddress).build
system.registerOnTermination(client.shutdown())

val flow: Flow[Person, Person, NotUsed] =
  KuduTable
    .flow(kuduTableSettings.withTableName("Flow"))
    .withAttributes(KuduAttributes.client(client))
Java
final String masterAddress = "localhost:7051";
final KuduClient client = new KuduClient.KuduClientBuilder(masterAddress).build();
system.registerOnTermination(
    () -> {
      try {
        client.shutdown();
      } catch (KuduException e) {
        e.printStackTrace();
      }
    });

final Flow<Person, Person, NotUsed> flow =
    KuduTable.flow(tableSettings.withTableName("Flow"))
        .withAttributes(KuduAttributes.client(client));
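
If the connector-managed client is sufficient, the master address can instead be overridden in your own application.conf rather than by building a client by hand. The fragment below is a sketch only: the key name `alpakka.kudu.master-address` is an assumption about what the connector's reference.conf defines, and the host name is a hypothetical placeholder. Check the reference.conf shipped with your Alpakka version before relying on it.

```
# application.conf
# Assumed key, expected to mirror the connector's reference.conf; verify for your version.
alpakka.kudu {
  # Hypothetical host; replace with the address of your Kudu master.
  master-address = "kudu-master.example.com:7051"
}
```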

Writing to Kudu in a Flow

Scala
val flow: Flow[Person, Person, NotUsed] =
  KuduTable.flow(kuduTableSettings.withTableName("Flow"))

val f = Source(11 to 20)
  .map(i => Person(i, s"zozo_$i"))
  .via(flow)
  .runWith(Sink.fold(0)((a, d) => a + d.id))
Java
Flow<Person, Person, NotUsed> flow = KuduTable.flow(tableSettings.withTableName("Flow"));

CompletionStage<List<Person>> run =
    Source.from(Arrays.asList(200, 201, 202, 203, 204))
        .map((i) -> new Person(i, String.format("name_%d", i)))
        .via(flow)
        .toMat(Sink.seq(), Keep.right())
        .run(system);
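
The Java example above materializes a `CompletionStage` that completes once the stream has finished. In a test or a small program you may want to wait for that result with a timeout. The sketch below uses only the JDK and is not part of the connector: the `Person` class, the `await` helper, and the already-completed future are hypothetical stand-ins for the stream result, so the code runs without a Kudu cluster.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class AwaitWrittenRows {
  // Hypothetical stand-in for the Person class used in the examples above.
  static final class Person {
    final int id;
    final String name;

    Person(int id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Blocks until the stage completes or the timeout elapses.
  // Blocking like this is reasonable in tests, but should be avoided inside stream stages.
  static <T> T await(CompletionStage<T> stage, long timeoutSeconds) throws Exception {
    return stage.toCompletableFuture().get(timeoutSeconds, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    // Stand-in for the `run` value materialized by the stream.
    CompletionStage<List<Person>> run =
        CompletableFuture.completedFuture(
            Arrays.asList(new Person(200, "name_200"), new Person(201, "name_201")));

    List<Person> emitted = await(run, 5);
    System.out.println("elements emitted by the flow: " + emitted.size());
  }
}
```

With a real stream, pass the `CompletionStage` returned by `run(system)` to `await` in place of the completed future. A failed write would surface as an `ExecutionException` thrown from `get`.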

Writing to Kudu with a Sink

Scala
val sink: Sink[Person, Future[Done]] =
  KuduTable.sink(kuduTableSettings.withTableName("Sink"))

val f = Source(1 to 10)
  .map(i => Person(i, s"zozo_$i"))
  .runWith(sink)
Java
final Sink<Person, CompletionStage<Done>> sink =
    KuduTable.sink(tableSettings.withTableName("Sink"));

CompletionStage<Done> o =
    Source.from(Arrays.asList(100, 101, 102, 103, 104))
        .map((i) -> new Person(i, String.format("name %d", i)))
        .runWith(sink, system);