Google Cloud BigQuery

The BigQuery connector provides Akka Stream sources and sinks to connect to Google Cloud BigQuery. BigQuery is a serverless data warehouse for storing and analyzing massive datasets. This connector is primarily intended for streaming data into and out of BigQuery tables and running SQL queries, although it also provides basic support for managing datasets and tables and flexible access to the BigQuery REST API.

Project Info: Alpakka Google Cloud BigQuery
Artifact
com.lightbend.akka
akka-stream-alpakka-google-cloud-bigquery
9.0.0
JDK versions
Eclipse Temurin JDK 11
Eclipse Temurin JDK 17
Scala versions2.13.12
JPMS module nameakka.stream.alpakka.google.cloud.bigquery
License
Readiness level
Since 2.0.2, 2020-07-31
Home pagehttps://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka
API may change

Alpakka Google Cloud BigQuery was added in Alpakka 2.0.2 in July 2020 and is marked as “API may change”. Please try it out and suggest improvements. PR #2548

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.0"
val AkkaHttpVersion = "10.7.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-bigquery" % "9.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion
)
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <akka.http.version>10.7.0</akka.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-google-cloud-bigquery_${scala.binary.version}</artifactId>
    <version>9.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http-spray-json_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.0",
  AkkaHttpVersion: "10.7.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-google-cloud-bigquery_${versions.ScalaBinary}:9.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
  implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
  implementation "com.typesafe.akka:akka-http-spray-json_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}

To use the Jackson JSON library for marshalling you must also add the Akka HTTP module for Jackson support.

sbt
val AkkaHttpVersion = "10.7.0"
libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % AkkaHttpVersion
Maven
<properties>
  <akka.http.version>10.7.0</akka.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http-jackson_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaHttpVersion: "10.7.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.typesafe.akka:akka-http-jackson_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries that it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.fasterxml.jackson.corejackson-annotations2.17.2
com.fasterxml.jackson.corejackson-core2.17.2
com.fasterxml.jackson.corejackson-databind2.17.2
com.lightbend.akkaakka-stream-alpakka-google-common_2.139.0.0
com.typesafe.akkaakka-http-spray-json_2.1310.7.0
com.typesafe.akkaakka-http_2.1310.7.0
com.typesafe.akkaakka-pki_2.132.10.0
com.typesafe.akkaakka-stream_2.132.10.0
io.sprayspray-json_2.131.3.6
org.scala-langscala-library2.13.12
Dependency tree
com.fasterxml.jackson.core    jackson-annotations    2.17.2    The Apache Software License, Version 2.0
com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
com.fasterxml.jackson.core    jackson-databind    2.17.2    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-annotations    2.17.2    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
com.lightbend.akka    akka-stream-alpakka-google-common_2.13    9.0.0
    com.github.jwt-scala    jwt-json-common_2.13    9.4.6    Apache-2.0
        com.github.jwt-scala    jwt-core_2.13    9.4.6    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.google.auth    google-auth-library-credentials    1.24.1
    com.typesafe.akka    akka-http-spray-json_2.13    10.7.0    BUSL-1.1
        com.typesafe.akka    akka-http_2.13    10.7.0    BUSL-1.1
            com.typesafe.akka    akka-http-core_2.13    10.7.0    BUSL-1.1
                com.typesafe.akka    akka-parsing_2.13    10.7.0    BUSL-1.1
                    org.scala-lang    scala-library    2.13.12    Apache-2.0
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
                com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
                com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
                    com.typesafe    config    1.4.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.12    Apache-2.0
                org.scala-lang    scala-library    2.13.12    Apache-2.0
                org.slf4j    slf4j-api    2.0.16
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        io.spray    spray-json_2.13    1.3.6    Apache 2
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-http_2.13    10.7.0    BUSL-1.1
        com.typesafe.akka    akka-http-core_2.13    10.7.0    BUSL-1.1
            com.typesafe.akka    akka-parsing_2.13    10.7.0    BUSL-1.1
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
            com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
            com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
                com.typesafe    config    1.4.3    Apache-2.0
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.slf4j    slf4j-api    2.0.16
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
com.typesafe.akka    akka-http-spray-json_2.13    10.7.0    BUSL-1.1
    com.typesafe.akka    akka-http_2.13    10.7.0    BUSL-1.1
        com.typesafe.akka    akka-http-core_2.13    10.7.0    BUSL-1.1
            com.typesafe.akka    akka-parsing_2.13    10.7.0    BUSL-1.1
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
            com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
            com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
                com.typesafe    config    1.4.3    Apache-2.0
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.slf4j    slf4j-api    2.0.16
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    io.spray    spray-json_2.13    1.3.6    Apache 2
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
com.typesafe.akka    akka-http_2.13    10.7.0    BUSL-1.1
    com.typesafe.akka    akka-http-core_2.13    10.7.0    BUSL-1.1
        com.typesafe.akka    akka-parsing_2.13    10.7.0    BUSL-1.1
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
        com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.slf4j    slf4j-api    2.0.16
    org.scala-lang    scala-library    2.13.12    Apache-2.0
