
airflow.providers.databricks.hooks.databricks

Databricks hook.

This hook enables submitting and running jobs on the Databricks platform. Internally, the operators talk to the api/2.1/jobs/run-now endpoint (https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow) or the api/2.1/jobs/runs/submit endpoint.
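
For example, a minimal sketch of triggering an existing job through the hook (the job id and parameter values below are placeholders):

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    # Uses the Airflow connection "databricks_default" by default.
    hook = DatabricksHook()

    # Calls api/2.1/jobs/run-now; job_id 42 and the notebook_params are illustrative.
    run_id = hook.run_now({"job_id": 42, "notebook_params": {"dt": "2025-01-01"}})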

Attributes

GET_CLUSTER_ENDPOINT

RESTART_CLUSTER_ENDPOINT

START_CLUSTER_ENDPOINT

TERMINATE_CLUSTER_ENDPOINT

CREATE_ENDPOINT

RESET_ENDPOINT

UPDATE_ENDPOINT

RUN_NOW_ENDPOINT

SUBMIT_RUN_ENDPOINT

GET_RUN_ENDPOINT

CANCEL_RUN_ENDPOINT

DELETE_RUN_ENDPOINT

REPAIR_RUN_ENDPOINT

OUTPUT_RUNS_JOB_ENDPOINT

CANCEL_ALL_RUNS_ENDPOINT

INSTALL_LIBS_ENDPOINT

UNINSTALL_LIBS_ENDPOINT

LIST_JOBS_ENDPOINT

LIST_PIPELINES_ENDPOINT

WORKSPACE_GET_STATUS_ENDPOINT

SPARK_VERSIONS_ENDPOINT

SQL_STATEMENTS_ENDPOINT

Classes

RunLifeCycleState

Enum for the run life cycle state concept of Databricks runs.

RunState

Utility class for the run state concept of Databricks runs.

ClusterState

Utility class for the cluster state concept of Databricks clusters.

SQLStatementState

Utility class for the SQL statement state concept of Databricks statements.

DatabricksHook

Interact with Databricks.

Module Contents

airflow.providers.databricks.hooks.databricks.GET_CLUSTER_ENDPOINT = ('GET', 'api/2.0/clusters/get')[source]
airflow.providers.databricks.hooks.databricks.RESTART_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/restart')[source]
airflow.providers.databricks.hooks.databricks.START_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/start')[source]
airflow.providers.databricks.hooks.databricks.TERMINATE_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/delete')[source]
airflow.providers.databricks.hooks.databricks.CREATE_ENDPOINT = ('POST', 'api/2.1/jobs/create')[source]
airflow.providers.databricks.hooks.databricks.RESET_ENDPOINT = ('POST', 'api/2.1/jobs/reset')[source]
airflow.providers.databricks.hooks.databricks.UPDATE_ENDPOINT = ('POST', 'api/2.1/jobs/update')[source]
airflow.providers.databricks.hooks.databricks.RUN_NOW_ENDPOINT = ('POST', 'api/2.1/jobs/run-now')[source]
airflow.providers.databricks.hooks.databricks.SUBMIT_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/submit')[source]
airflow.providers.databricks.hooks.databricks.GET_RUN_ENDPOINT = ('GET', 'api/2.1/jobs/runs/get')[source]
airflow.providers.databricks.hooks.databricks.CANCEL_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/cancel')[source]
airflow.providers.databricks.hooks.databricks.DELETE_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/delete')[source]
airflow.providers.databricks.hooks.databricks.REPAIR_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/repair')[source]
airflow.providers.databricks.hooks.databricks.OUTPUT_RUNS_JOB_ENDPOINT = ('GET', 'api/2.1/jobs/runs/get-output')[source]
airflow.providers.databricks.hooks.databricks.CANCEL_ALL_RUNS_ENDPOINT = ('POST', 'api/2.1/jobs/runs/cancel-all')[source]
airflow.providers.databricks.hooks.databricks.INSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/install')[source]
airflow.providers.databricks.hooks.databricks.UNINSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/uninstall')[source]
airflow.providers.databricks.hooks.databricks.LIST_JOBS_ENDPOINT = ('GET', 'api/2.1/jobs/list')[source]
airflow.providers.databricks.hooks.databricks.LIST_PIPELINES_ENDPOINT = ('GET', 'api/2.0/pipelines')[source]
airflow.providers.databricks.hooks.databricks.WORKSPACE_GET_STATUS_ENDPOINT = ('GET', 'api/2.0/workspace/get-status')[source]
airflow.providers.databricks.hooks.databricks.SPARK_VERSIONS_ENDPOINT = ('GET', 'api/2.0/clusters/spark-versions')[source]
airflow.providers.databricks.hooks.databricks.SQL_STATEMENTS_ENDPOINT = 'api/2.0/sql/statements'[source]
class airflow.providers.databricks.hooks.databricks.RunLifeCycleState[source]

Bases: enum.Enum

Enum for the run life cycle state concept of Databricks runs.

See more information at: https://docs.databricks.com/api/azure/workspace/jobs/listruns#runs-state-life_cycle_state

BLOCKED = 'BLOCKED'[source]
INTERNAL_ERROR = 'INTERNAL_ERROR'[source]
PENDING = 'PENDING'[source]
QUEUED = 'QUEUED'[source]
RUNNING = 'RUNNING'[source]
SKIPPED = 'SKIPPED'[source]
TERMINATED = 'TERMINATED'[source]
TERMINATING = 'TERMINATING'[source]
WAITING_FOR_RETRY = 'WAITING_FOR_RETRY'[source]
class airflow.providers.databricks.hooks.databricks.RunState(life_cycle_state, result_state='', state_message='', *args, **kwargs)[source]

Utility class for the run state concept of Databricks runs.

RUN_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'TERMINATING', 'TERMINATED', 'SKIPPED', 'INTERNAL_ERROR', 'QUEUED'][source]
life_cycle_state[source]
result_state = ''[source]
state_message = ''[source]
property is_terminal: bool[source]

True if the current state is a terminal state.

property is_successful: bool[source]

True if the result state is SUCCESS.

__eq__(other)[source]
__repr__()[source]
to_json()[source]
classmethod from_json(data)[source]
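
A short sketch of how the helper properties are typically read (the state values are illustrative):

    from airflow.providers.databricks.hooks.databricks import RunState

    state = RunState(life_cycle_state="TERMINATED", result_state="SUCCESS")
    assert state.is_terminal      # TERMINATED is a terminal lifecycle state
    assert state.is_successful    # result_state is SUCCESS
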
class airflow.providers.databricks.hooks.databricks.ClusterState(state='', state_message='', *args, **kwargs)[source]

Utility class for the cluster state concept of Databricks clusters.

CLUSTER_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'RESTARTING', 'RESIZING', 'TERMINATING', 'TERMINATED', 'ERROR', 'UNKNOWN'][source]
state = ''[source]
state_message = ''[source]
property is_terminal: bool[source]

True if the current state is a terminal state.

property is_running: bool[source]

True if the current state is running.

__eq__(other)[source]
__repr__()[source]
to_json()[source]
classmethod from_json(data)[source]
class airflow.providers.databricks.hooks.databricks.SQLStatementState(state='', error_code='', error_message='', *args, **kwargs)[source]

Utility class for the SQL statement state concept of Databricks statements.

SQL_STATEMENT_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELED', 'CLOSED'][source]
state = ''[source]
error_code = ''[source]
error_message = ''[source]
property is_terminal: bool[source]

True if the current state is a terminal state.

property is_running: bool[source]

True if the current state is running.

property is_successful: bool[source]

True if the state is SUCCEEDED.

__eq__(other)[source]
__repr__()[source]
to_json()[source]
classmethod from_json(data)[source]
class airflow.providers.databricks.hooks.databricks.DatabricksHook(databricks_conn_id=BaseDatabricksHook.default_conn_name, timeout_seconds=180, retry_limit=3, retry_delay=1.0, retry_args=None, caller='DatabricksHook')[source]

Bases: airflow.providers.databricks.hooks.databricks_base.BaseDatabricksHook

Interact with Databricks.

Parameters:
  • databricks_conn_id (str) – Reference to the Databricks connection.

  • timeout_seconds (int) – The amount of time in seconds the requests library will wait before timing out.

  • retry_limit (int) – The number of times to retry the connection in case of service outages.

  • retry_delay (float) – The number of seconds to wait between retries (may be a floating-point number).

  • retry_args (dict[Any, Any] | None) – An optional dictionary with arguments passed to the tenacity.Retrying class.
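
A sketch of constructing the hook with custom retry behaviour (the values are illustrative):

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook(
        databricks_conn_id="databricks_default",  # the default connection id
        timeout_seconds=60,
        retry_limit=5,
        retry_delay=2.0,
    )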

hook_name = 'Databricks'[source]
create_job(json)[source]

Call the api/2.1/jobs/create endpoint.

Parameters:

json (dict) – The data used in the body of the request to the create endpoint.

Returns:

the job_id as an int

Return type:

int
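
A hedged sketch of a create payload following the Jobs 2.1 schema (job name, notebook path, and cluster settings are placeholders):

    hook = DatabricksHook()
    job_id = hook.create_job(
        {
            "name": "nightly-etl",
            "tasks": [
                {
                    "task_key": "main",
                    "notebook_task": {"notebook_path": "/Shared/etl"},
                    "new_cluster": {
                        "spark_version": "13.3.x-scala2.12",
                        "node_type_id": "i3.xlarge",
                        "num_workers": 2,
                    },
                }
            ],
        }
    )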

reset_job(job_id, json)[source]

Call the api/2.1/jobs/reset endpoint.

Parameters:
  • job_id (str) – The id of the job to reset.

  • json (dict) – The data used in the new_settings of the request to the reset endpoint.

update_job(job_id, json)[source]

Call the api/2.1/jobs/update endpoint.

Parameters:
  • job_id (str) – The id of the job to update.

  • json (dict) – The data used in the new_settings of the request to the update endpoint.

run_now(json)[source]

Call the api/2.1/jobs/run-now endpoint.

Parameters:

json (dict) – The data used in the body of the request to the run-now endpoint.

Returns:

the run_id as an int

Return type:

int

submit_run(json)[source]

Call the api/2.1/jobs/runs/submit endpoint.

Parameters:

json (dict) – The data used in the body of the request to the submit endpoint.

Returns:

the run_id as an int

Return type:

int
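
A sketch of a one-time run payload for the submit endpoint (run name, notebook path, and cluster id are placeholders):

    hook = DatabricksHook()
    run_id = hook.submit_run(
        {
            "run_name": "adhoc-run",
            "tasks": [
                {
                    "task_key": "main",
                    "notebook_task": {"notebook_path": "/Shared/adhoc"},
                    "existing_cluster_id": "1234-567890-abcde123",
                }
            ],
        }
    )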

list_jobs(limit=25, expand_tasks=False, job_name=None, page_token=None, include_user_names=False)[source]

List the jobs in the Databricks Job Service.

Parameters:
  • limit (int) – The limit/batch size used to retrieve jobs.

  • expand_tasks (bool) – Whether to include task and cluster details in the response.

  • job_name (str | None) – Optional name of a job to search.

  • page_token (str | None) – The optional page token pointing at the first job to return.

Returns:

A list of jobs.

Return type:

list[dict[str, Any]]
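
For example, fetching jobs by name (field access follows the Jobs 2.1 list response; the job name is a placeholder):

    hook = DatabricksHook()
    for job in hook.list_jobs(limit=50, job_name="nightly-etl"):
        print(job["job_id"], job["settings"]["name"])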

find_job_id_by_name(job_name)[source]

Find job id by its name; if there are multiple jobs with the same name, raise AirflowException.

Parameters:

job_name (str) – The name of the job to look up.

Returns:

The job_id as an int or None if no job was found.

Return type:

int | None

list_pipelines(batch_size=25, pipeline_name=None, notebook_path=None)[source]

List the pipelines in Databricks Delta Live Tables.

Parameters:
  • batch_size (int) – The limit/batch size used to retrieve pipelines.

  • pipeline_name (str | None) – Optional name of a pipeline to search. Cannot be combined with path.

  • notebook_path (str | None) – Optional notebook of a pipeline to search. Cannot be combined with name.

Returns:

A list of pipelines.

Return type:

list[dict[str, Any]]

find_pipeline_id_by_name(pipeline_name)[source]

Find pipeline id by its name; if there are multiple pipelines with the same name, raise AirflowException.

Parameters:

pipeline_name (str) – The name of the pipeline to look up.

Returns:

The pipeline_id as a GUID string or None if no pipeline was found.

Return type:

str | None

get_run_page_url(run_id)[source]

Retrieve run_page_url.

Parameters:

run_id (int) – id of the run

Returns:

URL of the run page

Return type:

str

async a_get_run_page_url(run_id)[source]

Async version of get_run_page_url().

Parameters:

run_id (int) – id of the run

Returns:

URL of the run page

Return type:

str

get_job_id(run_id)[source]

Retrieve job_id from run_id.

Parameters:

run_id (int) – id of the run

Returns:

Job id for given Databricks run

Return type:

int

get_run_state(run_id)[source]

Retrieve run state of the run.

Note that any Airflow task that calls the get_run_state method will fail unless you have enabled XCom pickling. This can be done using the following environment variable: AIRFLOW__CORE__ENABLE_XCOM_PICKLING

If you do not want to enable XCom pickling, use the get_run_state_str method to get a string describing the state, or get_run_state_lifecycle, get_run_state_result, or get_run_state_message to get the individual components of the run state.

Parameters:

run_id (int) – id of the run

Returns:

state of the run

Return type:

RunState
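
A minimal polling sketch built on the RunState helper properties (the run id and interval are illustrative):

    import time

    hook = DatabricksHook()
    while True:
        state = hook.get_run_state(run_id=12345)
        if state.is_terminal:
            break
        time.sleep(30)
    if not state.is_successful:
        raise RuntimeError(f"Run failed: {state.state_message}")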

async a_get_run_state(run_id)[source]

Async version of get_run_state().

Parameters:

run_id (int) – id of the run

Returns:

state of the run

Return type:

RunState

get_run(run_id)[source]

Retrieve run information.

Parameters:

run_id (int) – id of the run

Returns:

information about the run

Return type:

dict[str, Any]

async a_get_run(run_id)[source]

Async version of get_run.

Parameters:

run_id (int) – id of the run

Returns:

information about the run

Return type:

dict[str, Any]

get_run_state_str(run_id)[source]

Return the string representation of RunState.

Parameters:

run_id (int) – id of the run

Returns:

string describing run state

Return type:

str

get_run_state_lifecycle(run_id)[source]

Return the lifecycle state of the run.

Parameters:

run_id (int) – id of the run

Returns:

string with lifecycle state

Return type:

str

get_run_state_result(run_id)[source]

Return the resulting state of the run.

Parameters:

run_id (int) – id of the run

Returns:

string with resulting state

Return type:

str

get_run_state_message(run_id)[source]

Return the state message for the run.

Parameters:

run_id (int) – id of the run

Returns:

string with state message

Return type:

str

get_run_output(run_id)[source]

Retrieve the output of the run.

Parameters:

run_id (int) – id of the run

Returns:

output of the run

Return type:

dict
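
A sketch of reading a notebook task's exit value from the run output (the notebook_output field follows the runs/get-output response; the run id is a placeholder):

    hook = DatabricksHook()
    output = hook.get_run_output(run_id=12345)
    # Present only if the notebook called dbutils.notebook.exit(...).
    result = output.get("notebook_output", {}).get("result")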

async a_get_run_output(run_id)[source]

Async version of get_run_output().

Parameters:

run_id (int) – id of the run

Returns:

output of the run

Return type:

dict

cancel_run(run_id)[source]

Cancel the run.

Parameters:

run_id (int) – id of the run

cancel_all_runs(job_id)[source]

Cancel all active runs of a job asynchronously.

Parameters:

job_id (int) – The canonical identifier of the job to cancel all runs of

delete_run(run_id)[source]

Delete a non-active run.

Parameters:

run_id (int) – id of the run

repair_run(json)[source]

Re-run one or more tasks.

Parameters:

json (dict) – The data used in the body of the request to the repair endpoint.

get_latest_repair_id(run_id)[source]

Get the latest repair ID for run_id if one exists, otherwise None.
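
A hedged sketch of repairing failed tasks of a run (the run id and task key are placeholders); on repeated repairs the Jobs API expects latest_repair_id in the payload:

    hook = DatabricksHook()
    payload = {"run_id": 12345, "rerun_tasks": ["main"]}
    latest = hook.get_latest_repair_id(run_id=12345)
    if latest is not None:
        payload["latest_repair_id"] = latest
    hook.repair_run(payload)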

get_cluster_state(cluster_id)[source]

Retrieve the state of the cluster.

Parameters:

cluster_id (str) – id of the cluster

Returns:

state of the cluster

Return type:

ClusterState

async a_get_cluster_state(cluster_id)[source]

Async version of get_cluster_state.

Parameters:

cluster_id (str) – id of the cluster

Returns:

state of the cluster

Return type:

ClusterState

restart_cluster(json)[source]

Restart the cluster.

Parameters:

json (dict) – json dictionary containing cluster specification.

start_cluster(json)[source]

Start the cluster.

Parameters:

json (dict) – json dictionary containing cluster specification.

terminate_cluster(json)[source]

Terminate the cluster.

Parameters:

json (dict) – json dictionary containing cluster specification.
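
A sketch of starting a cluster and polling until it is usable (the cluster id and interval are placeholders):

    import time

    hook = DatabricksHook()
    hook.start_cluster({"cluster_id": "1234-567890-abcde123"})
    while True:
        state = hook.get_cluster_state("1234-567890-abcde123")
        if state.is_running or state.is_terminal:
            break
        time.sleep(30)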

install(json)[source]

Install libraries on the cluster.

Utility function to call the 2.0/libraries/install endpoint.

Parameters:

json (dict) – json dictionary containing cluster_id and an array of library specifications.

uninstall(json)[source]

Uninstall libraries on the cluster.

Utility function to call the 2.0/libraries/uninstall endpoint.

Parameters:

json (dict) – json dictionary containing cluster_id and an array of library specifications.
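
A sketch of the payload both library calls expect, here installing a PyPI package (the cluster id and package name are placeholders):

    hook = DatabricksHook()
    hook.install(
        {
            "cluster_id": "1234-567890-abcde123",
            "libraries": [{"pypi": {"package": "simplejson"}}],
        }
    )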

update_repo(repo_id, json)[source]

Update the given Databricks repo.

Parameters:
  • repo_id (str) – ID of the Databricks repo

  • json (dict[str, Any]) – payload

Returns:

metadata from update

Return type:

dict

delete_repo(repo_id)[source]

Delete the given Databricks repo.

Parameters:

repo_id (str) – ID of the Databricks repo

create_repo(json)[source]

Create a Databricks repo.

Parameters:

json (dict[str, Any]) – payload

Returns:

metadata of the created repo

Return type:

dict

get_repo_by_path(path)[source]

Obtain the repo ID by path.

Parameters:

path (str) – path to a repository

Returns:

Repo ID if it exists, None if it doesn’t.

Return type:

str | None

update_job_permission(job_id, json)[source]

Update Databricks job permissions.

Parameters:
  • job_id (int) – job id

  • json (dict[str, Any]) – payload

Returns:

json containing permission specification

Return type:

dict

post_sql_statement(json)[source]

Submit a SQL statement to the Databricks SQL Statements endpoint.

Parameters:

json (dict[str, Any]) – The data used in the body of the request to the SQL Statements endpoint.

Returns:

The statement_id as a string.

Return type:

str

get_sql_statement_state(statement_id)[source]

Retrieve the state of the SQL statement.

Parameters:

statement_id (str) – ID of the SQL statement.

Returns:

state of the SQL statement.

Return type:

SQLStatementState

async a_get_sql_statement_state(statement_id)[source]

Async version of get_sql_statement_state.

Parameters:

statement_id (str) – ID of the SQL statement

Returns:

state of the SQL statement

Return type:

SQLStatementState
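
A sketch of submitting a statement and polling it to a terminal state (the warehouse id and query are placeholders; the payload follows the Databricks Statement Execution API):

    import time

    hook = DatabricksHook()
    statement_id = hook.post_sql_statement(
        {
            "statement": "SELECT 1",
            "warehouse_id": "abcdef1234567890",
            "wait_timeout": "0s",  # return immediately, then poll
        }
    )
    while True:
        state = hook.get_sql_statement_state(statement_id)
        if state.is_terminal:
            break
        time.sleep(10)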

cancel_sql_statement(statement_id)[source]

Cancel the SQL statement.

Parameters:

statement_id (str) – ID of the SQL statement

test_connection()[source]

Test Databricks connectivity from the UI.
