-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathexample_DbtTaskGroup_postgres.py
More file actions
66 lines (51 loc) · 1.71 KB
/
example_DbtTaskGroup_postgres.py
File metadata and controls
66 lines (51 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""
This DAG runs the jaffle_shop dbt project on Postgres using
the `DbtTaskGroup` class from Cosmos.
"""
from airflow.sdk import dag, chain, task
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping
import os
# Airflow connection used for Postgres. You need to set this connection
# yourself; for an example see the .env_example file in the root of this
# repository.
POSTGRES_CONN_ID = os.getenv("POSTGRES_CONN_ID", "postgres_default")

# Schema name handed to the dbt profile.
SCHEMA_NAME = os.getenv("POSTGRES_SCHEMA", "DEMO_SCHEMA")

# Resolve AIRFLOW_HOME exactly once so both paths below are built from the
# same value. Previously one path used os.environ[...] (KeyError when unset)
# and the other os.getenv(...) (which would interpolate the string "None").
# When the variable is unset, fall back to Airflow's own default home.
_AIRFLOW_HOME = os.getenv("AIRFLOW_HOME", os.path.expanduser("~/airflow"))

# Location of the jaffle_shop dbt project.
DBT_PROJECT_PATH = f"{_AIRFLOW_HOME}/include/dbt/jaffle_shop"

# dbt executable inside a dedicated virtual environment.
# Only needed if you can't install dbt-postgres in the requirements.txt file
DBT_EXECUTABLE_PATH = f"{_AIRFLOW_HOME}/dbt_venv_postgres/bin/dbt"
# Points Cosmos at the dbt project directory.
_project_config = ProjectConfig(dbt_project_path=DBT_PROJECT_PATH)

# Profile mapping built from the Airflow connection id and the target schema.
_postgres_profile_mapping = PostgresUserPasswordProfileMapping(
    conn_id=POSTGRES_CONN_ID,
    profile_args={"schema": SCHEMA_NAME},
)

# dbt profile "default" with target "dev", backed by the mapping above.
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=_postgres_profile_mapping,
)

# Tells Cosmos which dbt executable to invoke.
_execution_config = ExecutionConfig(dbt_executable_path=DBT_EXECUTABLE_PATH)

# Default arguments handed to the DbtTaskGroup: retry up to twice.
_default_args = dict(retries=2)
@dag(
    tags=["basic", "postgres", "jaffle_shop", "out-of-the-box"],
    max_active_tasks=1,
)
def example_DbtTaskGroup_postgres():
    # DAG layout: pre_dbt -> dbt_project (Cosmos task group) -> post_dbt.
    # Comments are used here rather than docstrings, since Airflow may
    # surface function docstrings as DAG/task documentation.

    @task
    def pre_dbt():
        # Placeholder task that runs before the dbt task group.
        return None

    @task
    def post_dbt():
        # Placeholder task that runs after the dbt task group.
        return None

    # The task group is created before the two placeholder tasks are
    # instantiated, matching the original creation order.
    jaffle_shop_group = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        default_args=_default_args,
    )

    before = pre_dbt()
    after = post_dbt()

    # Wire the three pieces into a linear dependency chain.
    chain(before, jaffle_shop_group, after)
# Call the @dag-decorated function so the DAG object is created when this module is loaded.
example_DbtTaskGroup_postgres()