Dataflow How to Read in Big Data Set
Built-in I/O Transforms
Google BigQuery I/O connector
The Beam SDKs include built-in transforms that can read data from and write data to Google BigQuery tables.
Before you start
To use BigQueryIO with the Java SDK, add the Maven artifact dependency to your pom.xml file.
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>2.36.0</version>
</dependency>
To use BigQueryIO with the Python SDK, install the Google Cloud Platform dependencies by running pip install apache-beam[gcp].
BigQuery basics
Table names
To read or write from a BigQuery table, you must provide a fully-qualified BigQuery table name (for example, bigquery-public-data:github_repos.sample_contents). A fully-qualified BigQuery table name consists of three parts:
- Project ID: The ID for your Google Cloud Project. The default value comes from your pipeline options object.
- Dataset ID: The BigQuery dataset ID, which is unique within a given Cloud Project.
- Table ID: A BigQuery table ID, which is unique within a given dataset.
A table name can also include a table decorator if you are using time-partitioned tables.
To specify a BigQuery table, you can use either the table's fully-qualified name as a string, or a TableReference object.
Using a string
To specify a table with a string, use the format [project_id]:[dataset_id].[table_id] to specify the fully-qualified BigQuery table name.
String tableSpec = "clouddataflow-readonly:samples.weather_stations";
# project-id:dataset_id.table_id
table_spec = 'clouddataflow-readonly:samples.weather_stations'
You can also omit project_id and use the [dataset_id].[table_id] format. If you omit the project ID, Beam uses the default project ID from your pipeline options.
String tableSpec = "samples.weather_stations";
# dataset_id.table_id
table_spec = 'samples.weather_stations'
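To make the relationship between the two string forms concrete, here is a minimal sketch in plain Python. The helper split_table_spec and the TableParts type are hypothetical names introduced for illustration only; they are not part of the Beam SDK, and the sketch assumes the default project is passed in explicitly rather than read from pipeline options.

```python
from typing import NamedTuple


class TableParts(NamedTuple):
    """The three parts of a fully-qualified BigQuery table name."""
    project_id: str
    dataset_id: str
    table_id: str


def split_table_spec(spec: str, default_project: str) -> TableParts:
    """Split '[project_id]:[dataset_id].[table_id]' or '[dataset_id].[table_id]'.

    Hypothetical helper: falls back to default_project when the spec
    has no 'project:' prefix.
    """
    project, colon, rest = spec.rpartition(":")
    if not colon:
        # No project prefix, so use the caller-supplied default.
        project = default_project
    elif not project:
        raise ValueError(f"empty project ID in {spec!r}")
    dataset, dot, table = rest.partition(".")
    if not dot or not dataset or not table:
        raise ValueError(f"expected 'dataset.table' in {spec!r}")
    return TableParts(project, dataset, table)


full = split_table_spec(
    "clouddataflow-readonly:samples.weather_stations", "my-project")
short = split_table_spec("samples.weather_stations", "my-project")
```

In this sketch, full carries the project from the string, while short picks up the hypothetical default "my-project".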
Using a TableReference
To specify a table with a TableReference
, create a new TableReference
using the three parts of the BigQuery table name.
TableReference tableSpec =
    new TableReference()
        .setProjectId("clouddataflow-readonly")
        .setDatasetId("samples")
        .setTableId("weather_stations");
from apache_beam.io.gcp.internal.clients import bigquery

table_spec = bigquery.TableReference(
    projectId='clouddataflow-readonly',
    datasetId='samples',
    tableId='weather_stations')
The Beam SDK for Java also provides the parseTableSpec
helper method, which constructs a TableReference
object from a String that contains the fully-qualified BigQuery table name. However, the static factory methods for BigQueryIO transforms take the table name as a String and construct a TableReference
object for you.
Table rows
BigQueryIO read and write transforms produce and consume data as a PCollection
of dictionaries, where each element in the PCollection
represents a single row in the table.
Schemas
When writing to BigQuery, you must supply a table schema for the destination table that you want to write to, unless you specify a create disposition of CREATE_NEVER
. Creating a table schema covers schemas in more detail.
Data types
BigQuery supports the following data types: STRING, BYTES, INTEGER, FLOAT, NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME and GEOGRAPHY. All possible values are described at https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types. BigQueryIO allows you to use all of these data types. The following example shows the correct format for data types used when reading from and writing to BigQuery:
import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BigQueryTableRowCreate {
  public static TableRow createTableRow() {
    TableRow row =
        new TableRow()
            // To learn more about BigQuery data types:
            // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
            .set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
            .set("int64_field", 432)
            .set("float64_field", 3.141592653589793)
            .set("numeric_field", new BigDecimal("1234.56").toString())
            .set("bool_field", true)
            .set(
                "bytes_field",
                Base64.getEncoder()
                    .encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))

            // To learn more about date formatting:
            // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
            .set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
            .set(
                "datetime_field",
                LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
            .set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
            .set(
                "timestamp_field",
                Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT

            // To learn more about the geography Well-Known Text (WKT) format:
            // https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
            .set("geography_field", "POINT(30 10)")

            // An array has its mode set to REPEATED.
            .set("array_field", Arrays.asList(1, 2, 3, 4))

            // Any class can be written as a STRUCT as long as all the fields in the
            // schema are present and they are encoded correctly as BigQuery types.
            .set(
                "struct_field",
                Stream.of(
                        new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
                        new SimpleEntry<>("int64_value", "42"))
                    .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
    return row;
  }
}
import base64
from decimal import Decimal

bigquery_data = [{
    'string': 'abc',
    'bytes': base64.b64encode(b'\xab\xac'),
    'integer': 5,
    'float': 0.5,
    'numeric': Decimal('5'),
    'boolean': True,
    'timestamp': '2018-12-31 12:44:31.744957 UTC',
    'date': '2018-12-31',
    'time': '12:44:31',
    'datetime': '2018-12-31T12:44:31',
    'geography': 'POINT(30 10)'
}]
As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (see https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing to BigQuery. BigQueryIO requires values of the BYTES data type to be base64-encoded when writing to BigQuery. When bytes are read from BigQuery, the Java SDK returns them as base64-encoded strings and the Python SDK returns them as base64-encoded bytes.
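The base64 requirement for BYTES values can be illustrated with the Python standard library alone. The following is a small sketch, not Beam code: the field name bytes_field and the row dictionary are illustrative assumptions, and the round trip simply shows what a writer would encode and what a reader would need to decode.

```python
import base64

# Arbitrary binary payload that is not valid text on its own.
raw_payload = b"\xab\xac"

# Encode before writing: BYTES values are expected in base64 form.
encoded = base64.b64encode(raw_payload)

# A row dictionary as it might be handed to a write transform
# (the field name is hypothetical).
row_to_write = {"bytes_field": encoded}

# Decode after reading: works for base64-encoded bytes or strings.
decoded = base64.b64decode(row_to_write["bytes_field"])
```

After the round trip, decoded holds the same bytes as raw_payload.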
Reading from BigQuery
BigQueryIO allows you to read from a BigQuery table, or to execute a SQL query and read the results. By default, Beam invokes a BigQuery export request when you apply a BigQueryIO read transform. However, the Beam SDK for Java also supports using the BigQuery Storage API to read directly from BigQuery storage. See Using the BigQuery Storage API for more information.
Beam's use of BigQuery APIs is subject to BigQuery's Quota and Pricing policies.
The Beam SDK for Java has two BigQueryIO read methods. Both of these methods allow you to read from a table, or read fields using a query string.
- read(SerializableFunction) reads Avro-formatted records and uses a specified parsing function to parse them into a PCollection of custom typed objects. Each element in the PCollection represents a single row in the table. The example code for reading with a query string shows how to use read(SerializableFunction).
- readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the PCollection represents a single row in the table. Integer values in the TableRow objects are encoded as strings to match BigQuery's exported JSON format. This method is convenient, but can be 2-3 times slower than read(SerializableFunction). The example code for reading from a table shows how to use readTableRows.
Note: BigQueryIO.read() is deprecated as of Beam SDK 2.2.0. Instead, use read(SerializableFunction<SchemaAndRecord, T>) to parse BigQuery rows from Avro GenericRecord into your custom type, or use readTableRows() to parse them into JSON TableRow objects.
To read from a BigQuery table using the Beam SDK for Python, apply a ReadFromBigQuery transform. ReadFromBigQuery returns a PCollection of dictionaries, where each element in the PCollection represents a single row in the table. Integer values in the TableRow objects are encoded as strings to match BigQuery's exported JSON format.
Note: BigQuerySource() is deprecated as of Beam SDK 2.25.0. Before 2.25.0, to read from a BigQuery table using the Beam SDK, you applied a Read transform on a BigQuerySource. For example, beam.io.Read(beam.io.BigQuerySource(table_spec)).
Reading from a table
In Java, to read an entire BigQuery table, use the from method with a BigQuery table name. This example uses readTableRows.
In Python, to read an entire BigQuery table, use the table parameter with the BigQuery table name.
The following code reads an entire table that contains weather station data and then extracts the max_temperature column.
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTable {
  public static PCollection<MyData> readFromTable(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))
Reading with a query string
In Java, if you don't want to read an entire table, you can supply a query string with the fromQuery method.
In Python, if you don't want to read an entire table, you can supply a query string to ReadFromBigQuery by specifying the query parameter.
The following code uses a SQL query to only read the max_temperature
column.
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQuery {
  public static PCollection<MyData> readFromQuery(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows()
                    .fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
                    .usingStandardSql())
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'QueryTable' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '[clouddataflow-readonly:samples.weather_stations]')
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))
You can also use BigQuery's standard SQL dialect with a query string, as shown in the following example:
PCollection<Double> maxTemperatures =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
            .fromQuery(
                "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
            .usingStandardSql()
            .withCoder(DoubleCoder.of()));
max_temperatures = (
    pipeline
    | 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '`clouddataflow-readonly.samples.weather_stations`',
        use_standard_sql=True)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))
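The two query examples differ in how the table is quoted: the legacy form uses square brackets with a colon after the project, while the standard SQL form uses backticks with dots throughout. The sketch below builds both spellings from the same three parts. The function names are hypothetical helpers written for this illustration and are not part of Beam or BigQuery.

```python
def legacy_sql_table(project: str, dataset: str, table: str) -> str:
    """Legacy SQL reference: [project:dataset.table]."""
    return f"[{project}:{dataset}.{table}]"


def standard_sql_table(project: str, dataset: str, table: str) -> str:
    """Standard SQL reference: `project.dataset.table`."""
    return f"`{project}.{dataset}.{table}`"


parts = ("clouddataflow-readonly", "samples", "weather_stations")

# Query text for the default (legacy) dialect.
legacy_query = "SELECT max_temperature FROM " + legacy_sql_table(*parts)

# Query text to pair with use_standard_sql=True or usingStandardSql().
standard_query = "SELECT max_temperature FROM " + standard_sql_table(*parts)
```

Keeping the quoting in one place like this makes it harder to mix a legacy table reference into a query that is run with the standard SQL flag enabled.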
Using the BigQuery Storage API
The BigQuery Storage API allows you to directly access tables in BigQuery storage, and supports features such as column selection and predicate filter push-down which can allow more efficient pipeline execution.
The Beam SDK for Java supports using the BigQuery Storage API when reading from BigQuery. SDK versions before 2.25.0 support the BigQuery Storage API as an experimental feature and use the pre-GA BigQuery Storage API surface. Callers should migrate pipelines which use the BigQuery Storage API to use SDK version 2.25.0 or later.
The Beam SDK for Python does not support the BigQuery Storage API. See BEAM-10917.
Updating your code
Use the following methods when you read from a table:
- Required: Specify withMethod(Method.DIRECT_READ) to use the BigQuery Storage API for the read operation.
- Optional: To use features such as column projection and column filtering, you must specify withSelectedFields and withRowRestriction respectively.
The following code snippet reads from a table. This example is from the BigQueryTornadoes example. When the example's read method option is set to DIRECT_READ, the pipeline uses the BigQuery Storage API and column projection to read public samples of weather data from a BigQuery table. You can view the full source code on GitHub.
import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTableWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .from(String.format("%s:%s.%s", project, dataset, table))
                    .withMethod(Method.DIRECT_READ)
                    .withSelectedFields(
                        Arrays.asList(
                            "string_field",
                            "int64_field",
                            "float64_field",
                            "numeric_field",
                            "bool_field",
                            "bytes_field",
                            "date_field",
                            "datetime_field",
                            "time_field",
                            "timestamp_field",
                            "geography_field",
                            "array_field",
                            "struct_field")))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
# The SDK for Python does not support the BigQuery Storage API.
The following code snippet reads with a query string.
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQueryWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
      String project, String dataset, String table, String query, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    /*
    String query = String.format("SELECT\n" +
        "  string_field,\n" +
        "  int64_field,\n" +
        "  float64_field,\n" +
        "  numeric_field,\n" +
        "  bool_field,\n" +
        "  bytes_field,\n" +
        "  date_field,\n" +
        "  datetime_field,\n" +
        "  time_field,\n" +
        "  timestamp_field,\n" +
        "  geography_field,\n" +
        "  array_field,\n" +
        "  struct_field\n" +
        "FROM\n" +
        "  `%s:%s.%s`", project, dataset, table)
    */

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withMethod(Method.DIRECT_READ))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
# The SDK for Python does not support the BigQuery Storage API.
Writing to BigQuery
BigQueryIO allows you to write to BigQuery tables. If you are using the Beam SDK for Java, you can also write different rows to different tables.
BigQueryIO write transforms use APIs that are subject to BigQuery's Quota and Pricing policies.
When you apply a write transform, you must provide the following information for the destination table(s):
- The table name.
- The destination table's create disposition. The create disposition specifies whether the destination table must exist or can be created by the write operation.
- The destination table's write disposition. The write disposition specifies whether the data you write will replace an existing table, append rows to an existing table, or write only to an empty table.
In addition, if your write operation creates a new BigQuery table, you must also supply a table schema for the destination table.
Create disposition
The create disposition controls whether or not your BigQuery write operation should create a table if the destination table does not exist.
In Java, use .withCreateDisposition to specify the create disposition. Valid enum values are:
- Write.CreateDisposition.CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema with the withSchema method. CREATE_IF_NEEDED is the default behavior.
- Write.CreateDisposition.CREATE_NEVER: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.
In Python, use the create_disposition parameter to specify the create disposition. Valid enum values are:
- BigQueryDisposition.CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema. CREATE_IF_NEEDED is the default behavior.
- BigQueryDisposition.CREATE_NEVER: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.
If you specify CREATE_IF_NEEDED as the create disposition and you don't supply a table schema, the transform might fail at runtime if the destination table does not exist.
Write disposition
The write disposition controls how your BigQuery write operation applies to an existing table.
In Java, use .withWriteDisposition to specify the write disposition. Valid enum values are:
- Write.WriteDisposition.WRITE_EMPTY: Specifies that the write operation should fail at runtime if the destination table is not empty. WRITE_EMPTY is the default behavior.
- Write.WriteDisposition.WRITE_TRUNCATE: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.
- Write.WriteDisposition.WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.
In Python, use the write_disposition parameter to specify the write disposition. Valid enum values are:
- BigQueryDisposition.WRITE_EMPTY: Specifies that the write operation should fail at runtime if the destination table is not empty. WRITE_EMPTY is the default behavior.
- BigQueryDisposition.WRITE_TRUNCATE: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.
- BigQueryDisposition.WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.
When you use WRITE_EMPTY, the check for whether or not the destination table is empty can occur before the actual write operation. This check doesn't guarantee that your pipeline will have exclusive access to the table. Two concurrent pipelines that write to the same output table with a write disposition of WRITE_EMPTY might start successfully, but both pipelines can fail later when the write attempts happen.
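How the create and write dispositions combine can be easier to follow as a toy model. The sketch below is an in-memory simulation only: simulate_write is a hypothetical function, a table is modeled as a Python list (or None when it does not exist), and none of this reflects how BigQueryIO is actually implemented.

```python
def simulate_write(existing_rows, new_rows, create_disposition, write_disposition):
    """Return the table contents after a write, or raise if the write would fail.

    existing_rows is None when the destination table does not exist.
    Dispositions are passed as plain strings in this toy model.
    """
    if create_disposition not in ("CREATE_IF_NEEDED", "CREATE_NEVER"):
        raise ValueError(f"unknown create disposition: {create_disposition}")

    if existing_rows is None:
        if create_disposition == "CREATE_NEVER":
            raise RuntimeError("destination table does not exist")
        existing_rows = []  # CREATE_IF_NEEDED: start from a new, empty table

    if write_disposition == "WRITE_EMPTY":
        if existing_rows:
            raise RuntimeError("destination table is not empty")
        return list(new_rows)
    if write_disposition == "WRITE_TRUNCATE":
        return list(new_rows)  # existing rows are discarded
    if write_disposition == "WRITE_APPEND":
        return list(existing_rows) + list(new_rows)
    raise ValueError(f"unknown write disposition: {write_disposition}")


created = simulate_write(None, ["a"], "CREATE_IF_NEEDED", "WRITE_EMPTY")
appended = simulate_write(["a"], ["b"], "CREATE_NEVER", "WRITE_APPEND")
replaced = simulate_write(["a"], ["b"], "CREATE_NEVER", "WRITE_TRUNCATE")
```

The model deliberately ignores concurrency, which is exactly where the WRITE_EMPTY caveat above applies: two writers can both pass the emptiness check before either one writes.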
Creating a table schema
If your BigQuery write operation creates a new table, you must provide schema information. The schema contains information about each field in the table.
To create a table schema in Java, you can either use a TableSchema object, or use a string that contains a JSON-serialized TableSchema object.
To create a table schema in Python, you can either use a TableSchema object, or use a string that defines a list of fields. Single string based schemas do not support nested fields, repeated fields, or specifying a BigQuery mode for fields (the mode will always be set to NULLABLE).
Using a TableSchema
To create and use a table schema as a TableSchema
object, follow these steps.
In Java:
1. Create a list of TableFieldSchema objects. Each TableFieldSchema object represents a field in the table.
2. Create a TableSchema object and use the setFields method to specify your list of fields.
3. Use the withSchema method to provide your table schema when you apply a write transform.
In Python:
1. Create a TableSchema object.
2. Create and append a TableFieldSchema object for each field in your table.
3. Next, use the schema parameter to provide your table schema when you apply a write transform. Set the parameter's value to the TableSchema object.
The following example code shows how to create a TableSchema
for a table with two fields (source and quote) of type string.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

class BigQuerySchemaCreate {
  public static TableSchema createSchema() {
    // To learn more about BigQuery schemas:
    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema()
                        .setName("string_field")
                        .setType("STRING")
                        .setMode("REQUIRED"),
                    new TableFieldSchema()
                        .setName("int64_field")
                        .setType("INT64")
                        .setMode("NULLABLE"),
                    new TableFieldSchema()
                        .setName("float64_field")
                        .setType("FLOAT64"), // default mode is "NULLABLE"
                    new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
                    new TableFieldSchema().setName("bool_field").setType("BOOL"),
                    new TableFieldSchema().setName("bytes_field").setType("BYTES"),
                    new TableFieldSchema().setName("date_field").setType("DATE"),
                    new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
                    new TableFieldSchema().setName("time_field").setType("TIME"),
                    new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
                    new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
                    new TableFieldSchema()
                        .setName("array_field")
                        .setType("INT64")
                        .setMode("REPEATED")
                        .setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
                    new TableFieldSchema()
                        .setName("struct_field")
                        .setType("STRUCT")
                        .setDescription(
                            "A STRUCT accepts a custom data class, the fields must match the custom class fields.")
                        .setFields(
                            Arrays.asList(
                                new TableFieldSchema().setName("string_value").setType("STRING"),
                                new TableFieldSchema().setName("int64_value").setType("INT64")))));
    return schema;
  }
}
table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}
Using a string
In Java, to create and use a table schema as a string that contains a JSON-serialized TableSchema object, follow these steps.
1. Create a string that contains a JSON-serialized TableSchema object.
2. Use the withJsonSchema method to provide your table schema when you apply a write transform.
In Python, to create and use a table schema as a string, follow these steps.
1. Create a single comma separated string of the form "field1:type1,field2:type2,field3:type3" that defines a list of fields. The type should specify the field's BigQuery type.
2. Use the schema parameter to provide your table schema when you apply a write transform. Set the parameter's value to the string.
The following example shows how to use a string to specify the same table schema as the previous example.
String tableSchemaJson =
    ""
        + "{"
        + "  \"fields\": ["
        + "    {"
        + "      \"name\": \"source\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"NULLABLE\""
        + "    },"
        + "    {"
        + "      \"name\": \"quote\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"REQUIRED\""
        + "    }"
        + "  ]"
        + "}";
# column_name:BIGQUERY_TYPE, ...
table_schema = 'source:STRING, quote:STRING'
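To show what the comma separated form expresses, the following sketch expands a schema string into the dictionary layout used earlier, with every mode fixed to NULLABLE as described above. The function schema_string_to_dict is a hypothetical helper for illustration; it is not the parser Beam uses internally.

```python
def schema_string_to_dict(schema: str) -> dict:
    """Expand 'name1:TYPE1, name2:TYPE2' into a {'fields': [...]} dictionary.

    Hypothetical helper: every field gets mode NULLABLE, since the string
    form cannot express modes, nested fields, or repeated fields.
    """
    fields = []
    for item in schema.split(","):
        name, colon, field_type = item.strip().partition(":")
        name, field_type = name.strip(), field_type.strip()
        if not colon or not name or not field_type:
            raise ValueError(f"expected 'name:TYPE', got {item!r}")
        fields.append({"name": name, "type": field_type.upper(), "mode": "NULLABLE"})
    return {"fields": fields}


expanded = schema_string_to_dict("source:STRING, quote:STRING")
```

Note that the expanded result marks quote as NULLABLE, so a REQUIRED field like the one in the dictionary example cannot be expressed with the string form.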
Setting the insertion method
BigQueryIO supports two methods of inserting data into BigQuery: load jobs and streaming inserts. Each insertion method provides different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for different data ingestion options (specifically, load jobs and streaming inserts) for more information about these tradeoffs.
In Java, BigQueryIO chooses a default insertion method based on the input PCollection. You can use withMethod to specify the desired insertion method. See Write.Method for the list of the available methods and their restrictions.
In Python, BigQueryIO chooses a default insertion method based on the input PCollection. You can use method to specify the desired insertion method. See WriteToBigQuery for the list of the available methods and their restrictions.
BigQueryIO uses load jobs in the following situations:
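In Java:
- When you apply a BigQueryIO write transform on a bounded PCollection.
- When you specify load jobs as the insertion method using BigQueryIO.write().withMethod(FILE_LOADS).
In Python:
- When you apply a BigQueryIO write transform on a bounded PCollection.
- When you specify load jobs as the insertion method using WriteToBigQuery(method='FILE_LOADS').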
Note: If you use batch loads in a streaming pipeline:
In Java:
- You must use withTriggeringFrequency to specify a triggering frequency for initiating load jobs. Be careful about setting the frequency such that your pipeline doesn't exceed the BigQuery load job quota limit.
- You can either use withNumFileShards to explicitly set the number of file shards written, or use withAutoSharding to enable dynamic sharding (starting 2.29.0 release), in which case the number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.
In Python:
- You must use triggering_frequency to specify a triggering frequency for initiating load jobs. Be careful about setting the frequency such that your pipeline doesn't exceed the BigQuery load job quota limit.
- You can set with_auto_sharding=True to enable dynamic sharding (starting 2.29.0 release). The number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.
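A quick back-of-the-envelope calculation can help when choosing a triggering frequency. The sketch below assumes, as a simplification, that each trigger starts exactly one load job per destination table; real pipelines may start more. The daily limit is a parameter you would look up in the current BigQuery quota documentation, and both function names are hypothetical.

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60


def load_jobs_per_day(triggering_frequency_seconds: float) -> int:
    """Estimate daily load jobs per table, assuming one job per trigger."""
    if triggering_frequency_seconds <= 0:
        raise ValueError("triggering frequency must be positive")
    return math.ceil(SECONDS_PER_DAY / triggering_frequency_seconds)


def min_triggering_frequency_seconds(daily_job_limit: int) -> float:
    """Smallest interval that stays within a given daily job limit."""
    if daily_job_limit <= 0:
        raise ValueError("daily job limit must be positive")
    return SECONDS_PER_DAY / daily_job_limit


every_minute = load_jobs_per_day(60)
every_ten_minutes = load_jobs_per_day(600)
```

Under the one-job-per-trigger assumption, triggering every 60 seconds yields 1440 jobs per day and every 600 seconds yields 144, which you can compare against whatever limit applies to your project.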
BigQueryIO uses streaming inserts in the following situations:
In Java:
- When you apply a BigQueryIO write transform on an unbounded PCollection.
- When you specify streaming inserts as the insertion method using BigQueryIO.write().withMethod(STREAMING_INSERTS).
Note: Streaming inserts by default enable BigQuery's best-effort deduplication mechanism. You can disable that by setting ignoreInsertIds. The quota limitations are different when deduplication is enabled vs. disabled. Streaming inserts apply a default sharding for each table destination. You can use withAutoSharding (starting 2.28.0 release) to enable dynamic sharding, in which case the number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.
In Python:
- When you apply a BigQueryIO write transform on an unbounded PCollection.
- When you specify streaming inserts as the insertion method using WriteToBigQuery(method='STREAMING_INSERTS').
Note: Streaming inserts by default enable BigQuery's best-effort deduplication mechanism. You can disable that by setting ignore_insert_ids=True. The quota limitations are different when deduplication is enabled vs. disabled. Streaming inserts apply a default sharding for each table destination. You can set with_auto_sharding=True (starting 2.29.0 release) to enable dynamic sharding. The number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.
Writing to a table
In Java, to write to a BigQuery table, apply either a writeTableRows or write transform.
In Python, to write to a BigQuery table, apply the WriteToBigQuery transform. WriteToBigQuery supports both batch mode and streaming mode. You must apply the transform to a PCollection of dictionaries. In general, you'll need to use another transform, such as ParDo, to format your output data into a collection.
The following examples use this PCollection that contains quotes.
The writeTableRows method writes a PCollection of BigQuery TableRow objects to a BigQuery table. Each element in the PCollection represents a single row in the table. This example uses writeTableRows to write elements to a PCollection<TableRow>. The write operation creates a table if needed; if the table already exists, it will be replaced.
import com.google.api.services.bigquery.model.TableRow ; import com.google.api.services.bigquery.model.TableSchema ; import org.apache.axle.sdk.io.gcp.bigquery.BigQueryIO ; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition ; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition ; import org.apache.beam.sdk.values.PCollection ; class BigQueryWriteToTable { public static void writeToTable ( String projection , String dataset , String table , TableSchema schema , PCollection < TableRow > rows ) { // Cord projection = "my-project-id"; // String dataset = "my_bigquery_dataset_id"; // String tabular array = "my_bigquery_table_id"; // TableSchema schema = new TableSchema().setFields(Arrays.asList(...)); // Pipeline pipeline = Pipeline.create(); // PCollection<TableRow> rows = ... rows . utilize ( "Write to BigQuery" , BigQueryIO . writeTableRows () . to ( Cord . format ( "%s:%s.%s" , project , dataset , table )) . withSchema ( schema ) // For CreateDisposition: // - CREATE_IF_NEEDED (default): creates the table if information technology doesn't exist, a schema is // required // - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed . withCreateDisposition ( CreateDisposition . CREATE_IF_NEEDED ) // For WriteDisposition: // - WRITE_EMPTY (default): raises an fault if the tabular array is not empty // - WRITE_APPEND: appends new rows to existing rows // - WRITE_TRUNCATE: deletes the existing rows before writing . withWriteDisposition ( WriteDisposition . WRITE_TRUNCATE )); // pipeline.run().waitUntilFinish(); } }
quotes = pipeline | beam.Create([
    {'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
    {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
])
The following example code shows how to apply a WriteToBigQuery
transform to write a PCollection
of dictionaries to a BigQuery table. The write operation creates a table if needed; if the table already exists, it will be replaced.
quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
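The create and write dispositions listed in the code comments above can be easier to follow as a small model. The following is only a sketch in plain Python, not Beam or BigQuery code: the function simulate_write and its list-of-dicts "table" are hypothetical names made up for illustration, and the disposition names are passed as plain strings.

```python
from typing import Dict, List, Optional

Row = Dict[str, str]


def simulate_write(existing: Optional[List[Row]],
                   new_rows: List[Row],
                   create_disposition: str,
                   write_disposition: str) -> List[Row]:
    """Model the table contents after a write.

    `existing` is None when the table does not exist yet.
    This is an illustrative model only; it does not call BigQuery.
    """
    if create_disposition not in ('CREATE_IF_NEEDED', 'CREATE_NEVER'):
        raise ValueError(f'unknown create disposition: {create_disposition}')

    if existing is None:
        if create_disposition == 'CREATE_NEVER':
            raise ValueError('table does not exist and CREATE_NEVER was requested')
        existing = []  # CREATE_IF_NEEDED: start from an empty table

    if write_disposition == 'WRITE_EMPTY':
        if existing:
            raise ValueError('table is not empty and WRITE_EMPTY was requested')
        return list(new_rows)
    if write_disposition == 'WRITE_APPEND':
        return existing + list(new_rows)
    if write_disposition == 'WRITE_TRUNCATE':
        return list(new_rows)  # existing rows are dropped before writing
    raise ValueError(f'unknown write disposition: {write_disposition}')
```

In this model, WRITE_TRUNCATE on an existing table returns only the new rows, which is the "it will be replaced" behavior described above.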
The write
transform writes a PCollection
of custom typed objects to a BigQuery table. Use .withFormatFunction(SerializableFunction)
to provide a formatting function that converts each input element in the PCollection
into a TableRow
. This example uses write
to write a PCollection<Quote>
. The write operation creates a table if needed; if the table already exists, it will be replaced.
quotes.apply(
    BigQueryIO.<Quote>write()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withFormatFunction(
            (Quote elem) -> new TableRow().set("source", elem.source).set("quote", elem.quote))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
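For comparison, the same formatting step can be sketched in Python, where rows are written as dictionaries. The Quote dataclass and the quote_to_row function below are hypothetical names chosen for this illustration; they are not part of the Beam SDK.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Quote:
    """Hypothetical custom element type, mirroring the Java Quote class."""
    source: str
    quote: str


def quote_to_row(elem: Quote) -> Dict[str, str]:
    """Convert one custom-typed element into a row dictionary.

    The keys must match the column names in the table schema.
    """
    return {'source': elem.source, 'quote': elem.quote}
```

In a pipeline, a function like this would typically be applied with beam.Map(quote_to_row) before the WriteToBigQuery step, assuming the input PCollection holds Quote objects.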
When you use streaming inserts, you can decide what to do with failed records. You can either keep retrying, or return the failed records in a separate PCollection
using the WriteResult.getFailedInserts()
method.
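The two options (keep retrying, or collect the failures separately) can be pictured with a plain Python model. This is a sketch only and does not use the BigQueryIO API: insert_with_retries, insert_fn, and max_attempts are hypothetical names, and BigQueryIO performs its own retry handling internally.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


def insert_with_retries(rows: List[Row],
                        insert_fn: Callable[[Row], bool],
                        max_attempts: int = 3) -> Tuple[List[Row], List[Row]]:
    """Try each row up to max_attempts times.

    Returns (inserted, failed). `insert_fn` is a stand-in for a single
    insert call and returns True on success.
    """
    inserted: List[Row] = []
    failed: List[Row] = []
    for row in rows:
        for _ in range(max_attempts):
            if insert_fn(row):
                inserted.append(row)
                break
        else:
            # Every attempt failed: route the row to the separate output.
            failed.append(row)
    return inserted, failed
```

The failed list plays the role of the separate PCollection of failed records: it can be logged, inspected, or written elsewhere instead of blocking the rest of the write.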
Using dynamic destinations
You can use the dynamic destinations feature to write elements in a PCollection
to different BigQuery tables, possibly with different schemas.
The dynamic destinations feature groups your user type by a user-defined destination key, uses the key to compute a destination table and/or schema, and writes each group's elements to the computed destination.
In addition, you can also write your own types that have a mapping function to TableRow
, and you can use side inputs in all DynamicDestinations
methods.
To use dynamic destinations, you must create a DynamicDestinations
object and implement the following methods:
- getDestination: Returns an object that getTable and getSchema can use as the destination key to compute the destination table and/or schema.
- getTable: Returns the table (as a TableDestination object) for the destination key. This method must return a unique table for each unique destination.
- getSchema: Returns the table schema (as a TableSchema object) for the destination key.
Then, use write().to
with your DynamicDestinations
object. This example uses a PCollection
that contains weather data and writes the data into a different table for each year.
/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
  final long year;
  final long month;
  final long day;
  final double maxTemp;

  public WeatherData() {
    this.year = 0;
    this.month = 0;
    this.day = 0;
    this.maxTemp = 0.0f;
  }

  public WeatherData(long year, long month, long day, double maxTemp) {
    this.year = year;
    this.month = month;
    this.day = day;
    this.maxTemp = maxTemp;
  }
}
*/

PCollection<WeatherData> weatherData =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  GenericRecord record = elem.getRecord();
                  return new WeatherData(
                      (Long) record.get("year"),
                      (Long) record.get("month"),
                      (Long) record.get("day"),
                      (Double) record.get("max_temperature"));
                })
            .fromQuery(
                "SELECT year, month, day, max_temperature "
                    + "FROM [clouddataflow-readonly:samples.weather_stations] "
                    + "WHERE year BETWEEN 2007 AND 2009")
            .withCoder(AvroCoder.of(WeatherData.class)));

// We will send the weather data into different tables for every year.
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(
            new DynamicDestinations<WeatherData, Long>() {
              @Override
              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
                return elem.getValue().year;
              }

              @Override
              public TableDestination getTable(Long destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId(writeProject)
                        .setDatasetId(writeDataset)
                        .setTableId(writeTable + "_" + destination),
                    "Table for year " + destination);
              }

              @Override
              public TableSchema getSchema(Long destination) {
                return new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema()
                                .setName("year")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("month")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("day")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("maxTemp")
                                .setType("FLOAT")
                                .setMode("NULLABLE")));
              }
            })
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
                                                  ('Obi Wan Kenobi', True)]))

def table_fn(element, fictional_characters):
  # Each element is a quote dictionary, so look up its 'source' value.
  if element['source'] in fictional_characters:
    return 'my_dataset.fictional_quotes'
  else:
    return 'my_dataset.real_quotes'

quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
    table_fn,
    schema=table_schema,
    table_side_inputs=(fictional_characters_view, ),
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
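The per-year routing from the Java example can also be sketched as a plain Python function of the kind that could be passed as the table argument, in the same way as table_fn above. This is a minimal sketch: table_for_year, group_by_destination, and the 'my_dataset.weather' prefix are hypothetical names, and the grouping helper only illustrates what "group by destination key" means rather than how Beam implements it.

```python
from collections import defaultdict
from typing import Dict, List

Row = Dict[str, object]


def table_for_year(element: Row, prefix: str = 'my_dataset.weather') -> str:
    """Compute the destination table for one element from its 'year' field."""
    return f"{prefix}_{element['year']}"


def group_by_destination(rows: List[Row]) -> Dict[str, List[Row]]:
    """Group rows by their computed destination table (illustration only)."""
    groups: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        groups[table_for_year(row)].append(row)
    return dict(groups)
```

Keeping the routing logic in a small pure function makes it easy to unit test the destination names before running the pipeline against real tables.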
Using time partitioning
BigQuery time partitioning divides your table into smaller partitions, which is called a partitioned table. Partitioned tables make it easier for you to manage and query your data.
To use BigQuery time partitioning, use one of these two methods:
- withTimePartitioning: This method takes a TimePartitioning class, and is only usable if you are writing to a single table.
- withJsonTimePartitioning: This method is the same as withTimePartitioning, but takes a JSON-serialized String object.
This example generates one partition per day.
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(tableSpec + "_partitioning")
        .withSchema(tableSchema)
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        // NOTE: an existing table without time partitioning set up will not work
        .withTimePartitioning(new TimePartitioning().setType("DAY"))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
quotes | 'WriteWithTimePartitioning' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    additional_bq_parameters={'timePartitioning': {'type': 'HOUR'}})
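As a small companion sketch, the helpers below build the timePartitioning parameter dictionary used in the Python example and a table name with a daily partition decorator (the table$YYYYMMDD form). The function names and the list of accepted types are assumptions made for this illustration; check the BigQuery documentation for the partitioning types your table supports.

```python
import datetime
from typing import Dict

# Partitioning granularities assumed for this sketch.
VALID_TYPES = ('HOUR', 'DAY', 'MONTH', 'YEAR')


def time_partitioning_params(partition_type: str = 'DAY') -> Dict[str, Dict[str, str]]:
    """Build the dictionary passed as additional_bq_parameters."""
    if partition_type not in VALID_TYPES:
        raise ValueError(f'unsupported partition type: {partition_type}')
    return {'timePartitioning': {'type': partition_type}}


def partition_decorator(table_spec: str, day: datetime.date) -> str:
    """Return the table spec with a daily partition decorator appended."""
    return f"{table_spec}${day.strftime('%Y%m%d')}"
```

For example, partition_decorator('my_dataset.weather', datetime.date(2021, 4, 28)) yields 'my_dataset.weather$20210428', which addresses a single daily partition of that table.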
Limitations
BigQueryIO currently has the following limitations.
-
You can't sequence the completion of a BigQuery write with other steps of your pipeline.
-
If you are using the Beam SDK for Python, you might have import size quota issues if you write a very large dataset. As a workaround, you can partition the dataset (for example, using Beam's
Partition
transform) and write to multiple BigQuery tables. The Beam SDK for Java does not have this limitation as it partitions your dataset for you.
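The partitioning workaround relies on a function that maps each element to a partition index. A minimal sketch of such a function follows, using the (element, num_partitions) signature that Beam's Partition transform expects; the name by_source_hash and the choice of the 'source' field as the key are assumptions for illustration.

```python
import zlib
from typing import Dict


def by_source_hash(element: Dict[str, str], num_partitions: int) -> int:
    """Assign an element to a partition in the range [0, num_partitions).

    zlib.crc32 is used instead of the built-in hash() so that the
    assignment is stable across processes and runs.
    """
    key = str(element['source']).encode('utf-8')
    return zlib.crc32(key) % num_partitions
```

A function like this could be passed to beam.Partition(by_source_hash, n), after which each of the n resulting PCollections is written to its own table with a separate WriteToBigQuery step.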
Additional examples
You can find additional examples that use BigQuery in Beam's examples directories.
Java cookbook examples
These examples are from the Java cookbook examples directory.
-
BigQueryTornadoes reads the public samples of weather data from BigQuery, counts the number of tornadoes that occur in each month, and writes the results to a BigQuery table.
-
CombinePerKeyExamples reads the public Shakespeare data from BigQuery, and for each word in the dataset that exceeds a given length, generates a string containing the list of play names in which that word appears. The pipeline then writes the results to a BigQuery table.
-
FilterExamples reads public samples of weather data from BigQuery, performs a projection on the data, finds the global mean of the temperature readings, filters on readings for a single given month, and outputs only data (for that month) that has a mean temp smaller than the derived global mean.
-
JoinExamples reads a sample of the GDELT "world event" from BigQuery and joins the event
action
country code against a table that maps country codes to country names.
-
MaxPerKeyExamples reads the public samples of weather data from BigQuery, finds the maximum temperature for each month, and writes the results to a BigQuery table.
-
TriggerExample performs a streaming analysis of traffic data from San Diego freeways. The pipeline looks at the data coming in from a text file and writes the results to a BigQuery table.
Java complete examples
These examples are from the Java complete examples directory.
-
AutoComplete computes the most popular hash tags for every prefix, which can be used for auto-completion. The pipeline can optionally write the results to a BigQuery table.
-
StreamingWordExtract reads lines of text, splits each line into individual words, capitalizes those words, and writes the output to a BigQuery table.
-
TrafficMaxLaneFlow reads traffic sensor data, finds the lane that had the highest recorded flow, and writes the results to a BigQuery table.
-
TrafficRoutes reads traffic sensor data, calculates the average speed for each window and looks for slowdowns in routes, and writes the results to a BigQuery table.
Python cookbook examples
These examples are from the Python cookbook examples directory.
-
BigQuery schema creates a
TableSchema
with nested and repeated fields, generates data with nested and repeated fields, and writes the data to a BigQuery table.
-
BigQuery side inputs uses BigQuery sources as side inputs. It illustrates how to insert side-inputs into transforms in three different forms: as a singleton, as an iterator, and as a list.
-
BigQuery tornadoes reads from a BigQuery table that has the 'month' and 'tornado' fields as part of the table schema, computes the number of tornadoes in each month, and outputs the results to a BigQuery table.
-
BigQuery filters reads weather station data from a BigQuery table, manipulates BigQuery rows in memory, and writes the results to a BigQuery table.
Last updated on 2021/04/28
Source: https://beam.apache.org/documentation/io/built-in/google-bigquery/