Skip to content
This repository was archived by the owner on Jun 30, 2022. It is now read-only.

Commit b2816c6

Browse files
chamikaramjaaltay
authored andcommitted
Several fixes related to schema specified when creating a BigQuery sink.
(1) Fixes a bug that prevented repeated fields working properly for DataflowRunner. (2) Updates documentation of 'schema' parameter. Removes text that says that schema can be specified as a dictionary since we do not actually support that. Clarifies the limitations of specifying specifying a schema as a string and recommends using 'bigquery.TableSchema' if the schema is complicated. (3) Adds a cookbook example that demonstrates how to build a 'bigquery.TableSchema' object with nested and repeated tables and how to write to a table created using that schema. ----Release Notes---- Fixes a bug that required type of record fields to be specified in all caps when using DataflowRunner. [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=118003947
1 parent c4e9656 commit b2816c6

2 files changed

Lines changed: 135 additions & 4 deletions

File tree

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Copyright 2016 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""A workflow that writes to a BigQuery table with nested and repeated fields.
16+
17+
Demonstrates how to build a bigquery.TableSchema object with nested and repeated
18+
fields. Also, shows how to generate data to be written to a BigQuery table with
19+
nested and repeated fields.
20+
"""
21+
22+
from __future__ import absolute_import
23+
24+
import argparse
25+
import logging
26+
27+
import google.cloud.dataflow as df
28+
29+
30+
def run(argv=None):
31+
"""Run the workflow."""
32+
parser = argparse.ArgumentParser()
33+
34+
parser.add_argument(
35+
'--output',
36+
required=True,
37+
help=
38+
('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
39+
'or DATASET.TABLE.'))
40+
known_args, pipeline_args = parser.parse_known_args(argv)
41+
42+
p = df.Pipeline(argv=pipeline_args)
43+
44+
from apitools.clients import bigquery # pylint: disable=g-import-not-at-top
45+
46+
table_schema = bigquery.TableSchema()
47+
48+
# Fields that use standard types.
49+
kind_schema = bigquery.TableFieldSchema()
50+
kind_schema.name = 'kind'
51+
kind_schema.type = 'string'
52+
kind_schema.mode = 'nullable'
53+
table_schema.fields.append(kind_schema)
54+
55+
full_name_schema = bigquery.TableFieldSchema()
56+
full_name_schema.name = 'fullName'
57+
full_name_schema.type = 'string'
58+
full_name_schema.mode = 'required'
59+
table_schema.fields.append(full_name_schema)
60+
61+
age_schema = bigquery.TableFieldSchema()
62+
age_schema.name = 'age'
63+
age_schema.type = 'integer'
64+
age_schema.mode = 'nullable'
65+
table_schema.fields.append(age_schema)
66+
67+
gender_schema = bigquery.TableFieldSchema()
68+
gender_schema.name = 'gender'
69+
gender_schema.type = 'string'
70+
gender_schema.mode = 'nullable'
71+
table_schema.fields.append(gender_schema)
72+
73+
# A nested field
74+
phone_number_schema = bigquery.TableFieldSchema()
75+
phone_number_schema.name = 'phoneNumber'
76+
phone_number_schema.type = 'record'
77+
phone_number_schema.mode = 'nullable'
78+
79+
area_code = bigquery.TableFieldSchema()
80+
area_code.name = 'areaCode'
81+
area_code.type = 'integer'
82+
area_code.mode = 'nullable'
83+
phone_number_schema.fields.append(area_code)
84+
85+
number = bigquery.TableFieldSchema()
86+
number.name = 'number'
87+
number.type = 'integer'
88+
number.mode = 'nullable'
89+
phone_number_schema.fields.append(number)
90+
table_schema.fields.append(phone_number_schema)
91+
92+
# A repeated field.
93+
children_schema = bigquery.TableFieldSchema()
94+
children_schema.name = 'children'
95+
children_schema.type = 'string'
96+
children_schema.mode = 'repeated'
97+
table_schema.fields.append(children_schema)
98+
99+
def create_random_record(record_id):
100+
return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id,
101+
'age': int(record_id) * 10, 'gender': 'male',
102+
'phoneNumber': {
103+
'areaCode': int(record_id) * 100,
104+
'number': int(record_id) * 100000},
105+
'children': ['child' + record_id + '1',
106+
'child' + record_id + '2',
107+
'child' + record_id + '3']
108+
}
109+
110+
# pylint: disable=expression-not-assigned
111+
record_ids = p | df.Create('CreateIDs', ['1', '2', '3', '4', '5'])
112+
records = record_ids | df.Map('CreateRecords', create_random_record)
113+
records | df.io.Write(
114+
'write',
115+
df.io.BigQuerySink(
116+
known_args.output,
117+
schema=table_schema,
118+
create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
119+
write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE))
120+
121+
# Run the pipeline (all operations are deferred until run() is called).
122+
p.run()
123+
124+
125+
if __name__ == '__main__':
126+
logging.getLogger().setLevel(logging.INFO)
127+
run()

google/cloud/dataflow/io/bigquery.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,9 +347,13 @@ def __init__(self, table, dataset=None, project=None, schema=None,
347347
reference is specified entirely by the table argument.
348348
project: The ID of the project containing this table or null if the table
349349
reference is specified entirely by the table argument.
350-
schema: A bigquery.TableSchema instance or a dictionary associating field
351-
names with types. Possible types are: STRING, INTEGER, FLOAT, BOOLEAN,
352-
TIMESTAMP, RECORD (e.g., {'month': 'STRING', 'count': 'INTEGER'}).
350+
schema: The schema to be used if the BigQuery table to write has to be
351+
created. This can be either specified as a 'bigquery.TableSchema' object
352+
or a single string of the form 'field1:type1,field2:type2,field3:type3'
353+
that defines a comma separated list of fields. Here 'type' should
354+
specify the BigQuery type of the field. Single string based schemas do
355+
not support nested fields, repeated fields, or specifying a BigQuery
356+
mode for fields (mode will always be set to 'NULLABLE').
353357
create_disposition: A string describing what happens if the table does not
354358
exist. Possible values are:
355359
- BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
@@ -417,7 +421,7 @@ def schema_list_as_object(schema_list):
417421
fs['description'] = f.description
418422
if f.mode is not None:
419423
fs['mode'] = f.mode
420-
if f.type == 'RECORD':
424+
if f.type.lower() == 'record':
421425
fs['fields'] = schema_list_as_object(f.fields)
422426
fields.append(fs)
423427
return fields

0 commit comments

Comments
 (0)