Dataflow: How to Read in a Big Data Set

Built-in I/O Transforms

Google BigQuery I/O connector

The Beam SDKs include built-in transforms that can read data from and write data to Google BigQuery tables.

Before you start

To use BigQueryIO, add the Maven artifact dependency to your pom.xml file.

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>2.36.0</version>
    </dependency>

Additional resources:

  • BigQueryIO source code
  • BigQueryIO Javadoc
  • Google BigQuery documentation

To use BigQueryIO, you must install the Google Cloud Platform dependencies by running pip install apache-beam[gcp].

Additional resources:

  • BigQueryIO source code
  • BigQueryIO Pydoc
  • Google BigQuery documentation

BigQuery basics

Table names

To read or write from a BigQuery table, you must provide a fully-qualified BigQuery table name (for example, bigquery-public-data:github_repos.sample_contents). A fully-qualified BigQuery table name consists of three parts:

  • Project ID: The ID for your Google Cloud Project. The default value comes from your pipeline options object.
  • Dataset ID: The BigQuery dataset ID, which is unique within a given Cloud Project.
  • Table ID: A BigQuery table ID, which is unique within a given dataset.

A table name can also include a table decorator if you are using time-partitioned tables.
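
For illustration, a hypothetical table spec with a partition decorator ($YYYYMMDD) might look like the following; the project, dataset, and table names are placeholders:

    // Hypothetical example: target a single partition of a time-partitioned table.
    String tableSpec = "my-project-id:my_dataset.my_table$20200319";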

To specify a BigQuery table, you can use either the table's fully-qualified name as a string, or use a TableReference object.

Using a string

To specify a table with a string, use the format [project_id]:[dataset_id].[table_id] to specify the fully-qualified BigQuery table name.

    String tableSpec = "clouddataflow-readonly:samples.weather_stations";

    # project-id:dataset_id.table_id
    table_spec = 'clouddataflow-readonly:samples.weather_stations'

You can also omit project_id and use the [dataset_id].[table_id] format. If you omit the project ID, Beam uses the default project ID from your pipeline options.

    String tableSpec = "samples.weather_stations";

    # dataset_id.table_id
    table_spec = 'samples.weather_stations'

Using a TableReference

To specify a table with a TableReference, create a new TableReference using the three parts of the BigQuery table name.

    TableReference tableSpec =
        new TableReference()
            .setProjectId("clouddataflow-readonly")
            .setDatasetId("samples")
            .setTableId("weather_stations");

    from apache_beam.io.gcp.internal.clients import bigquery

    table_spec = bigquery.TableReference(
        projectId='clouddataflow-readonly',
        datasetId='samples',
        tableId='weather_stations')

The Beam SDK for Java also provides the parseTableSpec helper method, which constructs a TableReference object from a String that contains the fully-qualified BigQuery table name. However, the static factory methods for BigQueryIO transforms take the table name as a String and construct a TableReference object for you.
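
For illustration, a minimal sketch of the helper; this assumes the method lives in BigQueryHelpers, as in recent Beam releases:

    import com.google.api.services.bigquery.model.TableReference;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;

    // Parse a fully-qualified table spec string into a TableReference.
    TableReference ref =
        BigQueryHelpers.parseTableSpec("clouddataflow-readonly:samples.weather_stations");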

Table rows

BigQueryIO read and write transforms produce and consume data as a PCollection of dictionaries, where each element in the PCollection represents a single row in the table.

Schemas

When writing to BigQuery, you must supply a table schema for the destination table that you want to write to, unless you specify a create disposition of CREATE_NEVER. Creating a table schema covers schemas in more detail.

Data types

BigQuery supports the following data types: STRING, BYTES, INTEGER, FLOAT, NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME and GEOGRAPHY. All possible values are described at https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types. BigQueryIO allows you to use all of these data types. The following example shows the correct format for data types used when reading from and writing to BigQuery:

    import com.google.api.services.bigquery.model.TableRow;
    import java.math.BigDecimal;
    import java.nio.charset.StandardCharsets;
    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;
    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.Base64;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    class BigQueryTableRowCreate {
      public static TableRow createTableRow() {
        TableRow row =
            new TableRow()
                // To learn more about BigQuery data types:
                // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
                .set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
                .set("int64_field", 432)
                .set("float64_field", 3.141592653589793)
                .set("numeric_field", new BigDecimal("1234.56").toString())
                .set("bool_field", true)
                .set(
                    "bytes_field",
                    Base64.getEncoder()
                        .encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))

                // To learn more about date formatting:
                // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
                .set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
                .set(
                    "datetime_field",
                    LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
                .set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
                .set(
                    "timestamp_field",
                    Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT

                // To learn more about the geography Well-Known Text (WKT) format:
                // https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
                .set("geography_field", "POINT(30 10)")

                // An array has its mode set to REPEATED.
                .set("array_field", Arrays.asList(1, 2, 3, 4))

                // Any class can be written as a STRUCT as long as all the fields in the
                // schema are present and they are encoded correctly as BigQuery types.
                .set(
                    "struct_field",
                    Stream.of(
                            new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
                            new SimpleEntry<>("int64_value", "42"))
                        .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
        return row;
      }
    }

    bigquery_data = [{
        'string': 'abc',
        'bytes': base64.b64encode(b'\xab\xac'),
        'integer': 5,
        'float': 0.5,
        'numeric': Decimal('5'),
        'boolean': True,
        'timestamp': '2018-12-31 12:44:31.744957 UTC',
        'date': '2018-12-31',
        'time': '12:44:31',
        'datetime': '2018-12-31T12:44:31',
        'geography': 'POINT(30 10)'
    }]

As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (see https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing to BigQuery. BigQueryIO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery they are returned as base64-encoded strings.

As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (see https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing to BigQuery. BigQueryIO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery they are returned as base64-encoded bytes.

Reading from BigQuery

BigQueryIO allows you to read from a BigQuery table, or to execute a SQL query and read the results. By default, Beam invokes a BigQuery export request when you apply a BigQueryIO read transform. However, the Beam SDK for Java also supports using the BigQuery Storage API to read directly from BigQuery storage. See Using the BigQuery Storage API for more information.

Beam's use of BigQuery APIs is subject to BigQuery's Quota and Pricing policies.

The Beam SDK for Java has two BigQueryIO read methods. Both of these methods allow you to read from a table, or read fields using a query string.

  1. read(SerializableFunction) reads Avro-formatted records and uses a specified parsing function to parse them into a PCollection of custom typed objects. Each element in the PCollection represents a single row in the table. The example code for reading with a query string shows how to use read(SerializableFunction).

  2. readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the PCollection represents a single row in the table. Integer values in the TableRow objects are encoded as strings to match BigQuery's exported JSON format. This method is convenient, but can be 2-3 times slower in performance compared to read(SerializableFunction). The example code for reading from a table shows how to use readTableRows.

Note: BigQueryIO.read() is deprecated as of Beam SDK 2.2.0. Instead, use read(SerializableFunction<SchemaAndRecord, T>) to parse BigQuery rows from Avro GenericRecord into your custom type, or use readTableRows() to parse them into JSON TableRow objects.

To read from a BigQuery table using the Beam SDK for Python, use a ReadFromBigQuery transform. ReadFromBigQuery returns a PCollection of dictionaries, where each element in the PCollection represents a single row in the table. Integer values in the TableRow objects are encoded as strings to match BigQuery's exported JSON format.

Note: BigQuerySource() is deprecated as of Beam SDK 2.25.0. Before 2.25.0, to read from a BigQuery table using the Beam SDK, apply a Read transform on a BigQuerySource. For example, beam.io.Read(beam.io.BigQuerySource(table_spec)).

Reading from a table

To read an entire BigQuery table, use the from method with a BigQuery table name. This example uses readTableRows.

To read an entire BigQuery table, use the table parameter with the BigQuery table name.

The following code reads an entire table that contains weather station data and then extracts the max_temperature column.

    import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    class BigQueryReadFromTable {
      public static PCollection<MyData> readFromTable(
          String project, String dataset, String table, Pipeline pipeline) {
        // String project = "my-project-id";
        // String dataset = "my_bigquery_dataset_id";
        // String table = "my_bigquery_table_id";
        // Pipeline pipeline = Pipeline.create();

        PCollection<MyData> rows =
            pipeline
                .apply(
                    "Read from BigQuery query",
                    BigQueryIO.readTableRows()
                        .from(String.format("%s:%s.%s", project, dataset, table)))
                .apply(
                    "TableRows to MyData",
                    MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

        return rows;
      }
    }

    max_temperatures = (
        pipeline
        | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec)
        # Each row is a dictionary where the keys are the BigQuery columns
        | beam.Map(lambda elem: elem['max_temperature']))

Reading with a query string

If you don't want to read an entire table, you can supply a query string with the fromQuery method.

If you don't want to read an entire table, you can supply a query string to ReadFromBigQuery by specifying the query parameter.

The following code uses a SQL query to only read the max_temperature column.

    import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    class BigQueryReadFromQuery {
      public static PCollection<MyData> readFromQuery(
          String project, String dataset, String table, Pipeline pipeline) {
        // String project = "my-project-id";
        // String dataset = "my_bigquery_dataset_id";
        // String table = "my_bigquery_table_id";
        // Pipeline pipeline = Pipeline.create();

        PCollection<MyData> rows =
            pipeline
                .apply(
                    "Read from BigQuery query",
                    BigQueryIO.readTableRows()
                        .fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
                        .usingStandardSql())
                .apply(
                    "TableRows to MyData",
                    MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

        return rows;
      }
    }

    max_temperatures = (
        pipeline
        | 'QueryTable' >> beam.io.ReadFromBigQuery(
            query='SELECT max_temperature FROM '\
                  '[clouddataflow-readonly:samples.weather_stations]')
        # Each row is a dictionary where the keys are the BigQuery columns
        | beam.Map(lambda elem: elem['max_temperature']))

You can also use BigQuery's standard SQL dialect with a query string, as shown in the following example:

    PCollection<Double> maxTemperatures =
        p.apply(
            BigQueryIO.read(
                    (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
                .fromQuery(
                    "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
                .usingStandardSql()
                .withCoder(DoubleCoder.of()));

    max_temperatures = (
        pipeline
        | 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
            query='SELECT max_temperature FROM '\
                  '`clouddataflow-readonly.samples.weather_stations`',
            use_standard_sql=True)
        # Each row is a dictionary where the keys are the BigQuery columns
        | beam.Map(lambda elem: elem['max_temperature']))

Using the BigQuery Storage API

The BigQuery Storage API allows you to directly access tables in BigQuery storage, and supports features such as column selection and predicate filter push-down, which can allow more efficient pipeline execution.

The Beam SDK for Java supports using the BigQuery Storage API when reading from BigQuery. SDK versions before 2.25.0 support the BigQuery Storage API as an experimental feature and use the pre-GA BigQuery Storage API surface. Callers should migrate pipelines which use the BigQuery Storage API to use SDK version 2.25.0 or later.

The Beam SDK for Python does not support the BigQuery Storage API. See BEAM-10917.

Updating your code

Use the following methods when you read from a table:

  • Required: Specify withMethod(Method.DIRECT_READ) to use the BigQuery Storage API for the read operation.
  • Optional: To use features such as column projection and column filtering, you must specify withSelectedFields and withRowRestriction respectively (a brief sketch follows this list).
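
For illustration, a minimal sketch combining column projection with a row restriction; the table spec, field names, and filter here are placeholder assumptions, not part of the BigQueryTornadoes example:

    PCollection<TableRow> rows =
        pipeline.apply(
            "Read selected columns",
            BigQueryIO.readTableRows()
                .from("my-project-id:my_dataset.my_table") // placeholder table
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                // Only these columns are read from BigQuery storage.
                .withSelectedFields(Arrays.asList("string_field", "int64_field"))
                // Only rows matching this predicate are returned.
                .withRowRestriction("int64_field > 100"));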

The following code snippet reads from a table. This example is from the BigQueryTornadoes example. When the example's read method option is set to DIRECT_READ, the pipeline uses the BigQuery Storage API and column projection to read public samples of weather data from a BigQuery table. You can view the full source code on GitHub.

    import java.util.Arrays;
    import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    class BigQueryReadFromTableWithBigQueryStorageAPI {
      public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
          String project, String dataset, String table, Pipeline pipeline) {
        // String project = "my-project-id";
        // String dataset = "my_bigquery_dataset_id";
        // String table = "my_bigquery_table_id";
        // Pipeline pipeline = Pipeline.create();

        PCollection<MyData> rows =
            pipeline
                .apply(
                    "Read from BigQuery table",
                    BigQueryIO.readTableRows()
                        .from(String.format("%s:%s.%s", project, dataset, table))
                        .withMethod(Method.DIRECT_READ)
                        .withSelectedFields(
                            Arrays.asList(
                                "string_field",
                                "int64_field",
                                "float64_field",
                                "numeric_field",
                                "bool_field",
                                "bytes_field",
                                "date_field",
                                "datetime_field",
                                "time_field",
                                "timestamp_field",
                                "geography_field",
                                "array_field",
                                "struct_field")))
                .apply(
                    "TableRows to MyData",
                    MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

        return rows;
      }
    }

    # The SDK for Python does not support the BigQuery Storage API.

The following code snippet reads with a query string.

    import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    class BigQueryReadFromQueryWithBigQueryStorageAPI {
      public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
          String project, String dataset, String table, String query, Pipeline pipeline) {
        // String project = "my-project-id";
        // String dataset = "my_bigquery_dataset_id";
        // String table = "my_bigquery_table_id";
        // Pipeline pipeline = Pipeline.create();

        /*
        String query = String.format("SELECT\n" +
            "  string_field,\n" +
            "  int64_field,\n" +
            "  float64_field,\n" +
            "  numeric_field,\n" +
            "  bool_field,\n" +
            "  bytes_field,\n" +
            "  date_field,\n" +
            "  datetime_field,\n" +
            "  time_field,\n" +
            "  timestamp_field,\n" +
            "  geography_field,\n" +
            "  array_field,\n" +
            "  struct_field\n" +
            "FROM\n" +
            "  `%s:%s.%s`", project, dataset, table)
        */

        PCollection<MyData> rows =
            pipeline
                .apply(
                    "Read from BigQuery table",
                    BigQueryIO.readTableRows()
                        .fromQuery(query)
                        .usingStandardSql()
                        .withMethod(Method.DIRECT_READ))
                .apply(
                    "TableRows to MyData",
                    MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

        return rows;
      }
    }

                              # The SDK for Python does not support the BigQuery Storage API.                          

Writing to BigQuery

BigQueryIO allows you to write to BigQuery tables. If you are using the Beam SDK for Java, you can also write different rows to different tables.

BigQueryIO write transforms use APIs that are subject to BigQuery's Quota and Pricing policies.

When you use a write transform, you must provide the following information for the destination table(s):

  • The table name.
  • The destination table's create disposition. The create disposition specifies whether the destination table must exist or can be created by the write operation.
  • The destination table's write disposition. The write disposition specifies whether the data you write will replace an existing table, append rows to an existing table, or write only to an empty table.

In addition, if your write operation creates a new BigQuery table, you must also supply a table schema for the destination table.

Create disposition

The create disposition controls whether or not your BigQuery write operation should create a table if the destination table does not exist.

Use .withCreateDisposition to specify the create disposition. Valid enum values are:

  • Write.CreateDisposition.CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema with the withSchema method. CREATE_IF_NEEDED is the default behavior.

  • Write.CreateDisposition.CREATE_NEVER: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.

Use the create_disposition parameter to specify the create disposition. Valid enum values are:

  • BigQueryDisposition.CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema. CREATE_IF_NEEDED is the default behavior.

  • BigQueryDisposition.CREATE_NEVER: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.

If you specify CREATE_IF_NEEDED as the create disposition and you don't supply a table schema, the transform might fail at runtime if the destination table does not exist.

Write disposition

The write disposition controls how your BigQuery write operation applies to an existing table.

Use .withWriteDisposition to specify the write disposition. Valid enum values are:

  • Write.WriteDisposition.WRITE_EMPTY: Specifies that the write operation should fail at runtime if the destination table is not empty. WRITE_EMPTY is the default behavior.

  • Write.WriteDisposition.WRITE_TRUNCATE: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.

  • Write.WriteDisposition.WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.

Use the write_disposition parameter to specify the write disposition. Valid enum values are:

  • BigQueryDisposition.WRITE_EMPTY: Specifies that the write operation should fail at runtime if the destination table is not empty. WRITE_EMPTY is the default behavior.

  • BigQueryDisposition.WRITE_TRUNCATE: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.

  • BigQueryDisposition.WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.

When you use WRITE_EMPTY, the check for whether or not the destination table is empty can occur before the actual write operation. This check doesn't guarantee that your pipeline will have exclusive access to the table. Two concurrent pipelines that write to the same output table with a write disposition of WRITE_EMPTY might start successfully, but both pipelines can fail later when the write attempts happen.

Creating a table schema

If your BigQuery write operation creates a new table, you must provide schema information. The schema contains information about each field in the table.

To create a table schema in Java, you can either use a TableSchema object, or use a string that contains a JSON-serialized TableSchema object.

To create a table schema in Python, you can either use a TableSchema object, or use a string that defines a list of fields. Single string based schemas do not support nested fields, repeated fields, or specifying a BigQuery mode for fields (the mode will always be set to NULLABLE).

Using a TableSchema

To create and use a table schema as a TableSchema object, follow these steps.

  1. Create a list of TableFieldSchema objects. Each TableFieldSchema object represents a field in the table.

  2. Create a TableSchema object and use the setFields method to specify your list of fields.

  3. Use the withSchema method to provide your table schema when you apply a write transform.

  1. Create a TableSchema object.

  2. Create and append a TableFieldSchema object for each field in your table.

  3. Next, use the schema parameter to provide your table schema when you apply a write transform. Set the parameter's value to the TableSchema object.

The following example code shows how to create a TableSchema for a table with two fields (source and quote) of type string.

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.Arrays;

    class BigQuerySchemaCreate {
      public static TableSchema createSchema() {
        // To learn more about BigQuery schemas:
        // https://cloud.google.com/bigquery/docs/schemas
        TableSchema schema =
            new TableSchema()
                .setFields(
                    Arrays.asList(
                        new TableFieldSchema()
                            .setName("string_field")
                            .setType("STRING")
                            .setMode("REQUIRED"),
                        new TableFieldSchema()
                            .setName("int64_field")
                            .setType("INT64")
                            .setMode("NULLABLE"),
                        new TableFieldSchema()
                            .setName("float64_field")
                            .setType("FLOAT64"), // default mode is "NULLABLE"
                        new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
                        new TableFieldSchema().setName("bool_field").setType("BOOL"),
                        new TableFieldSchema().setName("bytes_field").setType("BYTES"),
                        new TableFieldSchema().setName("date_field").setType("DATE"),
                        new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
                        new TableFieldSchema().setName("time_field").setType("TIME"),
                        new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
                        new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
                        new TableFieldSchema()
                            .setName("array_field")
                            .setType("INT64")
                            .setMode("REPEATED")
                            .setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
                        new TableFieldSchema()
                            .setName("struct_field")
                            .setType("STRUCT")
                            .setDescription(
                                "A STRUCT accepts a custom data class, the fields must match the custom class fields.")
                            .setFields(
                                Arrays.asList(
                                    new TableFieldSchema().setName("string_value").setType("STRING"),
                                    new TableFieldSchema().setName("int64_value").setType("INT64")))));
        return schema;
      }
    }

    table_schema = {
        'fields': [{
            'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
        }, {
            'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
        }]
    }

Using a string

To create and use a table schema as a string that contains a JSON-serialized TableSchema object, follow these steps.

  1. Create a string that contains a JSON-serialized TableSchema object.

  2. Use the withJsonSchema method to provide your table schema when you apply a write transform.

To create and use a table schema as a string, follow these steps.

  1. Create a single comma separated string of the form "field1:type1,field2:type2,field3:type3" that defines a list of fields. The type should specify the field's BigQuery type.

  2. Use the schema parameter to provide your table schema when you apply a write transform. Set the parameter's value to the string.

The following example shows how to use a string to specify the same table schema as the previous example.

    String tableSchemaJson =
        ""
            + "{"
            + "  \"fields\": ["
            + "    {"
            + "      \"name\": \"source\","
            + "      \"type\": \"STRING\","
            + "      \"mode\": \"NULLABLE\""
            + "    },"
            + "    {"
            + "      \"name\": \"quote\","
            + "      \"type\": \"STRING\","
            + "      \"mode\": \"REQUIRED\""
            + "    }"
            + "  ]"
            + "}";
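
The JSON string can then be passed to withJsonSchema when applying the write transform. A minimal sketch, assuming rows is a PCollection<TableRow> and the table spec is a placeholder:

    rows.apply(
        "Write with JSON schema",
        BigQueryIO.writeTableRows()
            .to("my-project-id:my_dataset.quotes") // placeholder table
            .withJsonSchema(tableSchemaJson)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));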

    # column_name:BIGQUERY_TYPE, ...
    table_schema = 'source:STRING, quote:STRING'

Setting the insertion method

BigQueryIO supports two methods of inserting data into BigQuery: load jobs and streaming inserts. Each insertion method provides different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for different data ingestion options (specifically, load jobs and streaming inserts) for more information about these tradeoffs.

BigQueryIO chooses a default insertion method based on the input PCollection. You can use withMethod to specify the desired insertion method. See Write.Method for the list of the available methods and their restrictions.

BigQueryIO chooses a default insertion method based on the input PCollection. You can use method to specify the desired insertion method. See WriteToBigQuery for the list of the available methods and their restrictions.

BigQueryIO uses load jobs in the following situations:

  • When you apply a BigQueryIO write transform to a bounded PCollection.
  • When you specify load jobs as the insertion method using BigQueryIO.write().withMethod(FILE_LOADS).
  • When you apply a BigQueryIO write transform to a bounded PCollection.
  • When you specify load jobs as the insertion method using WriteToBigQuery(method='FILE_LOADS').

Note: If you use batch loads in a streaming pipeline:

You must use withTriggeringFrequency to specify a triggering frequency for initiating load jobs. Be careful about setting the frequency such that your pipeline doesn't exceed the BigQuery load job quota limit.

You can either use withNumFileShards to explicitly set the number of file shards written, or use withAutoSharding to enable dynamic sharding (starting with the 2.29.0 release), in which case the number of shards may be determined and changed at runtime. The sharding behavior depends on the runners. A brief sketch of this configuration follows below.
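
Putting these options together, a minimal sketch of batch loads in a streaming pipeline; the table spec, schema, and five-minute frequency are placeholder assumptions:

    rows.apply(
        "Write via load jobs",
        BigQueryIO.writeTableRows()
            .to("my-project-id:my_dataset.my_table") // placeholder table
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            // Start a load job roughly every five minutes; keep this below the load job quota.
            .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(5))
            // Let the runner determine and adjust the number of file shards (Beam 2.29.0+).
            .withAutoSharding());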

You must use triggering_frequency to specify a triggering frequency for initiating load jobs. Be careful about setting the frequency such that your pipeline doesn't exceed the BigQuery load job quota limit.

You can set with_auto_sharding=True to enable dynamic sharding (starting with the 2.29.0 release). The number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.

BigQueryIO uses streaming inserts in the following situations:

  • When you apply a BigQueryIO write transform to an unbounded PCollection.
  • When you specify streaming inserts as the insertion method using BigQueryIO.write().withMethod(STREAMING_INSERTS).
  • When you apply a BigQueryIO write transform to an unbounded PCollection.
  • When you specify streaming inserts as the insertion method using WriteToBigQuery(method='STREAMING_INSERTS').

Note: Streaming inserts by default enables BigQuery best-effort deduplication mechanism. You can disable that by setting ignoreInsertIds. The quota limitations are different when deduplication is enabled vs. disabled.

Streaming inserts applies a default sharding for each table destination. You can use withAutoSharding (starting with the 2.28.0 release) to enable dynamic sharding, and the number of shards may be determined and changed at runtime. The sharding behavior depends on the runners. A brief sketch follows below.
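
A minimal sketch of these streaming-insert options; the table spec and schema are placeholder assumptions:

    rows.apply(
        "Write via streaming inserts",
        BigQueryIO.writeTableRows()
            .to("my-project-id:my_dataset.my_table") // placeholder table
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            // Disable BigQuery's best-effort deduplication.
            .ignoreInsertIds()
            // Let the runner determine and adjust the number of shards (Beam 2.28.0+).
            .withAutoSharding());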

Note: Streaming inserts by default enables BigQuery best-effort deduplication mechanism. You can disable that by setting ignore_insert_ids=True. The quota limitations are different when deduplication is enabled vs. disabled.

Streaming inserts applies a default sharding for each table destination. You can set with_auto_sharding=True (starting with the 2.29.0 release) to enable dynamic sharding. The number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.

Writing to a table

To write to a BigQuery table, apply either a writeTableRows or write transform.

To write to a BigQuery table, apply the WriteToBigQuery transform. WriteToBigQuery supports both batch mode and streaming mode. You must apply the transform to a PCollection of dictionaries. In general, you'll need to use another transform, such as ParDo, to format your output data into a collection.

The following examples use this PCollection that contains quotes.

The writeTableRows method writes a PCollection of BigQuery TableRow objects to a BigQuery table. Each element in the PCollection represents a single row in the table. This example uses writeTableRows to write elements to a PCollection<TableRow>. The write operation creates a table if needed; if the table already exists, it will be replaced.

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.values.PCollection;

    class BigQueryWriteToTable {
      public static void writeToTable(
          String project,
          String dataset,
          String table,
          TableSchema schema,
          PCollection<TableRow> rows) {
        // String project = "my-project-id";
        // String dataset = "my_bigquery_dataset_id";
        // String table = "my_bigquery_table_id";
        // TableSchema schema = new TableSchema().setFields(Arrays.asList(...));
        // Pipeline pipeline = Pipeline.create();
        // PCollection<TableRow> rows = ...

        rows.apply(
            "Write to BigQuery",
            BigQueryIO.writeTableRows()
                .to(String.format("%s:%s.%s", project, dataset, table))
                .withSchema(schema)
                // For CreateDisposition:
                // - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
                //   required
                // - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                // For WriteDisposition:
                // - WRITE_EMPTY (default): raises an error if the table is not empty
                // - WRITE_APPEND: appends new rows to existing rows
                // - WRITE_TRUNCATE: deletes the existing rows before writing
                .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

        // pipeline.run().waitUntilFinish();
      }
    }

quotes = pipeline | beam.Create([
    {
        'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'
    },
    {
        'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."
    },
])

The following example code shows how to apply a WriteToBigQuery transform to write a PCollection of dictionaries to a BigQuery table. The write operation creates the table if needed; if the table already exists, it will be replaced.

quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

The write transform writes a PCollection of custom typed objects to a BigQuery table. Use .withFormatFunction(SerializableFunction) to provide a formatting function that converts each input element in the PCollection into a TableRow. This example uses write to write a PCollection of custom Quote objects. The write operation creates the table if needed; if the table already exists, it will be replaced.

quotes.apply(
    BigQueryIO.<Quote>write()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withFormatFunction(
            (Quote elem) ->
                new TableRow().set("source", elem.source).set("quote", elem.quote))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
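
The Quote class is not defined in this excerpt. A minimal sketch of what it might look like, with field names assumed from the format function above (AvroCoder and DefaultCoder come from org.apache.beam.sdk.coders):

// Hypothetical value type for the example above; AvroCoder infers a schema by reflection.
@DefaultCoder(AvroCoder.class)
static class Quote {
  final String source;
  final String quote;

  public Quote() {
    this.source = "";
    this.quote = "";
  }

  public Quote(String source, String quote) {
    this.source = source;
    this.quote = quote;
  }
}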

When you use streaming inserts, you can decide what to do with failed records. You can either keep retrying, or return the failed records in a separate PCollection using the WriteResult.getFailedInserts() method.
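
For example, a minimal sketch of capturing failed inserts (assuming rows, tableSpec, and tableSchema are defined as in the earlier examples, and that WriteResult and InsertRetryPolicy are imported from org.apache.beam.sdk.io.gcp.bigquery):

// Write with streaming inserts and keep permanently failed rows for later inspection.
WriteResult result =
    rows.apply(
        "Write with streaming inserts",
        BigQueryIO.writeTableRows()
            .to(tableSpec)
            .withSchema(tableSchema)
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            // Retry only errors that are likely to be transient; rows that still fail
            // are returned through getFailedInserts() instead of blocking the pipeline.
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));

// Failed rows come back as TableRow elements that can be logged or written elsewhere.
PCollection<TableRow> failedInserts = result.getFailedInserts();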

Using dynamic destinations

You can use the dynamic destinations feature to write elements in a PCollection to different BigQuery tables, possibly with different schemas.

The dynamic destinations feature groups your user type by a user-defined destination key, uses the key to compute a destination table and/or schema, and writes each group's elements to the computed destination.

In addition, you can also write your own types that have a mapping function to TableRow, and you can use side inputs in all DynamicDestinations methods.

To use dynamic destinations, you must create a DynamicDestinations object and implement the following methods:

  • getDestination: Returns an object that getTable and getSchema can use as the destination key to compute the destination table and/or schema.

  • getTable: Returns the table (as a TableDestination object) for the destination key. This method must return a unique table for each unique destination.

  • getSchema: Returns the table schema (as a TableSchema object) for the destination key.

Then, use write().to with your DynamicDestinations object. This example uses a PCollection that contains weather data and writes the data into a different table for each year.

/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
  final long year;
  final long month;
  final long day;
  final double maxTemp;

  public WeatherData() {
    this.year = 0;
    this.month = 0;
    this.day = 0;
    this.maxTemp = 0.0f;
  }

  public WeatherData(long year, long month, long day, double maxTemp) {
    this.year = year;
    this.month = month;
    this.day = day;
    this.maxTemp = maxTemp;
  }
}
*/

PCollection<WeatherData> weatherData =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  GenericRecord record = elem.getRecord();
                  return new WeatherData(
                      (Long) record.get("year"),
                      (Long) record.get("month"),
                      (Long) record.get("day"),
                      (Double) record.get("max_temperature"));
                })
            .fromQuery(
                "SELECT year, month, day, max_temperature "
                    + "FROM [clouddataflow-readonly:samples.weather_stations] "
                    + "WHERE year BETWEEN 2007 AND 2009")
            .withCoder(AvroCoder.of(WeatherData.class)));

// We will send the weather data into different tables for every year.
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(
            new DynamicDestinations<WeatherData, Long>() {
              @Override
              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
                return elem.getValue().year;
              }

              @Override
              public TableDestination getTable(Long destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId(writeProject)
                        .setDatasetId(writeDataset)
                        .setTableId(writeTable + "_" + destination),
                    "Table for year " + destination);
              }

              @Override
              public TableSchema getSchema(Long destination) {
                return new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema()
                                .setName("year")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("month")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("day")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("maxTemp")
                                .setType("FLOAT")
                                .setMode("NULLABLE")));
              }
            })
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
                                                  ('Obi Wan Kenobi', True)]))

def table_fn(element, fictional_characters):
  if element in fictional_characters:
    return 'my_dataset.fictional_quotes'
  else:
    return 'my_dataset.real_quotes'

quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
    table_fn,
    schema=table_schema,
    table_side_inputs=(fictional_characters_view, ),
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

Using time partitioning

BigQuery time partitioning divides your table into smaller partitions; the result is called a partitioned table. Partitioned tables make it easier for you to manage and query your data.

To use BigQuery time partitioning, use one of these two methods:

  • withTimePartitioning: This method takes a TimePartitioning class, and is only usable if you are writing to a single table.

  • withJsonTimePartitioning: This method is the same as withTimePartitioning, but takes a JSON-serialized String object.

This example generates one partition per day.

weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(tableSpec + "_partitioning")
        .withSchema(tableSchema)
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        // NOTE: an existing table without time partitioning set up will not work
        .withTimePartitioning(new TimePartitioning().setType("DAY"))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

quotes | 'WriteWithTimePartitioning' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    additional_bq_parameters={'timePartitioning': {'type': 'HOUR'}})
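
The withJsonTimePartitioning variant is not demonstrated above. The following is a minimal sketch, under the assumption that it accepts a ValueProvider<String> holding the JSON-serialized TimePartitioning (StaticValueProvider is from org.apache.beam.sdk.options.ValueProvider); check the BigQueryIO Javadoc for the exact signature. The "_json_partitioning" table suffix is purely illustrative.

weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(tableSpec + "_json_partitioning")  // hypothetical destination table
        .withSchema(tableSchema)
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        // Same effect as withTimePartitioning(new TimePartitioning().setType("DAY")),
        // but expressed as a JSON string (assumed to be wrapped in a ValueProvider).
        .withJsonTimePartitioning(StaticValueProvider.of("{\"type\": \"DAY\"}"))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));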

Limitations

BigQueryIO currently has the following limitations.

  1. You can't sequence the completion of a BigQuery write with other steps of your pipeline.

  2. If you are using the Beam SDK for Python, you might have import size quota issues if you write a very large dataset. As a workaround, you can partition the dataset (for example, using Beam's Partition transform) and write to multiple BigQuery tables. The Beam SDK for Java does not have this limitation, as it partitions your dataset for you.

Additional examples

You can find additional examples that use BigQuery in Beam's examples directories.

Java cookbook examples

These examples are from the Java cookbook examples directory.

  • BigQueryTornadoes reads the public samples of weather data from BigQuery, counts the number of tornadoes that occur in each month, and writes the results to a BigQuery table.

  • CombinePerKeyExamples reads the public Shakespeare data from BigQuery, and for each word in the dataset that exceeds a given length, generates a string containing the list of play names in which that word appears. The pipeline then writes the results to a BigQuery table.

  • FilterExamples reads public samples of weather data from BigQuery, performs a projection on the data, finds the global mean of the temperature readings, filters on readings for a single given month, and outputs only data (for that month) that has a mean temperature smaller than the derived global mean.

  • JoinExamples reads a sample of the GDELT "world event" data from BigQuery and joins the event action country code against a table that maps country codes to country names.

  • MaxPerKeyExamples reads the public samples of weather data from BigQuery, finds the maximum temperature for each month, and writes the results to a BigQuery table.

  • TriggerExample performs a streaming analysis of traffic data from San Diego freeways. The pipeline looks at the data coming in from a text file and writes the results to a BigQuery table.

Java complete examples

These examples are from the Java complete examples directory.

  • AutoComplete computes the most popular hash tags for every prefix, which can be used for auto-completion. The pipeline can optionally write the results to a BigQuery table.

  • StreamingWordExtract reads lines of text, splits each line into individual words, capitalizes those words, and writes the output to a BigQuery table.

  • TrafficMaxLaneFlow reads traffic sensor data, finds the lane that had the highest recorded flow, and writes the results to a BigQuery table.

  • TrafficRoutes reads traffic sensor data, calculates the average speed for each window and looks for slowdowns in routes, and writes the results to a BigQuery table.

Python cookbook examples

These examples are from the Python cookbook examples directory.

  • BigQuery schema creates a TableSchema with nested and repeated fields, generates data with nested and repeated fields, and writes the data to a BigQuery table.

  • BigQuery side inputs uses BigQuery sources as side inputs. It illustrates how to insert side inputs into transforms in three different forms: as a singleton, as an iterator, and as a list.

  • BigQuery tornadoes reads from a BigQuery table that has the 'month' and 'tornado' fields as part of the table schema, computes the number of tornadoes in each month, and outputs the results to a BigQuery table.

  • BigQuery filters reads weather station information from a BigQuery table, manipulates BigQuery rows in memory, and writes the results to a BigQuery table.

Source: https://beam.apache.org/documentation/io/built-in/google-bigquery/
