|
| 1 | +from loguru import logger |
1 | 2 | from biquery_sql_etl.engines import bigquery_engine, rdbms_engine |
2 | 3 | from biquery_sql_etl.queries import sql_queries |
3 | 4 | from biquery_sql_etl.client import DataClient |
4 | | -from loguru import logger |
5 | 5 | from config import bigquery_table |
6 | 6 |
|
7 | 7 |
|
8 | 8 | logger.add('logs/queries.log', format="{time} {message}", level="INFO") |
9 | 9 |
|
10 | 10 |
|
11 | | -def main(): |
12 | | - """Move data between sources.""" |
13 | | - bqc = DataClient(bigquery_engine) |
14 | | - dbc = DataClient(rdbms_engine) |
def init_pipeline():
    """Migrate every configured query's result set from BigQuery into MySQL.

    Iterates the query map, fetching rows from BigQuery and bulk-inserting
    them (with replace) into the identically-named MySQL table, logging the
    per-table insert result and a final row-count summary.
    """
    migrated = 0
    bq_client, db_client = data_sources()
    for destination, statement in sql_queries.items():
        rows = bq_client.fetch_rows(statement, table=bigquery_table)
        insert_result = db_client.insert_rows(rows, destination, replace=True)
        logger.info(insert_result)
        # NOTE(review): the running total counts fetched rows, not the
        # insert result — presumably equivalent; confirm against DataClient.
        migrated += len(rows)
    logger.info(f"Completed migration of {migrated} rows from BigQuery to MySQL.")
| 21 | + |
| 22 | + |
def data_sources():
    """Construct the two datasource clients.

    Returns:
        A ``(bigquery_client, rdbms_client)`` pair of ``DataClient`` instances.
    """
    return DataClient(bigquery_engine), DataClient(rdbms_engine)
0 commit comments