Submitted by itefix on
- Acquire a dbt Cloud service token with sufficient privileges to run a job within a project (service tokens: https://docs.getdbt.com/docs/dbt-cloud-apis/service-tokens, API definition: https://docs.getdbt.com/dbt-cloud/api-v2#/operations/Trigger%20Job%20Run)
- Create a Snowflake secret to hold the dbt Cloud API token:
USE ROLE ACCOUNTADMIN;
USE DATABASE CONFIG;
USE SCHEMA CONFIG;

CREATE OR REPLACE SECRET secret_dbt_cloud_api_project_job
  TYPE = GENERIC_STRING
  SECRET_STRING = '---dbt-cloud-api-token---'
  COMMENT = 'DBT Cloud API token to trigger project jobs';

GRANT READ ON SECRET CONFIG.CONFIG.secret_dbt_cloud_api_project_job TO ROLE MY_ROLE;
GRANT USAGE ON DATABASE CONFIG TO ROLE MY_ROLE;
GRANT USAGE ON SCHEMA CONFIG.CONFIG TO ROLE MY_ROLE;
- Create a network rule that allows outbound access to the dbt Cloud API:
USE ROLE ACCOUNTADMIN;
USE DATABASE CONFIG;
USE SCHEMA CONFIG;

CREATE OR REPLACE NETWORK RULE NETWORK_RULE_HOST_PORT_DBT_CLOUD
  TYPE = HOST_PORT
  VALUE_LIST = ( 'your.instance.dbt.com:443' )
  MODE = EGRESS
  COMMENT = 'DBT Cloud';
- Create an External Access Integration:
USE ROLE ACCOUNTADMIN;
USE DATABASE CONFIG;
USE SCHEMA CONFIG;

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION EXTERNAL_ACCESS_DBT_CLOUD_API
  ALLOWED_NETWORK_RULES = ( NETWORK_RULE_HOST_PORT_DBT_CLOUD )
  ALLOWED_AUTHENTICATION_SECRETS = ( secret_dbt_cloud_api_project_job )
  ENABLED = TRUE
  COMMENT = 'Access to DBT Cloud API';

GRANT USAGE ON INTEGRATION EXTERNAL_ACCESS_DBT_CLOUD_API TO ROLE MY_ROLE;
- Create a Python UDF that calls the API:
USE DATABASE MY_DATABASE;
USE SCHEMA MY_SCHEMA;

CREATE OR REPLACE FUNCTION run_dbt_project_job(job_id STRING, cause STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
HANDLER = 'run_dbt_project_job'
EXTERNAL_ACCESS_INTEGRATIONS = ( EXTERNAL_ACCESS_DBT_CLOUD_API )
SECRETS = ( 'dbt_api_token' = config.config.secret_dbt_cloud_api_project_job )
PACKAGES = ('requests')
AS
$$
import _snowflake
import requests

def run_dbt_project_job(job_id, cause):
    # Read the API token from the Snowflake secret bound to this function
    api_key = _snowflake.get_generic_secret_string('dbt_api_token')
    account_id = '---your-dbt-cloud-account-id---'
    url = f"https://your.instance.dbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "cause": cause
    }
    try:
        # Send the POST request to dbt Cloud API
        response = requests.post(url, headers=headers, json=payload)
        response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx
        response_json = response.json()
        return response_json.get("status", {}).get("user_message", None)
        # return response.json()  # Return the response from dbt Cloud API
    except requests.exceptions.RequestException as e:
        return f"Error triggering dbt Cloud job: {e}"
$$;

GRANT USAGE ON FUNCTION MY_DATABASE.MY_SCHEMA.run_dbt_project_job(STRING,STRING) TO ROLE MY_ROLE;
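The request that the UDF sends can be checked outside Snowflake before deploying it. The following is a minimal sketch, not part of the original function: the helper names build_trigger_request and extract_user_message are hypothetical, and the host, account ID, job ID and token are placeholders. It only builds the URL, headers and payload and reads the message from a response body, without making any network call.

```python
from typing import Any, Dict, Optional, Tuple


def build_trigger_request(
    host: str, account_id: str, job_id: str, token: str, cause: str
) -> Tuple[str, Dict[str, str], Dict[str, str]]:
    """Build the URL, headers and JSON payload used to trigger a job run.

    Hypothetical helper: mirrors the values assembled inside the UDF.
    """
    url = f"https://{host}/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    payload = {"cause": cause}
    return url, headers, payload


def extract_user_message(response_json: Dict[str, Any]) -> Optional[str]:
    """Return status.user_message from a response body, or None if absent."""
    return (response_json.get("status") or {}).get("user_message")


# Placeholder values for illustration only
url, headers, payload = build_trigger_request(
    "your.instance.dbt.com", "123", "456", "example-token", "Triggered from Snowflake"
)
print(url)
print(payload)
```

Keeping the request construction separate from the HTTP call makes it easy to verify the URL and payload with plain assertions, since the _snowflake module is only available inside Snowflake.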
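- Use the function to trigger a job run in dbt Cloud:
USE DATABASE MY_DATABASE;
USE SCHEMA MY_SCHEMA;

SELECT run_dbt_project_job('---your-project-job-id---', 'comment ...');
The function returns "Success!" when the API accepts the trigger request, and an error message otherwise. Note that this only confirms that the run was triggered, not that the job itself has finished.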
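Because "Success!" only reflects the trigger request, it can help to return a little more of the response. The sketch below is an assumption-laden illustration: it assumes the response body carries the new run under data.id and a numeric data.status, with the status codes as I understand them from the dbt Cloud API v2 documentation (verify against your instance). The function name summarize_trigger_response and the sample body are made up for the example.

```python
from typing import Any, Dict

# Assumed run status codes of the dbt Cloud API v2; check them against the API docs
RUN_STATUS = {
    1: "Queued",
    2: "Starting",
    3: "Running",
    10: "Success",
    20: "Error",
    30: "Cancelled",
}


def summarize_trigger_response(body: Dict[str, Any]) -> str:
    """Combine the user message with the run id and state, when present."""
    status = body.get("status") or {}
    data = body.get("data") or {}
    message = status.get("user_message") or "no message"
    run_id = data.get("id")
    if run_id is None:
        # No run information in the body: fall back to the message alone
        return message
    state = RUN_STATUS.get(data.get("status"), "Unknown")
    return f"{message} run_id={run_id} state={state}"


# Made-up sample body for illustration, not a real API response
sample = {"status": {"user_message": "Success!"}, "data": {"id": 12345, "status": 1}}
print(summarize_trigger_response(sample))  # prints "Success! run_id=12345 state=Queued"
```

Returning the run id from the UDF would let a later query or task look up the run and check whether the job actually completed.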