SQLExecuteQueryOperator for Snowflake¶
Use the SQLExecuteQueryOperator to execute
SQL commands in a Snowflake database.
Using the Operator¶
Use the conn_id argument to connect to your Snowflake instance where
the connection metadata is structured as follows:
| Parameter | Input |
|---|---|
| Login: string | Snowflake user name |
| Password: string | Password for Snowflake user |
| Schema: string | Set schema to execute SQL operations on by default |
| Extra: dictionary | `warehouse`, `account`, `database`, `region`, `role`, `authenticator` |
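If the connection does not exist yet, one option is Airflow's JSON connection format in an environment variable. The sketch below is illustrative only; the connection name and every value are placeholders:

```python
import json
import os

# Hypothetical connection definition; replace every value with your own.
# The variable must be visible to the scheduler and workers.
os.environ["AIRFLOW_CONN_MY_SNOWFLAKE_CONN"] = json.dumps(
    {
        "conn_type": "snowflake",
        "login": "my_user",         # Snowflake user name
        "password": "my_password",  # password for the Snowflake user
        "schema": "my_schema",      # default schema for SQL operations
        "extra": {
            "account": "my_account",
            "warehouse": "my_warehouse",
            "database": "my_database",
            "region": "us-east-1",
            "role": "my_role",
        },
    }
)
```

Connections can equally be created through the UI or the `airflow connections add` CLI command.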
An example usage of the SQLExecuteQueryOperator to connect to Snowflake is as follows:
```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# In the example DAG these tasks inherit conn_id from the DAG's default_args;
# it can also be passed explicitly to each operator.
snowflake_op_sql_str = SQLExecuteQueryOperator(
    task_id="snowflake_op_sql_str", sql=CREATE_TABLE_SQL_STRING
)

snowflake_op_with_params = SQLExecuteQueryOperator(
    task_id="snowflake_op_with_params",
    sql=SQL_INSERT_STATEMENT,
    parameters={"id": 56},
)

snowflake_op_sql_list = SQLExecuteQueryOperator(task_id="snowflake_op_sql_list", sql=SQL_LIST)

snowflake_op_sql_multiple_stmts = SQLExecuteQueryOperator(
    task_id="snowflake_op_sql_multiple_stmts",
    sql=SQL_MULTIPLE_STMTS,
    split_statements=True,
)

snowflake_op_template_file = SQLExecuteQueryOperator(
    task_id="snowflake_op_template_file",
    sql="example_snowflake_snowflake_op_template_file.sql",
)

# Create and populate a small dataset for the data quality operator examples.
# We insert one row for `ds` and one for `ds - 1`, each with value = 4.
create_check_table = SQLExecuteQueryOperator(
    task_id="create_check_table",
    sql=CREATE_CHECK_TABLE_SQL_STRING,
)

populate_check_table = SQLExecuteQueryOperator(
    task_id="populate_check_table",
    sql=SQL_CHECK_TABLE_INSERT,
)
```
Note

Parameters passed to the operator take priority over those already set in the Airflow connection metadata (such as schema, role, and database).
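As a sketch of that priority rule: hook_params on SQLExecuteQueryOperator is forwarded to the underlying SnowflakeHook, so per-task values win over the connection extras. The task id, connection id, and override values below are hypothetical:

```python
# Hypothetical per-task overrides: these hook_params take priority over the
# warehouse/role stored in the connection's extra field.
override_example = SQLExecuteQueryOperator(
    task_id="override_example",
    conn_id="my_snowflake_conn",
    sql="SELECT CURRENT_WAREHOUSE(), CURRENT_ROLE()",
    hook_params={"warehouse": "REPORTING_WH", "role": "ANALYST"},
)
```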
SnowflakeCheckOperator¶
To perform checks against Snowflake you can use
SnowflakeCheckOperator.
This operator expects a SQL query that will return a single row. Each value in
that row is evaluated using Python bool casting; if any value evaluates to
False, the check fails and the task errors out.
```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeCheckOperator

snowflake_check = SnowflakeCheckOperator(
    task_id="snowflake_check",
    sql=f"SELECT COUNT(*) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')",
)
```
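Since every column of the returned row is bool-cast, one check task can bundle several assertions. The following sketch is illustrative rather than part of the example DAG (COUNT_IF is a Snowflake conditional aggregate):

```python
# Hypothetical multi-assertion check: the task fails if any selected column
# evaluates to False (or 0/NULL) after Python bool casting.
row_quality_check = SnowflakeCheckOperator(
    task_id="row_quality_check",
    sql=f"""
        SELECT
            COUNT(*) > 0,                  -- table is non-empty
            COUNT_IF(value IS NULL) = 0    -- no NULL values in `value`
        FROM {SNOWFLAKE_CHECK_TABLE}
    """,
)
```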
SnowflakeValueCheckOperator¶
To perform a simple value check using SQL code you can use
SnowflakeValueCheckOperator.
This operator expects a SQL query that will return a single row. The returned
value is evaluated against pass_value, which can be either a string or a
numeric value. If numeric, you can also specify tolerance.
```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeValueCheckOperator

snowflake_value_check = SnowflakeValueCheckOperator(
    task_id="snowflake_value_check",
    sql=f"SELECT SUM(value) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')",
    pass_value=4,
)
```
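With a numeric pass_value, tolerance is a fractional band: the check passes when the result lies within pass_value * (1 ± tolerance). A sketch with a hypothetical task id, reusing the table from the examples above:

```python
# Hypothetical tolerance check: passes when SUM(value) is between
# 4 * (1 - 0.1) = 3.6 and 4 * (1 + 0.1) = 4.4.
value_check_with_tolerance = SnowflakeValueCheckOperator(
    task_id="value_check_with_tolerance",
    sql=f"SELECT SUM(value) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')",
    pass_value=4,
    tolerance=0.1,
)
```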
SnowflakeIntervalCheckOperator¶
To check that the values of metrics given as SQL expressions are within a
certain tolerance of the values from days_back days before, you can use
SnowflakeIntervalCheckOperator.
```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeIntervalCheckOperator

snowflake_interval_check = SnowflakeIntervalCheckOperator(
    task_id="snowflake_interval_check",
    table=SNOWFLAKE_CHECK_TABLE,
    metrics_thresholds={"COUNT(*)": 1.5},
    days_back=1,
)
```
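Each key in metrics_thresholds is a SQL aggregate expression, and each value is the maximum allowed ratio between the current day's result and the result from days_back days earlier. The sketch below uses hypothetical thresholds; date_filter_column defaults to ds:

```python
# Hypothetical multi-metric drift check: fails if COUNT(*) changes by more
# than 1.5x, or SUM(value) by more than 2x, versus seven days earlier.
interval_check_multi = SnowflakeIntervalCheckOperator(
    task_id="interval_check_multi",
    table=SNOWFLAKE_CHECK_TABLE,
    metrics_thresholds={"COUNT(*)": 1.5, "SUM(value)": 2.0},
    date_filter_column="ds",
    days_back=7,
)
```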
SnowflakeSqlApiOperator¶
Use the SnowflakeSqlApiOperator to execute
SQL commands in a Snowflake database via the Snowflake SQL API.
You can also run this operator in deferrable mode by setting the deferrable parameter to True.
This frees the Airflow worker slot while the task is deferred; polling for the task status happens on the trigger.
Using the Operator¶
Use the snowflake_conn_id argument to connect to your Snowflake instance where
the connection metadata is structured as follows:
| Parameter | Input |
|---|---|
| Login: string | Snowflake user name. If using an OAuth connection, this is the client_id |
| Password: string | Password for Snowflake user. If using OAuth, this is the client_secret |
| Schema: string | Set schema to execute SQL operations on by default |
| Extra: dictionary | `warehouse`, `account`, `database`, `region`, `role`, `authenticator`, `refresh_token` |
An example usage of the SnowflakeSqlApiOperator is as follows:

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

snowflake_sql_api_op_sql_multiple_stmt = SnowflakeSqlApiOperator(
    task_id="snowflake_op_sql_multiple_stmt",
    sql=SQL_MULTIPLE_STMTS,
    statement_count=len(SQL_LIST),
)
```
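The same statement can run in deferrable mode, as described above, by adding deferrable=True. The sketch below reuses the constants from the preceding example with a hypothetical task id:

```python
# Hypothetical deferrable variant: the worker slot is released while the
# triggerer polls the Snowflake SQL API for statement status.
snowflake_sql_api_deferrable = SnowflakeSqlApiOperator(
    task_id="snowflake_sql_api_deferrable",
    sql=SQL_MULTIPLE_STMTS,
    statement_count=len(SQL_LIST),
    deferrable=True,
)
```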
Note

Parameters passed to the operator take priority over those already set in the Airflow connection metadata (such as schema, role, and database).