1+ """Programatically create a BigQuery table from a CSV."""
12from google .cloud import storage
23from google .cloud import bigquery
4+ from config import bucketURI , bucketName , bigqueryDataset , bigqueryTable , localDataFile , destinationBlobName
35import pprint
46
5- bucket_uri = 'gs://your-bucket/'
6- bucket_name = 'your-bucket'
7- bucket_target = 'datasets/data_upload.csv'
8- local_dataset = 'data/test.csv'
9- bucket_target_uri = bucket_uri + bucket_target
10- bigquery_dataset = 'uploadtest'
11- bigquery_table = 'my_table'
127
13-
14- def upload_blob (bucket_name , source_file_name , destination_blob_name ):
15- """Upload a CSV to Google Cloud Storage.
16-
17- 1. Retrieve the target bucket.
18- 2. Set destination of data to be uploaded.
19- 3. Upload local CSV.
20- """
8+ def storage_upload_blob (bucketName , source_file_name , destinationBlobName ):
9+ """Upload a CSV to Google Cloud Storage."""
2110 storage_client = storage .Client ()
22- bucket = storage_client .get_bucket (bucket_name )
23- blob = bucket .blob (destination_blob_name )
24- # Commence Upload
11+ bucket = storage_client .get_bucket (bucketName )
12+ blob = bucket .blob (destinationBlobName )
2513 blob .upload_from_filename (source_file_name )
26- print ('File {} uploaded to {}.' .format (
27- source_file_name ,
28- destination_blob_name ))
14+ return 'File {} uploaded to {}.' .format (source_file_name ,
15+ destinationBlobName )
2916
3017
31- def insert_bigquery (target_uri , dataset_id , table_id ):
32- """Insert CSV from Google Storage to BigQuery Table.
33-
34- 1. Specify target dataset within BigQuery.
35- 2. Create a Job configuration.
36- 3. Specify that we are autodetecting datatypes.
37- 4. Reserve row #1 for headers.
38- 5. Specify the source format of the file (defaults to CSV).
39- 6. Pass the URI of the data storage on Google Cloud Storage from.
40- 7. Load BigQuery Job.
41- 8. Execute BigQuery Job.
42- """
18+ def bigquery_insert_data (bucketURI , destinationBlobName , dataset_id , table_id ):
19+ """Insert CSV from Google Storage to BigQuery Table."""
20+ target = bucketURI + destinationBlobName
4321 bigquery_client = bigquery .Client ()
4422 dataset_ref = bigquery_client .dataset (dataset_id )
4523 job_config = bigquery .LoadJobConfig ()
4624 job_config .autodetect = True
4725 job_config .skip_leading_rows = 1
4826 job_config .source_format = bigquery .SourceFormat .CSV
49- uri = target_uri
50- load_job = bigquery_client .load_table_from_uri (
51- uri ,
52- dataset_ref .table (table_id ),
53- job_config = job_config ) # API request
27+ load_job = bigquery_client .load_table_from_uri (target ,
28+ dataset_ref .table (table_id ),
29+ job_config = job_config )
5430 print ('Starting job {}' .format (load_job .job_id ))
55- # Waits for table load to complete.
56- load_job .result ()
57- print ('Job finished.' )
58-
31+ load_job .result () # Waits for table load to complete.
32+ return 'Job finished.'
5933
60- def get_schema (dataset_id , table_id ):
61- """Get BigQuery Table Schema.
6234
63- 1. Specify target dataset within BigQuery.
64- 2. Specify target table within given dataset.
65- 3. Create Table class instance from existing BigQuery Table.
66- 4. Print results to console.
67- 5. Return the schema dict.
68- """
35+ def get_table_schema (dataset_id , table_id ):
36+ """Get BigQuery Table Schema."""
6937 bigquery_client = bigquery .Client ()
7038 dataset_ref = bigquery_client .dataset (dataset_id )
7139 bg_tableref = bigquery .table .TableReference (dataset_ref , table_id )
@@ -76,6 +44,12 @@ def get_schema(dataset_id, table_id):
7644 return bg_table .schema
7745
7846
79- upload_blob (bucket_name , local_dataset , bucket_target )
80- insert_bigquery (bucket_target_uri , bigquery_dataset , bigquery_table )
81- bigquery_table_schema = get_schema (bigquery_dataset , bigquery_table )
47+ storage_upload_blob (bucketName ,
48+ localDataFile ,
49+ destinationBlobName )
50+ bigquery_insert_data (bucketURI ,
51+ destinationBlobName ,
52+ bigqueryDataset ,
53+ bigqueryTable )
54+ bigqueryTableSchema = get_table_schema (bigqueryDataset ,
55+ bigqueryTable )
0 commit comments