Google Cloud BigQuery

The Google Cloud BigQuery connector lets you connect to Google BigQuery, run queries on large datasets, and stream the results.

Project Info: Alpakka Google Cloud BigQuery
Artifact: com.lightbend.akka : akka-stream-alpakka-google-cloud-bigquery : 2.0.2
JDK versions: Adopt OpenJDK 8, Adopt OpenJDK 11
Scala versions: 2.12.11, 2.13.3
JPMS module name: akka.stream.alpakka.google.cloud.bigquery
Readiness level: API may change, since 2.0.2, 2020-07-31
Home page: https://doc.akka.io/docs/alpakka/current
Release notes: In the documentation
Issues: Github issues
Sources: https://github.com/akka/alpakka

Alpakka Google Cloud BigQuery was added in Alpakka 2.0.2 in July 2020 and is marked as “API may change”. Please try it out and suggest improvements (Issue #2353).

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-bigquery" % "$version$"
Maven
<properties>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-google-cloud-bigquery_${scala.binary.version}</artifactId>
  <version>$version$</version>
</dependency>
Gradle
versions += [
  ScalaBinary: "2.12"
]
dependencies {
  compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-google-cloud-bigquery_${versions.ScalaBinary}", version: '$version$'
}

Usage

Add the imports

Scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.googlecloud.bigquery.BigQueryConfig
import akka.stream.alpakka.googlecloud.bigquery.client.BigQueryCommunicationHelper
import akka.stream.alpakka.googlecloud.bigquery.client.TableDataQueryJsonProtocol.Field
import akka.stream.alpakka.googlecloud.bigquery.client.TableListQueryJsonProtocol.QueryTableModel
import akka.stream.alpakka.googlecloud.bigquery.scaladsl.{BigQueryCallbacks, GoogleBigQuerySource}
import akka.stream.scaladsl.{Sink, Source}
import spray.json.DefaultJsonProtocol._
import spray.json.{JsObject, JsonFormat}

import scala.concurrent.Future
import scala.util.Try
Java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.scaladsl.model.HttpRequest;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.googlecloud.bigquery.BigQueryConfig;
import akka.stream.alpakka.googlecloud.bigquery.client.BigQueryCommunicationHelper;
import akka.stream.alpakka.googlecloud.bigquery.client.TableDataQueryJsonProtocol;
import akka.stream.alpakka.googlecloud.bigquery.client.TableListQueryJsonProtocol;
import akka.stream.alpakka.googlecloud.bigquery.javadsl.GoogleBigQuerySource;
import akka.stream.alpakka.googlecloud.bigquery.javadsl.BigQueryCallbacks;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.util.Try;
import spray.json.JsObject;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

Create the BigQuery configuration

Scala
val config = BigQueryConfig("[email protected]", "privateKeyFromGoogle", "projectID", "bigQueryDatasetName")
Java
BigQueryConfig config =
    BigQueryConfig.create(
        "[email protected]",
        "privateKeyFromGoogle",
        "projectID",
        "bigQueryDatasetName",
        system);

You can use the connector to list information about tables and their fields. The payload of the response from these requests is mapped to the QueryTableModel and Field models. Only part of the received payload is mapped; to retrieve the full payload, or to handle errors, empty responses, or API changes, you have to implement a custom parser.

Scala
val tables: Future[Seq[QueryTableModel]] = GoogleBigQuerySource.listTables(config).runWith(Sink.seq).map(_.flatten)
val fields: Future[Seq[Field]] = GoogleBigQuerySource.listFields("myTable", config).runWith(Sink.seq).map(_.flatten)
Java
CompletionStage<List<TableListQueryJsonProtocol.QueryTableModel>> tables =
    GoogleBigQuerySource.listTables(config)
        .runWith(Sink.seq(), materializer)
        .thenApply(lt -> lt.stream().flatMap(Collection::stream).collect(Collectors.toList()));

CompletionStage<List<TableDataQueryJsonProtocol.Field>> fields =
    GoogleBigQuerySource.listFields("myTable", config)
        .runWith(Sink.seq(), materializer)
        .thenApply(lt -> lt.stream().flatMap(Collection::stream).collect(Collectors.toList()));

For the rawest representation there is a built-in “csvStyle” source. It returns a header (the field names) followed by the fields of each row as a list of Strings.

Scala
val userCsvLikeStream: Source[Seq[String], NotUsed] =
  GoogleBigQuerySource.runQueryCsvStyle("SELECT uid, name FROM bigQueryDatasetName.myTable",
                                        BigQueryCallbacks.tryToStopJob(config),
                                        config)
Java
Source<List<String>, NotUsed> userCsvLikeStream =
    GoogleBigQuerySource.runQueryCsvStyle(
        "SELECT uid, name FROM bigQueryDatasetName.myTable",
        BigQueryCallbacks.tryToStopJob(config, system, materializer),
        config);

There is a more sophisticated way to get data from a table. If you want a stream of your own classes, you can also provide a converter function.

Scala
case class User(uid: String, name: String)
implicit val userFormatter = jsonFormat2(User)

def parserFn(result: JsObject): Try[User] = Try(result.convertTo[User])
val userStream: Source[User, NotUsed] =
  GoogleBigQuerySource.runQuery("SELECT uid, name FROM bigQueryDatasetName.myTable",
                                parserFn,
                                BigQueryCallbacks.ignore,
                                config)
Java
static class User {
  String uid;
  String name;

  User(String uid, String name) {
    this.uid = uid;
    this.name = name;
  }
}

static Try<User> userFromJson(JsObject object) {
  return Try.apply(
      () ->
          new User(
              object.fields().apply("uid").toString(), object.fields().apply("name").toString()));
}

private static Source<User, NotUsed> example2() {
  ActorSystem system = ActorSystem.create();
  ActorMaterializer materializer = ActorMaterializer.create(system);
  BigQueryConfig config =
      BigQueryConfig.create(
          "[email protected]",
          "privateKeyFromGoogle",
          "projectID",
          "bigQueryDatasetName",
          system);
  return GoogleBigQuerySource.runQuery(
      "SELECT uid, name FROM bigQueryDatasetName.myTable",
      GoogleBigQuerySourceDoc::userFromJson,
      BigQueryCallbacks.ignore(),
      config);
}

If you want to use the built-in paging implementation, or you have some specific needs, you can call the raw API. The next example shows how to access dryRun data with the raw API and the helpers.

Scala
case class DryRunResponse(totalBytesProcessed: String, jobComplete: Boolean, cacheHit: Boolean)
implicit val dryRunFormat: JsonFormat[DryRunResponse] = jsonFormat3(DryRunResponse)

def dryRunParser(result: JsObject): Try[DryRunResponse] = Try(result.convertTo[DryRunResponse])

val request = BigQueryCommunicationHelper.createQueryRequest("SELECT uid, name FROM bigQueryDatasetName.myTable",
                                                             config.projectId,
                                                             dryRun = true)

val dryRunStream = GoogleBigQuerySource.raw(request, dryRunParser, BigQueryCallbacks.ignore, config)
Java
static class DryRunResponse {
  String totalBytesProcessed;
  String jobComplete;
  String cacheHit;

  DryRunResponse(String totalBytesProcessed, String jobComplete, String cacheHit) {
    this.totalBytesProcessed = totalBytesProcessed;
    this.jobComplete = jobComplete;
    this.cacheHit = cacheHit;
  }
}

static Try<DryRunResponse> dryRunResponseFromJson(JsObject object) {
  scala.Function0<DryRunResponse> responseFunction =
      () ->
          new DryRunResponse(
              object.fields().apply("totalBytesProcessed").toString(),
              object.fields().apply("jobComplete").toString(),
              object.fields().apply("cacheHit").toString());
  return Try.apply(responseFunction);
}

private static Source<DryRunResponse, NotUsed> example3() {
  ActorSystem system = ActorSystem.create();
  ActorMaterializer materializer = ActorMaterializer.create(system);
  BigQueryConfig config =
      BigQueryConfig.create(
          "[email protected]",
          "privateKeyFromGoogle",
          "projectID",
          "bigQueryDatasetName",
          system);

  HttpRequest request =
      BigQueryCommunicationHelper.createQueryRequest(
          "SELECT uid, name FROM bigQueryDatasetName.myTable", config.projectId(), true);

  return GoogleBigQuerySource.raw(
      request,
      GoogleBigQuerySourceDoc::dryRunResponseFromJson,
      BigQueryCallbacks.ignore(),
      config);
}

Config

The configuration contains the session (which includes your service token).

If you make multiple requests to the same source (likely to happen), create a single BigQueryConfig instance and reuse it.

If you call multiple BigQuery sources (not likely to happen), it is worth caching the configs so you can avoid a lot of unneeded authorization requests.
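
As an illustrative sketch (credential values and the query are placeholders, and an implicit ActorSystem is assumed to be in scope as in the earlier examples), reusing one config for several sources could look like this:

Scala
// Sketch only: placeholder credentials; assumes an implicit ActorSystem is in scope.
val sharedConfig =
  BigQueryConfig("clientEmail", "privateKeyFromGoogle", "projectID", "bigQueryDatasetName")

// The same config (and therefore the same session and token) backs every source.
val tables = GoogleBigQuerySource.listTables(sharedConfig)
val rows = GoogleBigQuerySource.runQueryCsvStyle(
  "SELECT uid, name FROM bigQueryDatasetName.myTable",
  BigQueryCallbacks.tryToStopJob(sharedConfig),
  sharedConfig)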

Cancel on timeout

All of the provided functionality can fire a callback when the downstream signals a stop. This is useful if you want to implement a timeout in the downstream and try to lower your costs by stopping long-running jobs. (Google doesn’t give any guarantee about cost reduction, but at least we can try. Read this for more information.)

You can use the built-in callbacks provided by BigQueryCallbacks.
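
As a sketch (assuming the config from above and an implicit materializer, or an ActorSystem on Akka 2.6, in scope), a downstream time limit such as takeWithin completes the stream; the resulting cancellation fires the callback, which then tries to stop the running job:

Scala
import scala.concurrent.duration._

// Sketch only: takeWithin completes the stream after 30 seconds; the upstream
// cancellation triggers BigQueryCallbacks.tryToStopJob, which attempts to stop the job.
val limitedRun: Future[Seq[Seq[String]]] =
  GoogleBigQuerySource
    .runQueryCsvStyle("SELECT uid, name FROM bigQueryDatasetName.myTable",
                      BigQueryCallbacks.tryToStopJob(config),
                      config)
    .takeWithin(30.seconds)
    .runWith(Sink.seq)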

Parsers

The parser function is a spray.json.JsObject => Try[T] function in Scala (in Java, a java.util.function.Function[spray.json.JsObject, scala.util.Try[T]]). This is needed because the response may not contain any data, in which case the request is retried after a delay. Your parser function needs to be bulletproof; the code in the examples represents only the happy path. In case of a scala.util.Failure your stream will poll forever!
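
For example, a thin wrapper around the happy-path parser (a sketch, assuming the User case class and formatter from above and the ActorSystem's logging adapter) at least makes permanent parse failures visible instead of retrying silently:

Scala
// Sketch only: log parse failures so a permanently broken parser does not poll silently.
def loggedUserParser(result: JsObject): Try[User] =
  Try(result.convertTo[User]).recoverWith {
    case ex =>
      system.log.warning("Could not parse BigQuery response: {}", ex.getMessage)
      scala.util.Failure(ex)
  }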

Running an End-to-End test case

You might want to run an End-to-End test case.

See BigQueryEndToEndSpec. To run this example against an actual GCP project, you will need to configure a project, create and initialize the tables in Google BigQuery, and provide a service account.
