DatabricksSQLStatementsOperator

Use the DatabricksSQLStatementsOperator to submit a Databricks SQL Statement to Databricks using the Databricks SQL Statement Execution API.

Using the Operator

The DatabricksSQLStatementsOperator submits SQL statements to Databricks using the /api/2.0/sql/statements/ endpoint. It supports configurable execution parameters such as warehouse selection, catalog, schema, and parameterized queries. The operator can either synchronously poll for query completion or run in a deferrable mode for improved efficiency.

The only required parameters for using the operator are:

  • statement - The SQL statement to execute. The statement can optionally be parameterized (see parameters).

  • warehouse_id - The ID of the SQL warehouse on which to execute the statement.

All other parameters are optional and described in the documentation for DatabricksSQLStatementsOperator, including but not limited to:

  • catalog

  • schema

  • parameters
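Parameterized statements follow the Databricks SQL Statement Execution API convention: the statement text uses named markers (e.g. :min_id) and parameters supplies the values as a list of name/value mappings. A minimal sketch of the keyword arguments you might pass to the operator — the table name, parameter name, warehouse ID, catalog, and schema below are illustrative assumptions, not values from this page:

```python
# Illustrative parameterized statement for DatabricksSQLStatementsOperator.
# The :min_id marker, table name, and warehouse ID are hypothetical.
statement = "SELECT * FROM default.my_airflow_table WHERE id > :min_id"

# The Statement Execution API accepts parameters as a list of
# name/value (and optional type) mappings.
parameters = [{"name": "min_id", "value": "100", "type": "INT"}]

operator_kwargs = {
    "task_id": "parameterized_sql_statement",
    "databricks_conn_id": "databricks_default",
    "warehouse_id": "example-warehouse-id",  # hypothetical warehouse ID
    "statement": statement,
    "parameters": parameters,
    "catalog": "main",     # optional: catalog to resolve unqualified names
    "schema": "default",   # optional: schema within the catalog
}
```

These kwargs would be unpacked into the operator, e.g. `DatabricksSQLStatementsOperator(**operator_kwargs)`, inside a DAG definition.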

Examples

An example usage of the DatabricksSQLStatementsOperator is as follows:

tests/system/databricks/example_databricks.py[source]

    sql_statement = DatabricksSQLStatementsOperator(
        task_id="sql_statement",
        databricks_conn_id="databricks_default",
        statement="select * from default.my_airflow_table",
        warehouse_id=WAREHOUSE_ID,
        # deferrable=True, # For using the operator in deferrable mode
    )

DatabricksSQLStatementsSensor

Use the DatabricksSQLStatementsSensor either to submit a Databricks SQL Statement to Databricks using the Databricks SQL Statement Execution API, or to pass a Statement ID to the Sensor and wait for that query to complete.

Using the Sensor

The DatabricksSQLStatementsSensor does one of two things: it can submit SQL statements to Databricks using the /api/2.0/sql/statements/ endpoint, or it can take the Statement ID of an already-submitted SQL statement and wait for that execution to complete.

It supports configurable execution parameters such as warehouse selection, catalog, schema, and parameterized queries. The Sensor can either synchronously poll for query completion or run in a deferrable mode for improved efficiency.

The only required parameters for using the Sensor are:

  • One of statement or statement_id - Either the SQL statement to execute, which can optionally be parameterized (see parameters), or the ID of a statement that has already been submitted.

  • warehouse_id - The ID of the SQL warehouse on which to execute the statement.

All other parameters are optional and described in the documentation for DatabricksSQLStatementsSensor, including but not limited to:

  • catalog

  • schema

  • parameters
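Because statement and statement_id are alternatives, the Sensor is configured differently depending on whether it submits a new query or monitors an existing one. A minimal sketch of the two mutually exclusive keyword-argument sets — the warehouse ID and statement ID values below are hypothetical placeholders:

```python
# Mode 1: the Sensor submits the statement itself, then waits for it.
submit_mode = {
    "task_id": "submit_and_wait",
    "databricks_conn_id": "databricks_default",
    "warehouse_id": "example-warehouse-id",  # hypothetical warehouse ID
    "statement": "select * from default.my_airflow_table",
}

# Mode 2: the Sensor waits on a statement that was already submitted
# elsewhere; a statement_id replaces the statement text.
monitor_mode = {
    "task_id": "wait_for_existing_statement",
    "databricks_conn_id": "databricks_default",
    "warehouse_id": "example-warehouse-id",  # hypothetical warehouse ID
    "statement_id": "example-statement-id",  # hypothetical statement ID
}
```

Either dict would be unpacked into the Sensor, e.g. `DatabricksSQLStatementsSensor(**submit_mode)`; passing both statement and statement_id is not meaningful, since the Sensor takes one of the two.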

Examples

An example usage of the DatabricksSQLStatementsSensor is as follows:

tests/system/databricks/example_databricks_sensors.py[source]

    # Example of using the DatabricksSQLStatementsSensor to wait for a query
    # to successfully run.
    sql_statement_sensor = DatabricksSQLStatementsSensor(
        task_id="sql_statement_sensor_task",
        databricks_conn_id=connection_id,
        warehouse_id="warehouse_id",
        statement="select * from default.my_airflow_table",
        # deferrable=True, # For using the operator in deferrable mode
    )