com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
    com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
    org.slf4j    slf4j-api    2.0.16
com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
io.spray    spray-json_2.13    1.3.6    Apache 2
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.scala-lang    scala-library    2.13.12    Apache-2.0

Configuration

The BigQuery connector shares its basic configuration with all the Google connectors in Alpakka. Additional BigQuery-specific configuration settings can be found in its reference.conf.

Imports

All of the examples below assume the following imports are in scope.

Scala
sourceimport akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.stream.alpakka.google.{GoogleAttributes, GoogleSettings}
import akka.stream.alpakka.googlecloud.bigquery.InsertAllRetryPolicy
import akka.stream.alpakka.googlecloud.bigquery.model.{
  Dataset,
  Job,
  JobReference,
  JobState,
  QueryResponse,
  Table,
  TableDataListResponse,
  TableListResponse
}
import akka.stream.alpakka.googlecloud.bigquery.scaladsl.BigQuery
import akka.stream.alpakka.googlecloud.bigquery.scaladsl.schema.BigQuerySchemas._
import akka.stream.alpakka.googlecloud.bigquery.scaladsl.schema.TableSchemaWriter
import akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray.BigQueryJsonProtocol._
import akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray.BigQueryRootJsonFormat
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.{Done, NotUsed}

import scala.annotation.nowarn
import scala.collection.immutable.Seq
import scala.concurrent.Future
Java
source
import akka.Done; import akka.NotUsed; import akka.http.javadsl.marshallers.jackson.Jackson; import akka.http.javadsl.marshalling.Marshaller; import akka.http.javadsl.model.HttpEntity; import akka.http.javadsl.model.RequestEntity; import akka.http.javadsl.unmarshalling.Unmarshaller; import akka.stream.alpakka.google.GoogleAttributes; import akka.stream.alpakka.google.GoogleSettings; import akka.stream.alpakka.googlecloud.bigquery.InsertAllRetryPolicy; import akka.stream.alpakka.googlecloud.bigquery.javadsl.BigQuery; import akka.stream.alpakka.googlecloud.bigquery.javadsl.jackson.BigQueryMarshallers; import akka.stream.alpakka.googlecloud.bigquery.model.Dataset; import akka.stream.alpakka.googlecloud.bigquery.model.Job; import akka.stream.alpakka.googlecloud.bigquery.model.JobReference; import akka.stream.alpakka.googlecloud.bigquery.model.JobState; import akka.stream.alpakka.googlecloud.bigquery.model.QueryResponse; import akka.stream.alpakka.googlecloud.bigquery.model.Table; import akka.stream.alpakka.googlecloud.bigquery.model.TableDataInsertAllRequest; import akka.stream.alpakka.googlecloud.bigquery.model.TableDataListResponse; import akka.stream.alpakka.googlecloud.bigquery.model.TableFieldSchema; import akka.stream.alpakka.googlecloud.bigquery.model.TableFieldSchemaMode; import akka.stream.alpakka.googlecloud.bigquery.model.TableFieldSchemaType; import akka.stream.alpakka.googlecloud.bigquery.model.TableListResponse; import akka.stream.alpakka.googlecloud.bigquery.model.TableSchema; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.stream.Collectors;

Setup data classes

As a working example throughout this documentation, we will use the Person case class to model the data in our BigQuery tables.

Scala
sourcecase class Person(name: String, age: Int, addresses: Seq[Address], isHakker: Boolean)
case class Address(street: String, city: String, postalCode: Option[Int])
implicit val addressFormat: BigQueryRootJsonFormat[Address] = bigQueryJsonFormat3(Address)
implicit val personFormat: BigQueryRootJsonFormat[Person] = bigQueryJsonFormat4(Person)
Java
sourceObjectMapper objectMapper = new ObjectMapper();

public class Person {
  private String name;
  private Integer age;
  private List<Address> addresses;
  private Boolean isHakker;

  @JsonCreator
  public Person(@JsonProperty("f") JsonNode fields) throws IOException {
    name = fields.get(0).get("v").textValue();
    age = Integer.parseInt(fields.get(1).get("v").textValue());
    addresses = new ArrayList<>();
    ObjectReader addressReader = objectMapper.readerFor(Address.class);
    for (JsonNode node : fields.get(2).get("v")) {
      Address address = addressReader.readValue(node.get("v"));
      addresses.add(address);
    }
    isHakker = Boolean.parseBoolean(fields.get(3).get("v").textValue());
  }

