SQLExecuteQueryOperator for Snowflake

Use the SQLExecuteQueryOperator to execute SQL commands in a Snowflake database.

Using the Operator

Use the conn_id argument to connect to your Snowflake instance, where the connection metadata is structured as follows:

Snowflake Airflow Connection Metadata

Parameter          Input
Login: string      Snowflake user name
Password: string   Password for the Snowflake user
Schema: string     Schema to execute SQL operations on by default
Extra: dictionary  warehouse, account, database, region, role, authenticator
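A connection with this shape can be supplied in several ways; one minimal sketch is a JSON-serialized connection in an environment variable. Every value below, including the connection id snowflake_default, is an illustrative placeholder:

import json
import os

# Minimal sketch only: all names and credentials are placeholders.
os.environ["AIRFLOW_CONN_SNOWFLAKE_DEFAULT"] = json.dumps(
    {
        "conn_type": "snowflake",
        "login": "my_user",          # Snowflake user name
        "password": "my_password",   # password for the Snowflake user
        "schema": "my_schema",       # default schema for SQL operations
        "extra": {
            "account": "my_account",
            "warehouse": "my_warehouse",
            "database": "my_database",
            "region": "us-east-1",
            "role": "my_role",
        },
    }
)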

An example usage of the SQLExecuteQueryOperator to connect to Snowflake is as follows:

tests/system/snowflake/example_snowflake.py[source]

snowflake_op_sql_str = SQLExecuteQueryOperator(
    task_id="snowflake_op_sql_str", sql=CREATE_TABLE_SQL_STRING
)

snowflake_op_with_params = SQLExecuteQueryOperator(
    task_id="snowflake_op_with_params",
    sql=SQL_INSERT_STATEMENT,
    parameters={"id": 56},
)
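The parameters dictionary is passed to the database driver as bind values rather than rendered into the SQL text. SQL_INSERT_STATEMENT itself is not shown here; a hypothetical shape, using the Snowflake Python connector's pyformat placeholder style, would be:

# Hypothetical only: the real SQL_INSERT_STATEMENT lives in the example file.
# %(id)s is a pyformat bind placeholder matched by parameters={"id": 56}.
SQL_INSERT_STATEMENT = "INSERT INTO sample_table VALUES (%(id)s)"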

snowflake_op_sql_list = SQLExecuteQueryOperator(task_id="snowflake_op_sql_list", sql=SQL_LIST)

snowflake_op_sql_multiple_stmts = SQLExecuteQueryOperator(
    task_id="snowflake_op_sql_multiple_stmts",
    sql=SQL_MULTIPLE_STMTS,
    split_statements=True,
)
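Setting split_statements=True makes the operator split SQL_MULTIPLE_STMTS into individual statements and execute them one by one; a single string containing several semicolon-separated statements is generally rejected by the driver otherwise.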

snowflake_op_template_file = SQLExecuteQueryOperator(
    task_id="snowflake_op_template_file",
    sql="example_snowflake_snowflake_op_template_file.sql",
)
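Because the sql value ends in .sql, the file is rendered with Jinja before execution, so the statements inside it can use template variables such as {{ ds }}.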

# Create and populate a small dataset for the data quality operator examples.
# We insert one row for `ds` and one for `ds - 1`, each with value = 4.
create_check_table = SQLExecuteQueryOperator(
    task_id="create_check_table",
    sql=CREATE_CHECK_TABLE_SQL_STRING,
)

populate_check_table = SQLExecuteQueryOperator(
    task_id="populate_check_table",
    sql=SQL_CHECK_TABLE_INSERT,
)
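The two SQL constants are not reproduced here; a minimal sketch consistent with the comment above and with the checks below (which filter on a ds column and sum a value column) might look like:

# Sketch only: assumed shapes for the constants used by the two tasks above.
CREATE_CHECK_TABLE_SQL_STRING = (
    f"CREATE OR REPLACE TABLE {SNOWFLAKE_CHECK_TABLE} (ds DATE, value INT);"
)
SQL_CHECK_TABLE_INSERT = (
    f"INSERT INTO {SNOWFLAKE_CHECK_TABLE} (ds, value) VALUES "
    "(TO_DATE('{{ ds }}'), 4), (TO_DATE('{{ macros.ds_add(ds, -1) }}'), 4);"
)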

Note

Parameters passed to the operator take priority over parameters already set in the Airflow connection metadata (such as schema, role, database and so forth).

SnowflakeCheckOperator

To perform checks against Snowflake, you can use SnowflakeCheckOperator.

This operator expects a SQL query that returns a single row. Each value in that row is evaluated with Python bool casting; if any value evaluates to False, the check fails and the task errors out.

tests/system/snowflake/example_snowflake.py[source]

snowflake_check = SnowflakeCheckOperator(
    task_id="snowflake_check",
    sql=f"SELECT COUNT(*) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')",
)
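Because every value in the returned row is bool-cast independently, one check can assert several conditions at once. A hypothetical variant (this task and its conditions are assumptions, not part of the example file):

# Hypothetical: the check passes only if the table is non-empty
# and no value is negative; both expressions must be truthy.
snowflake_multi_check = SnowflakeCheckOperator(
    task_id="snowflake_multi_check",
    sql=f"SELECT COUNT(*) > 0, MIN(value) >= 0 FROM {SNOWFLAKE_CHECK_TABLE}",
)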

SnowflakeValueCheckOperator

To perform a simple value check using SQL code, you can use SnowflakeValueCheckOperator.

This operator expects a SQL query that returns a single row. That value is evaluated against pass_value, which can be either a string or a numeric value. If numeric, you can also specify a tolerance.

tests/system/snowflake/example_snowflake.py[source]

snowflake_value_check = SnowflakeValueCheckOperator(
    task_id="snowflake_value_check",
    sql=f"SELECT SUM(value) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')",
    pass_value=4,
)
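With a numeric pass_value, tolerance is a fractional band: the check passes when the returned value lies within pass_value * (1 ± tolerance). A hypothetical variant of the task above:

# Hypothetical: accepts any sum between 3.6 and 4.4 (4 ± 10%).
snowflake_value_check_tol = SnowflakeValueCheckOperator(
    task_id="snowflake_value_check_tol",
    sql=f"SELECT SUM(value) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')",
    pass_value=4,
    tolerance=0.1,
)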

SnowflakeIntervalCheckOperator

To check that the values of metrics given as SQL expressions are within a certain tolerance of the values from days_back before, you can use SnowflakeIntervalCheckOperator.

tests/system/snowflake/example_snowflake.py[source]

snowflake_interval_check = SnowflakeIntervalCheckOperator(
    task_id="snowflake_interval_check",
    table=SNOWFLAKE_CHECK_TABLE,
    metrics_thresholds={"COUNT(*)": 1.5},
    days_back=1,
)
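This task compares COUNT(*) for the current date against the value from one day back and fails if the ratio between the two exceeds 1.5. Several metrics can be guarded in a single task; a hypothetical sketch (the second metric and its threshold are assumptions):

# Hypothetical: also fails if SUM(value) drifts by more than 2x day over day.
snowflake_interval_check_multi = SnowflakeIntervalCheckOperator(
    task_id="snowflake_interval_check_multi",
    table=SNOWFLAKE_CHECK_TABLE,
    metrics_thresholds={"COUNT(*)": 1.5, "SUM(value)": 2.0},
    days_back=1,
)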

SnowflakeSqlApiOperator

Use the SnowflakeSqlApiOperator to execute SQL commands in a Snowflake database via the Snowflake SQL API.

You can also run this operator in deferrable mode by setting the deferrable param to True. This ensures that the task is deferred from the Airflow worker slot, and polling for the task status happens on the trigger.

Using the Operator

Use the snowflake_conn_id argument to connect to your Snowflake instance, where the connection metadata is structured as follows:

Snowflake Airflow Connection Metadata

Parameter          Input
Login: string      Snowflake user name. If using an OAuth connection, this is the client_id
Password: string   Password for the Snowflake user. If using OAuth, this is the client_secret
Schema: string     Schema to execute SQL operations on by default
Extra: dictionary  warehouse, account, database, region, role, authenticator, refresh_token. If using OAuth, refresh_token must be specified
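Such a connection can be serialized the same way as the earlier sketch; for OAuth, login and password carry the client credentials and the extra dictionary adds authenticator and refresh_token. All values below are placeholders:

import json
import os

# Sketch only: OAuth-style placeholders, not working credentials.
os.environ["AIRFLOW_CONN_SNOWFLAKE_SQL_API"] = json.dumps(
    {
        "conn_type": "snowflake",
        "login": "my_client_id",         # OAuth client_id
        "password": "my_client_secret",  # OAuth client_secret
        "schema": "my_schema",
        "extra": {
            "account": "my_account",
            "warehouse": "my_warehouse",
            "database": "my_database",
            "role": "my_role",
            "authenticator": "oauth",
            "refresh_token": "my_refresh_token",
        },
    }
)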

An example usage of the SnowflakeSqlApiOperator is as follows:

tests/system/snowflake/example_snowflake.py[source]

snowflake_sql_api_op_sql_multiple_stmt = SnowflakeSqlApiOperator(
    task_id="snowflake_op_sql_multiple_stmt",
    sql=SQL_MULTIPLE_STMTS,
    statement_count=len(SQL_LIST),
)
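The deferrable variant mentioned above only needs the extra flag; a minimal sketch (the task id is an assumption):

# Same statements as above, but the worker slot is released while the
# trigger polls Snowflake for query completion.
snowflake_sql_api_op_deferred = SnowflakeSqlApiOperator(
    task_id="snowflake_op_sql_multiple_stmt_deferred",
    sql=SQL_MULTIPLE_STMTS,
    statement_count=len(SQL_LIST),
    deferrable=True,
)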

Note

Parameters passed to the operator take priority over parameters already set in the Airflow connection metadata (such as schema, role, database and so forth).
