Google Cloud BigQuery Routines Operators¶
BigQuery routines are dataset-scoped resources that encapsulate logic you can reuse from SQL:
Scalar user-defined functions (SQL or JavaScript)
Stored procedures (SQL or Apache Spark)
Table-valued functions (SQL)
User-defined aggregate functions (SQL)
Remote functions backed by Cloud Run / Cloud Functions
Airflow exposes the BigQuery routines API so your DAG can own both the routine
definitions and the pipeline that depends on them, instead of embedding
CREATE FUNCTION / CREATE PROCEDURE DDL in a query job.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
Create a routine¶
Use BigQueryCreateRoutineOperator
to create any routine type. Routine fields mirror the BigQuery REST API’s
Routine resource. Pass them individually as keyword arguments, or pass the
complete resource via routine_resource.
The if_exists argument controls collision behavior:
"fail"(default) — raise when the routine already exists."skip"— leave the existing routine in place and return it."replace"— delete the existing routine, then create the new one.
Scalar SQL UDF:
create_scalar_routine = BigQueryCreateRoutineOperator(
    task_id="create_scalar_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
    routine_type="SCALAR_FUNCTION",
    language="SQL",
    arguments=[{"name": "x", "dataType": INT64_TYPE}],
    return_type=INT64_TYPE,
    definition_body="x + 1",
    description="Adds one to its argument.",
)
Stored procedure:
create_procedure = BigQueryCreateRoutineOperator(
    task_id="create_procedure",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=PROCEDURE_ROUTINE,
    routine_type="PROCEDURE",
    language="SQL",
    arguments=[
        {"name": "prefix", "dataType": STRING_TYPE, "argumentKind": "FIXED_TYPE"},
    ],
    definition_body="BEGIN SELECT CONCAT(prefix, ' world') AS greeting; END",
    description="Echoes a prefixed greeting.",
)
Table-valued function:
create_tvf = BigQueryCreateRoutineOperator(
    task_id="create_tvf",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=TVF_ROUTINE,
    routine_type="TABLE_VALUED_FUNCTION",
    language="SQL",
    arguments=[{"name": "n", "dataType": INT64_TYPE}],
    return_table_type={
        "columns": [
            {"name": "value", "type": INT64_TYPE},
        ]
    },
    definition_body="SELECT * FROM UNNEST(GENERATE_ARRAY(1, n)) AS value",
    description="Generates integers 1..n as a table.",
)
Update a routine¶
Use BigQueryUpdateRoutineOperator
to patch selected fields of an existing routine. Only the fields listed in
fields are updated; any listed field that is unset in routine_resource
is cleared on the server.
update_routine = BigQueryUpdateRoutineOperator(
    task_id="update_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
    routine_resource={"description": "Updated description for scalar UDF"},
    fields=["description"],
)
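The update semantics above amount to a field mask: each field named in fields is copied from the patch resource, and a named field that is absent from the patch clears the server-side value. A minimal sketch of that logic over plain dicts (illustrative only, not the operator's actual implementation):

```python
def apply_field_mask(existing: dict, patch: dict, fields: list) -> dict:
    """Return a copy of ``existing`` with only ``fields`` taken from ``patch``.

    Fields named in ``fields`` but missing from ``patch`` are cleared,
    mirroring the server-side behavior described above.
    """
    updated = dict(existing)
    for field in fields:
        if field in patch:
            updated[field] = patch[field]
        else:
            updated.pop(field, None)
    return updated


routine = {"routineType": "SCALAR_FUNCTION", "description": "old"}
patched = apply_field_mask(
    routine,
    {"description": "Updated description for scalar UDF"},
    ["description"],
)
```

Fields not listed in the mask (here, routineType) are left untouched even if the patch omits them.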
Fetch a routine¶
Use BigQueryGetRoutineOperator
to read a routine’s metadata. The operator pushes the serialized resource to
XCom.
get_routine = BigQueryGetRoutineOperator(
    task_id="get_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
)
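A downstream task can pull the serialized resource from XCom and inspect it. Assuming the dict follows the REST API's Routine resource shape, a hypothetical helper for summarizing what was fetched might look like:

```python
def summarize_routine(routine: dict) -> str:
    """Build a one-line summary from a Routine resource dict pulled from XCom."""
    ref = routine.get("routineReference", {})
    name = ".".join(
        part
        for part in (ref.get("projectId"), ref.get("datasetId"), ref.get("routineId"))
        if part
    )
    return f"{name} ({routine.get('routineType', 'UNKNOWN')})"


# In a DAG this would typically run downstream, e.g.:
#   summary = summarize_routine(ti.xcom_pull(task_ids="get_routine"))
example = {
    "routineReference": {
        "projectId": "my-project",
        "datasetId": "my_dataset",
        "routineId": "add_one",
    },
    "routineType": "SCALAR_FUNCTION",
}
```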
List routines¶
Use BigQueryListRoutinesOperator
to list all routines in a dataset. Only a subset of each routine’s fields is
returned; use BigQueryGetRoutineOperator for the complete resource.
list_routines = BigQueryListRoutinesOperator(
    task_id="list_routines",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
)
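Because the listing returns only a subset of each routine's fields, a common pattern is to filter on a field that is present, such as routineType, and then fetch the full resource per routine. A sketch of the filtering step (the response shape is assumed to follow the REST API's routines.list format):

```python
def routine_ids_of_type(routines: list, routine_type: str) -> list:
    """Pick routine IDs of a given type from a routines.list-style response."""
    return [
        r["routineReference"]["routineId"]
        for r in routines
        if r.get("routineType") == routine_type
    ]


listed = [
    {"routineReference": {"routineId": "add_one"}, "routineType": "SCALAR_FUNCTION"},
    {"routineReference": {"routineId": "greet"}, "routineType": "PROCEDURE"},
]
```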
Delete a routine¶
Use BigQueryDeleteRoutineOperator
to remove a routine. Set ignore_if_missing=True to make the delete a no-op
when the routine does not exist.
delete_scalar_routine = BigQueryDeleteRoutineOperator(
    task_id="delete_scalar_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
    ignore_if_missing=True,
)
Wait for a routine¶
Use BigQueryRoutineExistenceSensor
to block downstream tasks until a routine exists. This is useful when routine
creation happens in a separate DAG or an external system.
wait_for_routine = BigQueryRoutineExistenceSensor(
    task_id="wait_for_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
    timeout=60,
    poke_interval=5,
)
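Conceptually, the sensor re-checks for the routine every poke_interval seconds and gives up once timeout elapses. A simplified, synchronous sketch of that loop with an injected existence check (not the sensor's actual implementation, which is driven by Airflow's scheduler):

```python
import time


def poll_until_exists(exists, timeout, poke_interval,
                      clock=time.monotonic, sleep=time.sleep):
    """Poll ``exists()`` until it returns True or ``timeout`` seconds pass."""
    deadline = clock() + timeout
    while True:
        if exists():
            return True
        if clock() >= deadline:
            return False  # a real sensor would raise a timeout error here
        sleep(poke_interval)
```

Injecting clock and sleep keeps the loop testable without real waiting.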
Reference¶
For further information, look at: