-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathexample_DbtTaskGroup_databricks.py
More file actions
68 lines (54 loc) · 1.7 KB
/
example_DbtTaskGroup_databricks.py
File metadata and controls
68 lines (54 loc) · 1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from airflow.sdk import dag, chain, task
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles.databricks import DatabricksTokenProfileMapping
import os
# Airflow connection id and Databricks targets. Each value can be overridden
# through the environment; otherwise the development defaults below are used.
DATABRICKS_CONN_ID = os.getenv("DATABRICKS_CONN_ID", "databricks_default")
CATALOG_NAME = os.getenv("DATABRICKS_CATALOG", "dev_catalog")
SCHEMA_NAME = os.getenv("DATABRICKS_SCHEMA", "dev_schema")
COMPUTE_NAME = os.getenv("DATABRICKS_COMPUTE", "shared_compute")

# Resolve AIRFLOW_HOME exactly once so the project path and the dbt executable
# path are always rooted in the same directory. Previously one path was built
# with os.environ[...] (KeyError when unset) and the other with os.getenv(...)
# (which would render the literal string "None/..." when unset). When the
# variable is unset or empty, fall back to Airflow's documented default home.
_AIRFLOW_HOME = os.getenv("AIRFLOW_HOME") or os.path.expanduser("~/airflow")

# Location of the dbt project and of the dbt binary inside its dedicated venv.
DBT_PROJECT_PATH = f"{_AIRFLOW_HOME}/include/dbt/jaffle_shop"
DBT_EXECUTABLE_PATH = f"{_AIRFLOW_HOME}/dbt_venv_databricks/bin/dbt"
# Points Cosmos at the dbt project directory.
_project_config = ProjectConfig(dbt_project_path=DBT_PROJECT_PATH)

# Extra profile arguments handed to the Databricks profile mapping.
# NOTE(review): the nested "compute" mapping is passed through unchanged;
# confirm its shape against what the dbt-databricks adapter expects.
_databricks_profile_args = {
    "catalog": CATALOG_NAME,
    "schema": SCHEMA_NAME,
    "compute": {
        "type": "serverless",
        "compute_name": COMPUTE_NAME,
    },
}

# dbt profile "default" / target "dev", with the profile mapping given the
# Airflow connection id and the profile arguments defined above.
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=DatabricksTokenProfileMapping(
        conn_id=DATABRICKS_CONN_ID,
        profile_args=_databricks_profile_args,
    ),
)

# Use the dbt binary at DBT_EXECUTABLE_PATH.
_execution_config = ExecutionConfig(dbt_executable_path=DBT_EXECUTABLE_PATH)

# Passed as default_args to the dbt task group.
_default_args = {"retries": 2}
@dag(
    tags=["part_1", "databricks"],
    max_active_tasks=1,
)
def example_DbtTaskGroup_databricks():
    # DAG layout: pre_dbt -> "dbt_project" task group -> post_dbt.
    # Plain comments (not docstrings) are used throughout this function so
    # that no docstring-derived documentation metadata is introduced.

    @task
    def pre_dbt():
        # Placeholder step that runs before the dbt task group.
        pass

    @task
    def post_dbt():
        # Placeholder step that runs after the dbt task group.
        pass

    # Build the dbt task group from the module-level Cosmos configuration.
    dbt_group = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        default_args=_default_args,
    )

    # Instantiate the bookend tasks, then wire everything in sequence.
    before = pre_dbt()
    after = post_dbt()
    chain(before, dbt_group, after)
# Call the @dag-decorated function at module import so the DAG is built
# whenever this file is parsed.
example_DbtTaskGroup_databricks()