Back to top

Trigger a DBT Cloud job run from Snowflake

  • Create a Snowflake secret to store the DBT Cloud API token:
-- Store the DBT Cloud API token in a Snowflake secret and let MY_ROLE read it.
-- The SECRET_STRING value below is a placeholder for the real token.
USE ROLE ACCOUNTADMIN;
USE SCHEMA CONFIG.CONFIG;  -- a qualified schema name sets the current database as well

CREATE OR REPLACE SECRET CONFIG.CONFIG.secret_dbt_cloud_api_project_job
    TYPE          = GENERIC_STRING
    SECRET_STRING = '---dbt-cloud-api-token---'
    COMMENT       = 'DBT Cloud API Token to trigger a project jobs';

-- MY_ROLE needs USAGE on the containing database and schema plus READ on the secret.
GRANT USAGE ON DATABASE CONFIG TO ROLE MY_ROLE;
GRANT USAGE ON SCHEMA CONFIG.CONFIG TO ROLE MY_ROLE;
GRANT READ ON SECRET CONFIG.CONFIG.secret_dbt_cloud_api_project_job TO ROLE MY_ROLE;
  •  Create a network rule that allows outbound access to the DBT Cloud API:
-- Egress network rule for the DBT Cloud host on port 443.
-- 'your.instance.dbt.com' is a placeholder for the real DBT Cloud hostname.
USE ROLE ACCOUNTADMIN;
USE SCHEMA CONFIG.CONFIG;  -- a qualified schema name sets the current database as well

CREATE OR REPLACE NETWORK RULE CONFIG.CONFIG.NETWORK_RULE_HOST_PORT_DBT_CLOUD
    TYPE       = HOST_PORT
    VALUE_LIST = ('your.instance.dbt.com:443')
    MODE       = EGRESS
    COMMENT    = 'DBT Cloud';
  •  Create an External Access Integration:
-- External access integration that combines the DBT Cloud network rule with the
-- API-token secret, and is made usable by MY_ROLE.
USE ROLE ACCOUNTADMIN;
USE SCHEMA CONFIG.CONFIG;  -- a qualified schema name sets the current database as well

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION EXTERNAL_ACCESS_DBT_CLOUD_API
    ALLOWED_NETWORK_RULES          = (CONFIG.CONFIG.NETWORK_RULE_HOST_PORT_DBT_CLOUD)
    ALLOWED_AUTHENTICATION_SECRETS = (CONFIG.CONFIG.secret_dbt_cloud_api_project_job)
    ENABLED                        = TRUE
    COMMENT                        = 'Access to DBT Cloud API';

GRANT USAGE ON INTEGRATION EXTERNAL_ACCESS_DBT_CLOUD_API TO ROLE MY_ROLE;
  •  Create a Python UDF that calls the DBT Cloud API:
-- Python UDF that asks the dbt Cloud API to start a run of one job.
--
-- Arguments:
--   job_id : ID of the dbt Cloud job to run; it becomes one segment of the request URL.
--   cause  : free text sent as the "cause" field of the request body.
-- Returns:
--   The "user_message" found under "status" in the API response, NULL when the
--   response has no such field, or a string starting with
--   'Error triggering dbt Cloud job:' when the request fails.
USE DATABASE MY_DATABASE;
USE SCHEMA MY_SCHEMA;
 
CREATE OR REPLACE FUNCTION run_dbt_project_job(job_id STRING, cause STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
HANDLER = 'run_dbt_project_job'
EXTERNAL_ACCESS_INTEGRATIONS = ( EXTERNAL_ACCESS_DBT_CLOUD_API )
SECRETS = ( 'dbt_api_token' = config.config.secret_dbt_cloud_api_project_job )
PACKAGES = ('requests')
AS
$$
from urllib.parse import quote

import _snowflake
import requests

# Upper bound (seconds) for connecting and for waiting on the response, so a
# stalled connection cannot block the calling query indefinitely.
REQUEST_TIMEOUT_SECONDS = 30


def run_dbt_project_job(job_id, cause):
    """Trigger a dbt Cloud job run and return the API's status message.

    Returns the "user_message" from the response's "status" object, None when
    the response does not contain one, or an error string when the HTTP request
    fails or the response body is not valid JSON.
    """
    api_key = _snowflake.get_generic_secret_string('dbt_api_token')
    account_id = '---your-dbt-cloud-account-id---'

    # Percent-encode the caller-supplied id so characters such as '/', '?' or
    # '..' cannot redirect the authenticated request to a different endpoint.
    # Plain numeric ids are left unchanged.
    job_segment = quote(str(job_id), safe='')
    # The host must match the one allowed by the external access integration's
    # network rule.
    url = f"https://your.instance.dbt.com/api/v2/accounts/{account_id}/jobs/{job_segment}/run/"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "cause": cause
    }

    try:
        # Send the POST request to dbt Cloud API
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx
        body = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # decoding error is not a RequestException.
        return f"Error triggering dbt Cloud job: {e}"

    # "status" may be absent, null, or the body may not be a JSON object;
    # in all of those cases there is no message to report.
    status = body.get("status") if isinstance(body, dict) else None
    if not isinstance(status, dict):
        return None
    return status.get("user_message")
$$;
 
GRANT USAGE ON FUNCTION MY_DATABASE.MY_SCHEMA.run_dbt_project_job(STRING,STRING) TO ROLE MY_ROLE;
  •  Use the function to trigger a job run in DBT Cloud:
-- Trigger a run of one DBT Cloud job: the first argument is the job ID,
-- the second is the free-text cause recorded for the run.
USE SCHEMA MY_DATABASE.MY_SCHEMA;  -- a qualified schema name sets the current database as well

SELECT run_dbt_project_job('---your-project-job-id---', 'comment ...');

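The function returns "Success!" when the job run has been triggered successfully, and an error message otherwise. It only triggers the run; it does not wait for the job to finish.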

Release news

2025-01-06 cwDup 4.1.4
2024-12-31 icurl 1.1.0
2024-12-28 Win2ban 2.0.5