Google Cloud BigQuery
The Google Cloud BigQuery connector provides connectivity to Google BigQuery by running queries on large datasets and streaming the results.
Project Info: Alpakka Google Cloud BigQuery

| | |
|---|---|
| Artifact | com.lightbend.akka : akka-stream-alpakka-google-cloud-bigquery : 2.0.2 |
| JDK versions | Adopt OpenJDK 8, Adopt OpenJDK 11 |
| Scala versions | 2.12.11, 2.13.3 |
| JPMS module name | akka.stream.alpakka.google.cloud.bigquery |
| Readiness level | API may change; since 2.0.2, 2020-07-31 |
| Home page | https://doc.akka.io/docs/alpakka/current |
| Release notes | In the documentation |
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka |
Alpakka Google Cloud BigQuery was added in Alpakka 2.0.2 in July 2020 and is marked as “API may change”. Please try it out and suggest improvements in Issue #2353.
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-bigquery" % "$version$"
- Maven
<properties>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-google-cloud-bigquery_${scala.binary.version}</artifactId>
  <version>$version$</version>
</dependency>
- Gradle
versions += [
  ScalaBinary: "2.12"
]
dependencies {
  compile group: 'com.lightbend.akka',
    name: "akka-stream-alpakka-google-cloud-bigquery_${versions.ScalaBinary}",
    version: '$version$'
}
Usage
Add the imports
- Scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.googlecloud.bigquery.BigQueryConfig
import akka.stream.alpakka.googlecloud.bigquery.client.BigQueryCommunicationHelper
import akka.stream.alpakka.googlecloud.bigquery.client.TableDataQueryJsonProtocol.Field
import akka.stream.alpakka.googlecloud.bigquery.client.TableListQueryJsonProtocol.QueryTableModel
import akka.stream.alpakka.googlecloud.bigquery.scaladsl.{BigQueryCallbacks, GoogleBigQuerySource}
import akka.stream.scaladsl.{Sink, Source}
import spray.json.DefaultJsonProtocol._
import spray.json.{JsObject, JsonFormat}

import scala.concurrent.Future
import scala.util.Try
- Java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.scaladsl.model.HttpRequest;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.googlecloud.bigquery.BigQueryConfig;
import akka.stream.alpakka.googlecloud.bigquery.client.BigQueryCommunicationHelper;
import akka.stream.alpakka.googlecloud.bigquery.client.TableDataQueryJsonProtocol;
import akka.stream.alpakka.googlecloud.bigquery.client.TableListQueryJsonProtocol;
import akka.stream.alpakka.googlecloud.bigquery.javadsl.BigQueryCallbacks;
import akka.stream.alpakka.googlecloud.bigquery.javadsl.GoogleBigQuerySource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.util.Try;
import spray.json.JsObject;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
Create the BigQuery configuration
- Scala
val config = BigQueryConfig(
  "[email protected]",
  "privateKeyFromGoogle",
  "projectID",
  "bigQueryDatasetName")
- Java
BigQueryConfig config =
    BigQueryConfig.create(
        "[email protected]",
        "privateKeyFromGoogle",
        "projectID",
        "bigQueryDatasetName",
        system);
You can use the connector to list the tables of the dataset and the fields of a table. The payload of these responses is mapped to the models QueryTableModel and Field. Only part of the received payload is mapped; to retrieve the full payload, or to handle errors, empty responses or API changes, you have to implement a custom parser.
- Scala
val tables: Future[Seq[QueryTableModel]] =
  GoogleBigQuerySource.listTables(config).runWith(Sink.seq).map(_.flatten)
val fields: Future[Seq[Field]] =
  GoogleBigQuerySource.listFields("myTable", config).runWith(Sink.seq).map(_.flatten)
- Java
CompletionStage<List<TableListQueryJsonProtocol.QueryTableModel>> tables =
    GoogleBigQuerySource.listTables(config)
        .runWith(Sink.seq(), materializer)
        .thenApply(lt -> lt.stream().flatMap(Collection::stream).collect(Collectors.toList()));
CompletionStage<List<TableDataQueryJsonProtocol.Field>> fields =
    GoogleBigQuerySource.listFields("myTable", config)
        .runWith(Sink.seq(), materializer)
        .thenApply(lt -> lt.stream().flatMap(Collection::stream).collect(Collectors.toList()));
For the rawest representation there is a built-in “csvStyle” source. It first emits a header (the field names) and then each row as a list of Strings. A sketch for separating the header element from the data rows follows the two tabs below.
- Scala
val userCsvLikeStream: Source[Seq[String], NotUsed] =
  GoogleBigQuerySource.runQueryCsvStyle(
    "SELECT uid, name FROM bigQueryDatasetName.myTable",
    BigQueryCallbacks.tryToStopJob(config),
    config)
- Java
Source<List<String>, NotUsed> userCsvLikeStream =
    GoogleBigQuerySource.runQueryCsvStyle(
        "SELECT uid, name FROM bigQueryDatasetName.myTable",
        BigQueryCallbacks.tryToStopJob(config, system, materializer),
        config);
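If you need the header and the data rows together, a small amount of plain stream processing separates them. The following is a minimal Scala sketch, assuming (as described above) that the first emitted element is the header and every later element is a data row; it reuses userCsvLikeStream from the Scala tab and uses only standard Akka Streams operators, nothing connector-specific.

// Minimal sketch: split off the first element (assumed to be the header row)
// and pair every remaining row with it.
val rowsWithHeader: Source[(Seq[String], Seq[String]), NotUsed] =
  userCsvLikeStream
    .prefixAndTail(1)
    .flatMapConcat {
      case (Seq(header), rows) => rows.map(row => header -> row)
      case (_, rows)           => rows.map(row => Seq.empty[String] -> row) // no header received
    }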
There is a more sophisticated way to get data out of a table: if you want a stream of your own classes, you can pass a converter function as well.
- Scala
case class User(uid: String, name: String)
implicit val userFormatter = jsonFormat2(User)

def parserFn(result: JsObject): Try[User] = Try(result.convertTo[User])

val userStream: Source[User, NotUsed] =
  GoogleBigQuerySource.runQuery(
    "SELECT uid, name FROM bigQueryDatasetName.myTable",
    parserFn,
    BigQueryCallbacks.ignore,
    config)
- Java
static class User {
  String uid;
  String name;

  User(String uid, String name) {
    this.uid = uid;
    this.name = name;
  }
}

static Try<User> userFromJson(JsObject object) {
  return Try.apply(
      () ->
          new User(
              object.fields().apply("uid").toString(),
              object.fields().apply("name").toString()));
}

private static Source<User, NotUsed> example2() {
  ActorSystem system = ActorSystem.create();
  ActorMaterializer materializer = ActorMaterializer.create(system);
  BigQueryConfig config =
      BigQueryConfig.create(
          "[email protected]",
          "privateKeyFromGoogle",
          "projectID",
          "bigQueryDatasetName",
          system);
  return GoogleBigQuerySource.runQuery(
      "SELECT uid, name FROM bigQueryDatasetName.myTable",
      GoogleBigQuerySourceDoc::userFromJson,
      BigQueryCallbacks.ignore(),
      config);
}
If you want to use the built-in paging implementation, or you have some specific needs, you can call the raw API. The next example shows how to access dryRun data with the raw API and the helpers.
- Scala
case class DryRunResponse(totalBytesProcessed: String, jobComplete: Boolean, cacheHit: Boolean)
implicit val dryRunFormat: JsonFormat[DryRunResponse] = jsonFormat3(DryRunResponse)

def dryRunParser(result: JsObject): Try[DryRunResponse] = Try(result.convertTo[DryRunResponse])

val request = BigQueryCommunicationHelper.createQueryRequest(
  "SELECT uid, name FROM bigQueryDatasetName.myTable",
  config.projectId,
  dryRun = true)

val dryRunStream = GoogleBigQuerySource.raw(request, dryRunParser, BigQueryCallbacks.ignore, config)
- Java
static class DryRunResponse {
  String totalBytesProcessed;
  String jobComplete;
  String cacheHit;

  DryRunResponse(String totalBytesProcessed, String jobComplete, String cacheHit) {
    this.totalBytesProcessed = totalBytesProcessed;
    this.jobComplete = jobComplete;
    this.cacheHit = cacheHit;
  }
}

static Try<DryRunResponse> dryRunResponseFromJson(JsObject object) {
  scala.Function0<DryRunResponse> responseFunction =
      () ->
          new DryRunResponse(
              object.fields().apply("totalBytesProcessed").toString(),
              object.fields().apply("jobComplete").toString(),
              object.fields().apply("cacheHit").toString());
  return Try.apply(responseFunction);
}

private static Source<DryRunResponse, NotUsed> example3() {
  ActorSystem system = ActorSystem.create();
  ActorMaterializer materializer = ActorMaterializer.create(system);
  BigQueryConfig config =
      BigQueryConfig.create(
          "[email protected]",
          "privateKeyFromGoogle",
          "projectID",
          "bigQueryDatasetName",
          system);
  HttpRequest request =
      BigQueryCommunicationHelper.createQueryRequest(
          "SELECT uid, name FROM bigQueryDatasetName.myTable", config.projectId(), true);
  return GoogleBigQuerySource.raw(
      request,
      GoogleBigQuerySourceDoc::dryRunResponseFromJson,
      BigQueryCallbacks.ignore(),
      config);
}
Config
The configuration holds the session (which includes your service token).
If you make multiple requests to the same source (which is likely), create a single BigQueryConfig instance and reuse it.
If you call multiple BigQuery sources (which is less likely), it is still worth caching the configs, so you can avoid a lot of unneeded authorization requests. A minimal sketch of sharing one config is shown below.
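The following Scala sketch only rearranges calls already shown in the examples above: the config is created once and the same instance is passed to every source.

// Minimal sketch: one shared BigQueryConfig (and therefore one session) for all sources.
val sharedConfig = BigQueryConfig(
  "[email protected]",
  "privateKeyFromGoogle",
  "projectID",
  "bigQueryDatasetName")

val tables = GoogleBigQuerySource.listTables(sharedConfig)
val fields = GoogleBigQuerySource.listFields("myTable", sharedConfig)
val users = GoogleBigQuerySource.runQueryCsvStyle(
  "SELECT uid, name FROM bigQueryDatasetName.myTable",
  BigQueryCallbacks.tryToStopJob(sharedConfig),
  sharedConfig)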
Cancel on timeout
All of the provided functionality can fire a callback when the downstream signals a stop. This is useful if you want to implement a timeout in the downstream and try to lower your costs by stopping long-running jobs. (Google doesn’t give any guarantee about cost reduction, but at least we can try. Read this for more information.)
You can use the built-in BigQueryCallbacks for this. A minimal sketch combining a downstream timeout with tryToStopJob is shown below.
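The following Scala sketch pairs the csvStyle source from above with completionTimeout, a plain Akka Streams operator (not part of this connector) that fails the stream if it has not completed within the given time; the tryToStopJob callback then attempts to cancel the still-running query job. The five-minute value is an arbitrary placeholder.

import scala.concurrent.duration._

// Minimal sketch: fail the stream after 5 minutes (arbitrary value); the
// tryToStopJob callback then tries to cancel the BigQuery job.
val boundedStream: Source[Seq[String], NotUsed] =
  GoogleBigQuerySource
    .runQueryCsvStyle(
      "SELECT uid, name FROM bigQueryDatasetName.myTable",
      BigQueryCallbacks.tryToStopJob(config),
      config)
    .completionTimeout(5.minutes)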
Parsers
The parser function is a spray.json.JsObject => Try[T] function in Scala, or a java.util.function.Function<spray.json.JsObject, scala.util.Try<T>> in Java. This is needed because the response may not contain any data yet; in that case the request is retried after a delay. Your parser function needs to be bulletproof: the code in the examples only covers the happy path, and in case of a scala.util.Failure your stream will be polling forever! A sketch of a slightly more defensive parser is shown below.
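As an illustration only, a Scala parser can at least log when conversion fails, so an endlessly polling stream does not go unnoticed. This sketch reuses the User model and its JSON format from the example above and assumes an ActorSystem named system is in scope; the logging call is an assumption about your setup, not part of the connector.

// Minimal sketch: same conversion as before, but failures are logged before
// being returned, so a stream that keeps retrying is visible in the logs.
def loggingParserFn(result: JsObject): Try[User] =
  Try(result.convertTo[User]).recoverWith {
    case err =>
      system.log.warning("Failed to parse BigQuery response {}: {}", result, err.getMessage)
      scala.util.Failure(err)
  }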
Running an End to End test case
You might want to run an end-to-end test case.
See BigQueryEndToEndSpec. To run this example against an actual GCP project you will need to configure a project, create/initialize the tables in Google BigQuery and provide a service account. A hypothetical sketch of wiring real credentials into the connector is shown below.
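For a manual run against a real project, the credentials have to come from somewhere outside the code base. The following hypothetical Scala sketch (not taken from BigQueryEndToEndSpec) reads them from environment variables; the variable names are placeholders.

// Hypothetical sketch: build the connector config for an end-to-end run from
// environment variables (names are placeholders, pick whatever fits your setup).
val endToEndConfig = BigQueryConfig(
  sys.env("BQ_CLIENT_EMAIL"), // service account e-mail
  sys.env("BQ_PRIVATE_KEY"),  // service account private key
  sys.env("BQ_PROJECT_ID"),
  sys.env("BQ_DATASET"))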