-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathexample_DbtTaskGroup_bigquery.py
More file actions
76 lines (60 loc) · 2.1 KB
/
example_DbtTaskGroup_bigquery.py
File metadata and controls
76 lines (60 loc) · 2.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""
This DAG runs the jaffle_shop dbt project on BigQuery using
the `DbtTaskGroup` class from Cosmos.
"""
from airflow.sdk import dag, chain, task
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles.bigquery import GoogleCloudServiceAccountFileProfileMapping
import os
# Airflow connection holding the BigQuery credentials. You need to set this
# connection; for an example see the .env_example file in the root of this repository.
# Values taken from the connection can be overridden using the `profile_args`
# parameter of the `GoogleCloudServiceAccountFileProfileMapping` class (done below).
BIGQUERY_CONN_ID = os.getenv("BIGQUERY_CONN_ID", "bigquery_default")
# GCP project and BigQuery dataset passed to the dbt profile; overridable via env vars.
PROJECT_ID = os.getenv("GCP_PROJECT_ID", "my-project")
DATASET_NAME = os.getenv("BIGQUERY_DATASET", "dev_dataset")
# Path to the service-account key file passed to the dbt profile as "keyfile".
BIGQUERY_KEYFILE_PATH = os.getenv(
    "BIGQUERY_KEYFILE_PATH", "/usr/local/airflow/include/secrets/bigquery-key.json"
)

# Read AIRFLOW_HOME once, strictly: both paths below derive from the same value,
# and an unset variable fails DAG parsing with a KeyError instead of yielding a
# "None/..." path.
_AIRFLOW_HOME = os.environ["AIRFLOW_HOME"]
DBT_PROJECT_PATH = f"{_AIRFLOW_HOME}/include/dbt/jaffle_shop"
# Only needed if you can't install dbt-bigquery in the requirements.txt file:
# points Cosmos at the dbt executable under $AIRFLOW_HOME/dbt_venv_bigquery.
DBT_EXECUTABLE_PATH = f"{_AIRFLOW_HOME}/dbt_venv_bigquery/bin/dbt"

# Location of the dbt project Cosmos renders into Airflow tasks.
_project_config = ProjectConfig(
    dbt_project_path=DBT_PROJECT_PATH,
)
# dbt profile built from the Airflow connection, with project/dataset/keyfile
# overridden by the values above.
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=GoogleCloudServiceAccountFileProfileMapping(
        conn_id=BIGQUERY_CONN_ID,
        profile_args={
            "project": PROJECT_ID,
            "dataset": DATASET_NAME,
            "keyfile": BIGQUERY_KEYFILE_PATH,
        },
    ),
)
# Which dbt binary Cosmos invokes.
_execution_config = ExecutionConfig(
    dbt_executable_path=DBT_EXECUTABLE_PATH,
)
# default_args handed to the DbtTaskGroup (each task retried up to 2 times).
_default_args = {
    "retries": 2,
}
@dag(
    max_active_tasks=1,
    tags=["basic", "bigquery", "jaffle_shop"],
)
def example_DbtTaskGroup_bigquery():
    # Comments (not docstrings) on purpose: Airflow copies function docstrings
    # into the DAG/task doc_md, which would change what the UI shows.

    # Placeholder step that runs before any dbt work.
    @task
    def pre_dbt():
        return None

    # Cosmos renders the dbt project into a task group using the shared configs.
    dbt_group = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        default_args=_default_args,
    )

    # Placeholder step that runs after the dbt task group finishes.
    @task
    def post_dbt():
        return None

    # Instantiate the placeholders (same order as before) and wire
    # pre_dbt -> dbt_project group -> post_dbt.
    before = pre_dbt()
    after = post_dbt()
    chain(before, dbt_group, after)


example_DbtTaskGroup_bigquery()