  public String getName() {
    return name;
  }

  public Integer getAge() {
    return age;
  }

  public List<Address> getAddresses() {
    return addresses;
  }

  public Boolean getIsHakker() {
    return isHakker;
  }
}

public class Address {
  private String street;
  private String city;
  private Integer postalCode;

  @JsonCreator
  public Address(@JsonProperty("f") JsonNode fields) {
    street = fields.get(0).get("v").textValue();
    city = fields.get(1).get("v").textValue();
    postalCode =
        Optional.of(fields.get(2).get("v").textValue()).map(Integer::parseInt).orElse(null);
  }

  public String getStreet() {
    return street;
  }

  public String getCity() {
    return city;
  }

  public Integer getPostalCode() {
    return postalCode;
  }
}

public class NameAddressesPair {
  private String name;
  private List<Address> addresses;

  @JsonCreator
  public NameAddressesPair(@JsonProperty("f") JsonNode fields) throws IOException {
    name = fields.get(0).get("v").textValue();
    addresses = new ArrayList<>();
    ObjectReader addressReader = objectMapper.readerFor(Address.class);
    for (JsonNode node : fields.get(1).get("v")) {
      Address address = addressReader.readValue(node.get("v"));
      addresses.add(address);
    }
  }
}

To enable automatic support for (un)marshalling User and Address as BigQuery table rows and query results we create implicit BigQueryRootJsonFormat[T] instances. The bigQueryJsonFormatN methods are imported from BigQueryJsonProtocol, analogous to Spray’s DefaultJsonProtocol. To enable support for (un)marshalling User and Address as BigQuery table rows and query results we use Jackson’s @JsonCreator and @JsonProperty annotations. Note that a custom @JsonCreator constructor is necessary due to BigQuery’s unusual encoding of rows as “a series of JSON f,v objects for indicating fields and values” (reference documentation). In addition, we also define NameAddressesPair to model the result of the query in the next section.

Run a query

You can run a SQL query and stream the unmarshalled results with the BigQuery.query<Out>BigQuery.query[Out] BigQuery.<Out>queryBigQuery.<Out>query method. The output type Out can be a tuple or any user-defined class for which an implicit BigQueryRootJsonFormat[Out] is available. Note that the order and presence of fields in Out must strictly match your SQL query. To create the unmarshaller, use the BigQueryMarshallers.<Out>queryResponseUnmarshaller method.

Scala
sourceval sqlQuery = s"SELECT name, addresses FROM $datasetId.$tableId WHERE age >= 100"
val centenarians: Source[(String, Seq[Address]), Future[QueryResponse[(String, Seq[Address])]]] =
  BigQuery.query[(String, Seq[Address])](sqlQuery, useLegacySql = false)
Java
sourceString sqlQuery =
    String.format("SELECT name, addresses FROM %s.%s WHERE age >= 100", datasetId, tableId);
Unmarshaller<HttpEntity, QueryResponse<NameAddressesPair>> queryResponseUnmarshaller =
    BigQueryMarshallers.queryResponseUnmarshaller(NameAddressesPair.class);
Source<NameAddressesPair, CompletionStage<QueryResponse<NameAddressesPair>>> centenarians =
    BigQuery.query(sqlQuery, false, false, queryResponseUnmarshaller);

Notice that the source materializes a Future[QueryResponse[(String, Seq[Address])]] CompletionStage<QueryResponse<NameAddressesTuple>> which can be used to retrieve metadata related to the query. For example, you can use a dry run to estimate the number of bytes that will be read by a query.

Scala
sourceval centenariansDryRun = BigQuery.query[(String, Seq[Address])](sqlQuery, dryRun = true, useLegacySql = false)
val bytesProcessed: Future[Long] = centenariansDryRun.to(Sink.ignore).run().map(_.totalBytesProcessed.get)
Java
sourceSource<NameAddressesPair, CompletionStage<QueryResponse<NameAddressesPair>>>
    centenariansDryRun = BigQuery.query(sqlQuery, false, false, queryResponseUnmarshaller);
CompletionStage<Long> bytesProcessed =
    centenariansDryRun
        .to(Sink.ignore())
        .run(system)
        .thenApply(r -> r.getTotalBytesProcessed().getAsLong());

Finally, you can also stream all of the rows in a table without the expense of running a query with the BigQuery.tableData<Out>BigQuery.tableData[Out] BigQuery.<Out>listTableDataBigQuery.<Out>listTableData method.

