Skip to content

Commit 173a656

Browse files
committed
database flavor agnostic, bidirectional ETL
1 parent de5d80f commit 173a656

11 files changed

Lines changed: 182 additions & 61 deletions

File tree

.gitignore

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,21 @@ venv.bak/
103103
# mypy
104104
.mypy_cache/
105105

106-
106+
# Google Cloud Creds
107107
HackersAndSlackers-d2a47db89384.json
108108
HackersAndSlackers-d2a47db89384.json
109109
HackersAndSlackers-4893023543f3.json
110110
HackersAndSlackers-59ed81beb2ea.json
111111
HackersAndSlackers-aec129ee8154.json
112-
.env
112+
113+
# .DS_Store
113114
.DS_Store
115+
116+
# Pycharm
117+
.idea
118+
.idea/.gitignore
119+
.idea/bigquery-to-mysql.iml
120+
.idea/inspectionProfiles/
121+
.idea/misc.xml
122+
.idea/modules.xml
123+
.idea/vcs.xml

Pipfile

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ verify_ssl = true
66
[dev-packages]
77

88
[packages]
9-
google-cloud-bigquery = "*"
10-
sqlalchemy = "*"
11-
pybigquery = "*"
12-
pymysql = "*"
9+
Google-Cloud-BigQuery = "*"
10+
SQLAlchemy = "*"
11+
PyBigQuery = "*"
12+
PyMySQL = "*"
13+
Psycopg2-Binary = "*"
1314

1415
[requires]
1516
python_version = "3.7"

Pipfile.lock

