Google Cloud BigQuery Routines Operators

BigQuery routines are dataset-scoped resources that encapsulate logic you can reuse from SQL:

  • Scalar user-defined functions (SQL or JavaScript)

  • Stored procedures (SQL or Apache Spark)

  • Table-valued functions (SQL)

  • User-defined aggregate functions (SQL)

  • Remote functions backed by Cloud Run / Cloud Functions

Airflow exposes the BigQuery routines API so your DAG can own both the routine definitions and the pipeline that depends on them, instead of embedding CREATE FUNCTION / CREATE PROCEDURE DDL in a query job.

Prerequisite Tasks

To use these operators, you must do a few things:

  • Select or create a Cloud Platform project using the Cloud Console.

  • Enable billing for your project, as described in the Google Cloud documentation.

  • Enable the API, as described in the Cloud Console documentation.

  • Install API libraries via pip:

    pip install 'apache-airflow[google]'

  • Setup a Google Cloud Connection.

Create a routine

Use BigQueryCreateRoutineOperator to create any routine type. Routine fields mirror the BigQuery REST API’s Routine resource. Pass them individually as keyword arguments, or pass the complete resource via routine_resource.

The if_exists argument controls collision behavior:

  • "fail" (default) — raise when the routine already exists.

  • "skip" — leave the existing routine in place and return it.

  • "replace" — delete the existing routine, then create the new one.

Scalar SQL UDF:

tests/system/google/cloud/bigquery/example_bigquery_routines.py[source]

create_scalar_routine = BigQueryCreateRoutineOperator(
    task_id="create_scalar_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
    routine_type="SCALAR_FUNCTION",
    language="SQL",
    arguments=[{"name": "x", "dataType": INT64_TYPE}],
    return_type=INT64_TYPE,
    definition_body="x + 1",
    description="Adds one to its argument.",
)

Stored procedure:

tests/system/google/cloud/bigquery/example_bigquery_routines.py[source]

create_procedure = BigQueryCreateRoutineOperator(
    task_id="create_procedure",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=PROCEDURE_ROUTINE,
    routine_type="PROCEDURE",
    language="SQL",
    arguments=[
        {"name": "prefix", "dataType": STRING_TYPE, "argumentKind": "FIXED_TYPE"},
    ],
    definition_body="BEGIN SELECT CONCAT(prefix, ' world') AS greeting; END",
    description="Echoes a prefixed greeting.",
)

Table-valued function:

tests/system/google/cloud/bigquery/example_bigquery_routines.py[source]

create_tvf = BigQueryCreateRoutineOperator(
    task_id="create_tvf",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=TVF_ROUTINE,
    routine_type="TABLE_VALUED_FUNCTION",
    language="SQL",
    arguments=[{"name": "n", "dataType": INT64_TYPE}],
    return_table_type={
        "columns": [
            {"name": "value", "type": INT64_TYPE},
        ]
    },
    definition_body="SELECT * FROM UNNEST(GENERATE_ARRAY(1, n)) AS value",
    description="Generates integers 1..n as a table.",
)

Update a routine

Use BigQueryUpdateRoutineOperator to patch selected fields of an existing routine. Only the fields listed in fields are updated; any listed field that is unset in routine_resource is cleared on the server.

tests/system/google/cloud/bigquery/example_bigquery_routines.py[source]

update_routine = BigQueryUpdateRoutineOperator(
    task_id="update_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
    routine_resource={"description": "Updated description for scalar UDF"},
    fields=["description"],
)
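
The field-mask behavior described above (listed fields change; a listed-but-unset field is cleared) can be sketched with a hypothetical `apply_patch` helper operating on plain dicts. This mirrors the semantics, not the operator's implementation:

```python
def apply_patch(current: dict, routine_resource: dict, fields: list) -> dict:
    """Sketch of field-mask patching: only fields named in `fields` change."""
    patched = dict(current)
    for field in fields:
        if field in routine_resource:
            patched[field] = routine_resource[field]  # listed and set: updated
        else:
            patched.pop(field, None)  # listed but unset: cleared on the server
    return patched
```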

Fetch a routine

Use BigQueryGetRoutineOperator to read a routine’s metadata. The operator pushes the serialized resource to XCom.

tests/system/google/cloud/bigquery/example_bigquery_routines.py[source]

get_routine = BigQueryGetRoutineOperator(
    task_id="get_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
)
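
Downstream tasks can pull the pushed resource with `ti.xcom_pull(task_ids="get_routine")` and work with it as a plain dict. A sketch of consuming it follows; the `summarize_routine` helper is hypothetical, and the camelCase key layout is an assumption based on the REST API's Routine resource:

```python
def summarize_routine(resource: dict) -> str:
    """Build a one-line summary from a Routine resource dict as pushed to XCom."""
    ref = resource.get("routineReference", {})
    return "{}.{}.{} ({})".format(
        ref.get("projectId", "?"),
        ref.get("datasetId", "?"),
        ref.get("routineId", "?"),
        resource.get("routineType", "UNKNOWN"),
    )
```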

List routines

Use BigQueryListRoutinesOperator to list all routines in a dataset. Only a subset of each routine’s fields is returned; use BigQueryGetRoutineOperator for the complete resource.

tests/system/google/cloud/bigquery/example_bigquery_routines.py[source]

list_routines = BigQueryListRoutinesOperator(
    task_id="list_routines",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
)
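
The list result can likewise be consumed from XCom, for example to verify that an expected routine is present. The `routine_ids` helper below is hypothetical, and it assumes each list entry carries a `routineReference` as in the REST API's `routines.list` response:

```python
def routine_ids(list_response: list) -> set:
    """Collect routine IDs from a routines.list-style response."""
    return {entry["routineReference"]["routineId"] for entry in list_response}
```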

Delete a routine

Use BigQueryDeleteRoutineOperator to remove a routine. Set ignore_if_missing=True to make the delete a no-op when the routine does not exist.

tests/system/google/cloud/bigquery/example_bigquery_routines.py[source]

delete_scalar_routine = BigQueryDeleteRoutineOperator(
    task_id="delete_scalar_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
    ignore_if_missing=True,
)
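
The ignore_if_missing behavior amounts to the following sketch, with an in-memory dict standing in for the dataset's routines (`delete_routine` is a hypothetical helper, not the operator):

```python
def delete_routine(store: dict, routine_id: str, ignore_if_missing: bool) -> None:
    """Sketch of delete semantics: a missing routine is an error unless ignored."""
    if routine_id in store:
        del store[routine_id]
    elif not ignore_if_missing:
        # without the flag, deleting a missing routine fails the task
        raise KeyError(routine_id)
```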

Wait for a routine

Use BigQueryRoutineExistenceSensor to block downstream tasks until a routine exists. This is useful when routine creation happens in a separate DAG or an external system.

tests/system/google/cloud/bigquery/example_bigquery_routines.py[source]

wait_for_routine = BigQueryRoutineExistenceSensor(
    task_id="wait_for_routine",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    routine_id=SCALAR_ROUTINE,
    timeout=60,
    poke_interval=5,
)
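
With the settings above, the sensor checks roughly every 5 seconds and fails the task if the routine has not appeared within 60 seconds. A sketch of that schedule (ignoring poke duration and scheduler latency):

```python
def poke_schedule(timeout: float, poke_interval: float) -> list:
    """Approximate offsets (seconds from task start) at which the sensor pokes."""
    times, t = [], 0.0
    while t < timeout:
        times.append(t)
        t += poke_interval
    return times
```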

Reference

For further information, look at:

  • Client Library Documentation

  • Product Documentation