Scala
sourceval everyone: Source[Person, Future[TableDataListResponse[Person]]] =
  BigQuery.tableData[Person](datasetId, tableId)
Java
sourceUnmarshaller<HttpEntity, TableDataListResponse<Person>> tableDataListUnmarshaller =
    BigQueryMarshallers.tableDataListResponseUnmarshaller(Person.class);
Source<Person, CompletionStage<TableDataListResponse<Person>>> everyone =
    BigQuery.listTableData(
        datasetId,
        tableId,
        OptionalLong.empty(),
        OptionalInt.empty(),
        Collections.emptyList(),
        tableDataListUnmarshaller);

Load data into BigQuery

The BigQuery connector enables loading data into tables via real-time streaming inserts or batch loading. For an overview of these strategies see the BigQuery documentation.

The BigQuery.insertAll<In>BigQuery.insertAll[In] BigQuery.<In>insertAllBigQuery.<In>insertAll method creates a sink that accepts batches of Seq[In] List<In> (for example created via the batch operator) and streams them directly into a table. To enable/disable BigQuery’s best-effort deduplication feature use the appropriate InsertAllRetryPolicyInsertAllRetryPolicy.

Scala
sourceval peopleInsertSink: Sink[Seq[Person], NotUsed] =
  BigQuery.insertAll[Person](datasetId, tableId, InsertAllRetryPolicy.WithDeduplication)
Java
sourceMarshaller<TableDataInsertAllRequest<Person>, RequestEntity> tableDataInsertAllMarshaller =
    BigQueryMarshallers.tableDataInsertAllRequestMarshaller();
Sink<List<Person>, NotUsed> peopleInsertSink =
    BigQuery.insertAll(
        datasetId,
        tableId,
        InsertAllRetryPolicy.withDeduplication(),
        Optional.empty(),
        tableDataInsertAllMarshaller);

As a cost-saving alternative to streaming inserts, you can also add data to a table via asynchronous load jobs. The BigQuery.insertAllAsync<In>BigQuery.insertAllAsync[In] BigQuery.<In>insertAllAsyncBigQuery.<In>insertAllAsync method creates a flow that starts a series of batch load jobs. By default, a new load job is created every minute to attempt to emulate near-real-time streaming inserts, although there is no guarantee when the job will actually run. The frequency with which new load jobs are created is controlled by the alpakka.google.bigquery.load-job-per-table-quota configuration setting.

Warning

Pending the resolution of Google BigQuery issue 176002651, the BigQuery.insertAllAsync API may not work as expected.

As a workaround, you can use the config setting akka.http.parsing.conflicting-content-type-header-processing-mode = first with Akka HTTP v10.2.4 or later.

Scala
sourceval peopleLoadFlow: Flow[Person, Job, NotUsed] = BigQuery.insertAllAsync[Person](datasetId, tableId)
Java
sourceFlow<Person, Job, NotUsed> peopleLoadFlow =
    BigQuery.insertAllAsync(datasetId, tableId, Jackson.marshaller());

To check the status of the load jobs use the BigQuery.jobBigQuery.job BigQuery.getJobBigQuery.getJob method.

Scala
sourcedef checkIfJobsDone(jobReferences: Seq[JobReference]): Future[Boolean] = {
  for {
    jobs <- Future.sequence(jobReferences.map(ref => BigQuery.job(ref.jobId.get)))
  } yield jobs.forall(job => job.status.exists(_.state == JobState.Done))
}

val isDone: Future[Boolean] = for {
  jobs <- Source(people).via(peopleLoadFlow).runWith(Sink.seq)
  jobReferences = jobs.flatMap(job => job.jobReference)
  isDone <- checkIfJobsDone(jobReferences)
} yield isDone
Java
sourceFunction<List<JobReference>, CompletionStage<Boolean>> checkIfJobsDone =
    jobReferences -> {
      GoogleSettings settings = GoogleSettings.create(system);
      CompletionStage<Boolean> allAreDone = CompletableFuture.completedFuture(true);
      for (JobReference jobReference : jobReferences) {
        CompletionStage<Job> job =
            BigQuery.getJob(jobReference.getJobId().get(), Optional.empty(), settings, system);
        CompletionStage<Boolean> jobIsDone =
            job.thenApply(
                j ->
                    j.getStatus().map(s -> s.getState().equals(JobState.done())).orElse(false));
        allAreDone = allAreDone.thenCombine(jobIsDone, (a, b) -> a & b);
      }
      return allAreDone;
    };

CompletionStage<List<Job>> jobs =
    Source.from(people).via(peopleLoadFlow).runWith(Sink.<Job>seq(), system);