Lines changed: 44 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
![PyBigQuery](https://img.shields.io/badge/PyBigQuery-v0.4.11-blue.svg?logo=Google&longCache=true&logoColor=white&colorB=5e81ac&style=flat-square&colorA=4c566a)
66
![SQLAlchemy](https://img.shields.io/badge/SQLAlchemy-v1.3.1-red.svg?longCache=true&style=flat-square&logo=scala&logoColor=white&colorA=4c566a&colorB=bf616a)
77
![PyMySQL](https://img.shields.io/badge/PyMySQL-v0.9.3-red.svg?longCache=true&style=flat-square&logo=mysql&logoColor=white&colorA=4c566a&colorB=bf616a)
8+
![Psycopg2-Binary](https://img.shields.io/badge/Psycopg2--Binary-v2.8.4-red.svg?longCache=true&style=flat-square&logo=postgresql&logoColor=white&colorA=4c566a&colorB=bf616a)
89
![GitHub Last Commit](https://img.shields.io/github/last-commit/google/skia.svg?style=flat-square&colorA=4c566a&colorB=a3be8c)
910
[![GitHub Issues](https://img.shields.io/github/issues/toddbirchard/bigquery-to-sql.svg?style=flat-square&colorA=4c566a&colorB=ebcb8b)](https://github.com/hackersandslackers/bigquery-python-tutorial/issues)
1011
[![GitHub Stars](https://img.shields.io/github/stars/toddbirchard/bigquery-to-sql.svg?style=flat-square&colorB=ebcb8b&colorA=4c566a)](https://github.com/hackersandslackers/bigquery-python-tutorial/stargazers)
1112
[![GitHub Forks](https://img.shields.io/github/forks/toddbirchard/bigquery-to-sql.svg?style=flat-square&colorA=4c566a&colorB=ebcb8b)](https://github.com/hackersandslackers/bigquery-python-tutorial/network)
1213

13-
Lightweight ETL script to migrate data from BigQuery to SQL.
14+
Lightweight ETL script to migrate data from BigQuery to a SQL database, and vice versa.

config.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,27 @@ class Config:
1111
# Google BigQuery
1212
bigquery_dataset = environ.get('GCP_BIGQUERY_DATASET')
1313
bigquery_table = environ.get('GCP_BIGQUERY_TABLE')
14-
bigquery_uri = f'bigquery://{gcp_project}/{bigquery_dataset}'
15-
bigquery_query = environ.get('GCP_BIGQUERY_QUERY')
14+
bigquery_query = 'queries/analytics.sql'
1615

17-
# MySQL
18-
mysql_username = environ.get('MYSQL_USERNAME')
19-
mysql_password = environ.get('MYSQL_PASSWORD')
20-
mysql_host = environ.get('MYSQL_HOST')
21-
mysql_port = environ.get('MYSQL_PORT')
22-
mysql_db = environ.get('MYSQL_DATABASE_NAME')
23-
mysql_table = environ.get('MYSQL_TABLE_NAME')
24-
mysql_uri = f'mysql+pymysql://{mysql_username}:{mysql_password}@{mysql_host}/{mysql_db}'
16+
# SQL
17+
database_username = environ.get('DATABASE_USERNAME')
18+
database_password = environ.get('DATABASE_PASSWORD')
19+
database_host = environ.get('DATABASE_HOST')
20+
database_port = environ.get('DATABASE_PORT')
21+
database_db = environ.get('DATABASE_DATABASE_NAME')
22+
database_table = environ.get('DATABASE_TABLE_NAME')
23+
24+
@classmethod
25+
def get_bigquery_config(cls):
    """Return the BigQuery connection settings.

    :returns: Tuple of ``(bigquery_uri, gcp_credentials)``, where the URI is
        built from the configured GCP project and BigQuery dataset.
    """
    return (
        f'bigquery://{cls.gcp_project}/{cls.bigquery_dataset}',
        cls.gcp_credentials,
    )
28+
29+
@classmethod
30+
def get_database_config(cls, type=None):
    """Build the SQLAlchemy connection URI for the target SQL database.

    :param type: Database flavor, case-insensitive. Supported values are
        ``'mysql'``, ``'postgres'`` and ``'postgresql'``.
    :returns: Tuple of ``(database_uri, database_table)``.
    :raises ValueError: If ``type`` is missing or is not a supported flavor.
    """
    # NOTE: ``type`` shadows the builtin, but callers pass it by keyword
    # (``type='mysql'``), so the parameter name is kept for compatibility.
    drivers = {
        'mysql': 'mysql+pymysql',
        'postgres': 'postgresql+psycopg2',
        'postgresql': 'postgresql+psycopg2',
    }
    flavor = drivers.get(type.strip().lower()) if isinstance(type, str) else None
    if flavor is None:
        # Fail loudly instead of crashing on ``None.lower()`` or silently
        # producing an unusable ``None://...`` URI.
        raise ValueError(
            f'Unsupported database type {type!r}; '
            f'expected one of: {", ".join(sorted(drivers))}'
        )
    # Only include the port when one is configured, so a missing
    # DATABASE_PORT does not yield an invalid ``host:None`` authority.
    host = cls.database_host
    if cls.database_port:
        host = f'{host}:{cls.database_port}'
    database_uri = (
        f'{flavor}://{cls.database_username}:{cls.database_password}'
        f'@{host}/{cls.database_db}'
    )
    return database_uri, cls.database_table

queries/analytics.sql

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
SELECT
2+
REPLACE(REPLACE(REPLACE(title, ' | Hackers and Slackers', ''), ' | Hackers And Slackers', ''), '- Hackers and Slackers', '') as title,
3+
url,
4+
REPLACE(REPLACE(url, 'https://hackersandslackers.com/', ''), '/' , '') as slug,
5+
COUNT(title) AS views
6+
FROM
7+
hackersgatsbyprod.pages
8+
WHERE
9+
timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
10+
GROUP BY
11+
title,
12+
url
13+
ORDER BY
14+
COUNT(title) DESC
15+
LIMIT
16+
100;

requirements.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ google-resumable-media==0.4.1
1010
googleapis-common-protos==1.6.0
1111
idna==2.8
1212
protobuf==3.10.0
13+
psycopg2-binary==2.8.4
1314
pyasn1==0.4.7
1415
pyasn1-modules==0.2.7
1516
pybigquery==0.4.11
@@ -18,5 +19,5 @@ pytz==2019.3
1819
requests==2.22.0
1920
rsa==4.0
2021
six==1.13.0
21-
SQLAlchemy==1.3.10
22-
urllib3==1.25.6
22+
SQLAlchemy==1.3.11
23+
urllib3==1.25.7

src/__init__.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,27 @@
1-
import logging
2-
from .bigquery import BigQueryClient
3-
from .database import MySQLClient
41
from config import Config
5-
6-
logging.basicConfig(format='%(asctime)s \n%(message)s',
7-
level=logging.INFO)
2+
from src.bigquery import BigQueryClient
3+
from src.database import SQLClient
4+
from src.util.read import get_query_from_file
85

96

107
def main():
11-
bigquery = BigQueryClient(Config)
12-
mysql = MySQLClient(Config)
13-
bq_rows = bigquery.fetch_bigquery_rows(Config.bigquery_query)
14-
updated = mysql.insert_rows(bq_rows)
15-
logging.info(f'Top posts this week: {updated}')
8+
bigquery = init_bigquery()
9+
database = init_database()
10+
query = get_query_from_file(Config.bigquery_query)
11+
rows = bigquery.fetch_rows(query)
12+
updated = database.insert_rows(rows, replace=True)
13+
print(updated)
14+
15+
16+
def init_bigquery():
    """Create a BigQueryClient from the project configuration."""
    bigquery_uri, credentials = Config.get_bigquery_config()
    return BigQueryClient(uri=bigquery_uri, creds=credentials)
21+
22+
23+
def init_database():
    """Create a SQLClient for the MySQL flavor from the project configuration."""
    database_uri, table_name = Config.get_database_config(type='mysql')
    return SQLClient(uri=database_uri, table=table_name)

src/bigquery.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,30 @@
1-
"""BigQuery data source."""
1+
"""BigQuery Client."""
2+
import logging
23
from sqlalchemy.engine import create_engine
3-
from sqlalchemy import MetaData
4+
5+
6+
logging.basicConfig(filename='logs/bigquery.log',
7+
format='%(asctime)s %(message)s')
8+
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
49

510

611
class BigQueryClient:
712

8-
def __init__(self, Config):
9-
self.metadata = MetaData()
10-
self.uri = Config.bigquery_uri
11-
self.credentials = Config.gcp_credentials
13+
def __init__(self, uri=None, table=None, creds=None):
14+
self.uri = uri
15+
self.table_name = table
16+
self.creds = creds
1217
self.engine = create_engine(self.uri,
13-
credentials_path=self.credentials)
18+
credentials_path=self.creds)
19+
20+
def insert_rows(self, rows, replace=None):
    """Insert rows into the BigQuery table named by ``self.table_name``.

    :param rows: Sequence of row mappings (dicts or SQLAlchemy result rows).
    :param replace: When truthy, truncate the table before inserting.
    :returns: List holding the first column's value of every inserted row.
    :raises ValueError: If the client was created without a table name.
    """
    if not self.table_name:
        raise ValueError('BigQueryClient needs a table name to insert rows.')
    # The constructor does not build a table object, so reflect it on first
    # use and cache it on the instance for later calls.
    table = getattr(self, 'table', None)
    if table is None:
        from sqlalchemy import MetaData, Table
        table = Table(self.table_name,
                      MetaData(bind=self.engine),
                      autoload=True)
        self.table = table
    if replace:
        self.engine.execute(f'TRUNCATE TABLE {self.table_name}')
    if not rows:
        # Nothing to insert; avoid executing an INSERT with an empty
        # parameter list.
        return []
    self.engine.execute(table.insert(), rows)
    # list(...) keeps this working for plain dicts as well as result rows.
    return [list(row.items())[0][1] for row in rows]
1426

15-
def fetch_bigquery_rows(self, query):
27+
def fetch_rows(self, query):
    """Run ``query`` against BigQuery and return every resulting row."""
    return self.engine.execute(query).fetchall()

src/database.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,32 @@
1-
"""MySQL data destination."""
1+
"""SQL Database Client."""
2+
import logging
23
from sqlalchemy.engine import create_engine
3-
from sqlalchemy import MetaData, Table, Column, Integer, String
4+
from sqlalchemy import MetaData, Table
45

6+
logging.basicConfig(filename='logs/sql.log',
7+
format='%(asctime)s %(message)s')
8+
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
59

6-
class MySQLClient:
710

8-
def __init__(self, Config):
9-
self.mysql_uri = Config.mysql_uri
10-
self.engine = create_engine(self.mysql_uri)
11-
self.metadata = MetaData()
12-
self.table_name = Config.mysql_table
13-
self.table = Table(Config.mysql_table, self.metadata,
14-
Column('id', Integer),
15-
Column('title', String),
16-
Column('url', String),
17-
Column('slug', String),
18-
Column('views', Integer))
11+
class SQLClient:
1912

20-
def insert_rows(self, rows):
21-
"""Insert rows into MySQL table."""
22-
self.engine.execute(f'TRUNCATE TABLE {self.table_name}')
13+
def __init__(self, uri=None, table=None):
    """Connect to the database at ``uri`` and reflect the table ``table``.

    :param uri: SQLAlchemy connection URI used to create the engine.
    :param table: Name of the table to load; its definition is autoloaded
        through metadata bound to the engine.
    """
    self.uri = uri
    self.table_name = table
    self.engine = create_engine(self.uri)
    self.metadata = MetaData(bind=self.engine)
    self.table = Table(table, self.metadata, autoload=True)
21+
22+
def insert_rows(self, rows, replace=None):
    """Insert rows into the SQL table held in ``self.table``.

    :param rows: Sequence of row mappings (dicts or SQLAlchemy result rows).
    :param replace: When truthy, truncate the table before inserting.
    :returns: List holding the first column's value of every inserted row.
    """
    if replace:
        self.engine.execute(f'TRUNCATE TABLE {self.table_name}')
    if not rows:
        # Nothing to insert; avoid executing an INSERT with an empty
        # parameter list.
        return []
    self.engine.execute(self.table.insert(), rows)
    # list(...) keeps this working for plain dicts as well as result rows.
    return [list(row.items())[0][1] for row in rows]
28+
29+
def fetch_rows(self, query):
    """Run ``query`` against the SQL database and return every resulting row."""
    return self.engine.execute(query).fetchall()

0 commit comments

Comments
 (0)