-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathexample_DbtTaskGroup_duckdb.py
More file actions
63 lines (49 loc) · 1.46 KB
/
example_DbtTaskGroup_duckdb.py
File metadata and controls
63 lines (49 loc) · 1.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from airflow.sdk import dag, chain, task
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles.duckdb import DuckDBUserPasswordProfileMapping
import os
# Airflow connection id passed to the dbt profile mapping; override via the
# DUCKDB_CONN_ID environment variable.
DUCKDB_CONN_ID = os.getenv("DUCKDB_CONN_ID", "duckdb_default")
# Absolute path of the DuckDB database file (fixed, not derived from the
# environment).
DB_PATH = "/usr/local/airflow/include/my_duck.duckdb"
# Schema passed to the dbt profile; override via the DUCKDB_SCHEMA environment
# variable.
SCHEMA_NAME = os.getenv("DUCKDB_SCHEMA", "main")

# Resolve AIRFLOW_HOME exactly once so both dbt paths agree. Previously one
# path used ``os.environ[...]`` (bare KeyError while the file is imported) and
# the other ``os.getenv(...)`` inside an f-string (which would render the
# literal text "None/..."). The fallback mirrors the root directory already
# hard-coded in DB_PATH; when the variable is set the results are unchanged.
AIRFLOW_HOME = os.environ.get("AIRFLOW_HOME", "/usr/local/airflow")
# Directory of the dbt project under AIRFLOW_HOME.
DBT_PROJECT_PATH = f"{AIRFLOW_HOME}/include/dbt/jaffle_shop"
# dbt executable inside the ``dbt_venv_duckdb`` directory under AIRFLOW_HOME.
DBT_EXECUTABLE_PATH = f"{AIRFLOW_HOME}/dbt_venv_duckdb/bin/dbt"
# Project configuration: points at the dbt project directory.
_project_config = ProjectConfig(dbt_project_path=DBT_PROJECT_PATH)

# Profile mapping built from the Airflow connection id, with the DuckDB file
# path and schema supplied as explicit profile arguments.
_duckdb_profile_mapping = DuckDBUserPasswordProfileMapping(
    conn_id=DUCKDB_CONN_ID,
    profile_args=dict(path=DB_PATH, schema=SCHEMA_NAME),
)

# Profile configuration: profile "default", target "dev", using the mapping
# defined just above.
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=_duckdb_profile_mapping,
)

# Execution configuration: which dbt executable to invoke.
_execution_config = ExecutionConfig(dbt_executable_path=DBT_EXECUTABLE_PATH)

# Default arguments handed to the dbt task group (two retries).
_default_args = dict(retries=2)
# DAG wiring: pre_dbt -> dbt_project task group -> post_dbt.
# NOTE(review): documentation lives in ``#`` comments rather than docstrings so
# that no decorated function gains a ``__doc__`` it did not have before
# (presumably Airflow can surface docstrings as DAG/task docs - confirm).
@dag(
    tags=["out-of-the-box", "duckdb"],
    max_active_tasks=1,
)
def example_DbtTaskGroup_duckdb():
    # Placeholder task placed ahead of the dbt task group.
    @task
    def pre_dbt():
        pass

    # Placeholder task placed after the dbt task group.
    @task
    def post_dbt():
        pass

    # Create the task group before instantiating the placeholder tasks, which
    # keeps the object-creation order of the original code.
    dbt_group = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        default_args=_default_args,
    )

    first_step = pre_dbt()
    last_step = post_dbt()
    chain(first_step, dbt_group, last_step)


# Call the decorated function at import time, exactly as the original does.
example_DbtTaskGroup_duckdb()