CompletionStage<List<JobReference>> jobReferences =
    jobs.thenApply(
        js -> js.stream().map(j -> j.getJobReference().get()).collect(Collectors.toList()));
CompletionStage<Boolean> isDone = jobReferences.thenCompose(checkIfJobsDone);

Managing datasets and tables

The BigQuery connector provides methods for basic management of datasets and tables.

Scala
sourceval allDatasets: Source[Dataset, NotUsed] = BigQuery.datasets
val existingDataset: Future[Dataset] = BigQuery.dataset(datasetId)
val newDataset: Future[Dataset] = BigQuery.createDataset("newDatasetId")
val datasetDeleted: Future[Done] = BigQuery.deleteDataset(datasetId)
val allTablesInDataset: Source[Table, Future[TableListResponse]] = BigQuery.tables(datasetId)
val existingTable: Future[Table] = BigQuery.table(datasetId, tableId)
val tableDeleted: Future[Done] = BigQuery.deleteTable(datasetId, tableId)
Java
sourceGoogleSettings settings = GoogleSettings.create(system);
Source<Dataset, NotUsed> allDatasets =
    BigQuery.listDatasets(OptionalInt.empty(), Optional.empty(), Collections.emptyMap());
CompletionStage<Dataset> existingDataset = BigQuery.getDataset(datasetId, settings, system);
CompletionStage<Dataset> newDataset = BigQuery.createDataset("newDatasetId", settings, system);
CompletionStage<Done> datasetDeleted =
    BigQuery.deleteDataset(datasetId, false, settings, system);
Source<Table, CompletionStage<TableListResponse>> allTablesInDataset =
    BigQuery.listTables(datasetId, OptionalInt.empty());
CompletionStage<Table> existingTable = BigQuery.getTable(datasetId, tableId, settings, system);
CompletionStage<Done> tableDeleted = BigQuery.deleteTable(datasetId, tableId, settings, system);

Creating a table requires a little more work to specify the schema. To enable automatic schema generation, you can bring implicit TableSchemaWriter[T] instances for your classes into scope via the bigQuerySchemaN methods in BigQuerySchemas.

Scala
sourceimplicit val addressSchema: TableSchemaWriter[Address] = bigQuerySchema3(Address)
implicit val personSchema: TableSchemaWriter[Person] = bigQuerySchema4(Person)
val newTable: Future[Table] = BigQuery.createTable[Person](datasetId, "newTableId")
Java
sourceTableSchema personSchema =
    TableSchema.create(
        TableFieldSchema.create("name", TableFieldSchemaType.string(), Optional.empty()),
        TableFieldSchema.create("age", TableFieldSchemaType.integer(), Optional.empty()),
        TableFieldSchema.create(
            "addresses",
            TableFieldSchemaType.record(),
            Optional.of(TableFieldSchemaMode.repeated()),
            TableFieldSchema.create("street", TableFieldSchemaType.string(), Optional.empty()),
            TableFieldSchema.create("city", TableFieldSchemaType.string(), Optional.empty()),
            TableFieldSchema.create(
                "postalCode",
                TableFieldSchemaType.integer(),
                Optional.of(TableFieldSchemaMode.nullable()))),
        TableFieldSchema.create("isHakker", TableFieldSchemaType.bool(), Optional.empty()));
CompletionStage<Table> newTable =
    BigQuery.createTable(datasetId, "newTableId", personSchema, settings, system);

Apply custom settings to a part of the stream

In certain situations it may be desirable to modify the GoogleSettingsGoogleSettings applied to a part of the stream, for example to change the project ID or use different RetrySettingsRetrySettings.

Scala
sourceval defaultSettings: GoogleSettings = GoogleSettings()
val customSettings = defaultSettings.copy(projectId = "myOtherProject")
BigQuery.query[(String, Seq[Address])](sqlQuery).withAttributes(GoogleAttributes.settings(customSettings))
Java
sourceGoogleSettings defaultSettings = GoogleSettings.create(system);
GoogleSettings customSettings = defaultSettings.withProjectId("myOtherProjectId");
BigQuery.query(sqlQuery, false, false, queryResponseUnmarshaller)
    .withAttributes(GoogleAttributes.settings(customSettings));

Make raw API requests

If you would like to interact with the BigQuery REST API beyond what the BigQuery connector supports, you can make authenticated raw requests via the BigQuery.singleRequestBigQuery.singleRequest and BigQuery.paginatedRequest<Out>BigQuery.paginatedRequest[Out] BigQuery.<Out>paginatedRequestBigQuery.<Out>paginatedRequest methods.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.