DatabricksCreateJobsOperator¶
Use the DatabricksCreateJobsOperator to create
(or reset) a Databricks job. This operator relies on past XComs to remember the job_id that
was created so that repeated calls with this operator will update the existing job rather than
creating new ones. When paired with the DatabricksRunNowOperator all runs will fall under the same
job within the Databricks UI.
Using the Operator¶
There are three ways to instantiate this operator. In the first way, you can take the JSON payload that you typically use
to call the api/2.2/jobs/create endpoint and pass it directly to our DatabricksCreateJobsOperator through the
json parameter. With this approach you get full control over the underlying payload to Jobs REST API, including
execution of Databricks jobs with multiple tasks, but it’s harder to detect errors because of the lack of the type checking.
The second way to accomplish the same thing is to use the named parameters of the DatabricksCreateJobsOperator directly. Note that there is exactly
one named parameter for each top level parameter in the api/2.2/jobs/create endpoint.
The third way is to use both the json parameter AND the named parameters. They will be merged
together. If there are conflicts during the merge, the named parameters will take precedence and
override the top level json keys.
- Currently the named parameters that
DatabricksCreateJobsOperatorsupports are: namedescriptiontagstasksjob_clustersemail_notificationswebhook_notificationsnotification_settingstimeout_secondsschedulemax_concurrent_runsgit_sourceaccess_control_list
Forwarding Airflow Dag params as Databricks job parameters¶
The Databricks api/2.2/jobs/create endpoint accepts a top-level parameters field
that defines job-level parameters — a list of objects
with a name (the parameter name) and a default (its default value), for example
[{"name": "env", "default": "prod"}].
If parameters is not set in json and the operator’s params dict is non-empty,
each key/value pair in params is converted into one such {"name": key, "default":
value} entry, so that Airflow Dag params can be forwarded as Databricks job parameters
without hardcoding the API shape in json. If json already contains parameters,
it is left untouched.
# Airflow Dag params (key/value pairs)
params = {"env": "prod", "batch_size": "100"}
create_job = DatabricksCreateJobsOperator(
task_id="create_job",
json={"name": "my-job", "tasks": [...]},
params=params,
)
# The Databricks job created/reset by the operator will have:
# parameters=[
# {"name": "env", "default": "prod"},
# {"name": "batch_size", "default": "100"},
# ]
# i.e. each "<key>: <value>" in params becomes one
# {"name": "<key>", "default": "<value>"} entry in the job definition.
Examples¶
Specifying parameters as JSON¶
An example usage of the DatabricksCreateJobsOperator is as follows:
# Example of using the JSON parameter to initialize the operator.
job = {
"tasks": [
{
"task_key": "test",
"job_cluster_key": "job_cluster",
"notebook_task": {
"notebook_path": "/Shared/test",
},
},
],
"job_clusters": [
{
"job_cluster_key": "job_cluster",
"new_cluster": {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
},
],
}
jobs_create_json = DatabricksCreateJobsOperator(task_id="jobs_create_json", json=job)
Using named parameters¶
You can also use named parameters to initialize the operator and run the job.
# Example of using the named parameters to initialize the operator.
tasks = [
{
"task_key": "test",
"job_cluster_key": "job_cluster",
"notebook_task": {
"notebook_path": "/Shared/test",
},
},
]
job_clusters = [
{
"job_cluster_key": "job_cluster",
"new_cluster": {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
},
]
jobs_create_named = DatabricksCreateJobsOperator(
task_id="jobs_create_named", tasks=tasks, job_clusters=job_clusters
)
Pairing with DatabricksRunNowOperator¶
You can use the job_id that is returned by the DatabricksCreateJobsOperator in the
return_value XCom as an argument to the DatabricksRunNowOperator to run the job.
# Example of using the DatabricksRunNowOperator after creating a job with DatabricksCreateJobsOperator.
run_now = DatabricksRunNowOperator(task_id="run_now", job_id=jobs_create_named.output)
jobs_create_named >> run_now