Google Cloud Logging Sink Operators

Cloud Logging allows you to export log entries outside of Cloud Logging using sinks.

For more information, visit Cloud Logging documentation.

Prerequisite Tasks

To use these operators, you must do a few things:

Create a log sink (Dict-based)

You can create a Cloud Logging sink using a Python dictionary:

CloudLoggingCreateSinkOperator

tests/system/google/cloud/cloud_logging_sink/example_cloud_logging_sink.py

create_sink = CloudLoggingCreateSinkOperator(
    task_id="create_sink",
    project_id=PROJECT_ID,
    sink_config={
        "name": SINK_NAME,
        "destination": "storage.googleapis.com/test-log-sink-af",
        "description": "Create with full sink_config",
        "filter": "severity>=INFO",
        "disabled": False,
        "exclusions": [
            {
                "name": "exclude-debug",
                "description": "Skip debug logs",
                "filter": "severity=DEBUG",
                "disabled": True,
            },
            {
                "name": "exclude-cloudsql",
                "description": "Skip CloudSQL logs",
                "filter": 'resource.type="cloudsql_database"',
                "disabled": False,
            },
        ],
    },
    gcp_conn_id=CONN_ID,
)

Required fields in sink_config (dict):

  • name: The name of the sink.

  • destination: The export destination (e.g., storage.googleapis.com/..., bigquery.googleapis.com/...).

Other fields such as description, filter, disabled, and exclusions are optional.
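The split between required and optional fields can be checked before the operator ever runs. The following is a minimal sketch and not part of the provider: the helper name missing_sink_fields and the sample values are hypothetical, and it assumes only that name and destination are required, as stated above.

```python
# Hypothetical pre-flight check for a dict-based sink_config.
# Assumes only "name" and "destination" are required, as listed above.
REQUIRED_SINK_FIELDS = ("name", "destination")


def missing_sink_fields(sink_config: dict) -> list[str]:
    """Return the required keys that are absent or empty in sink_config."""
    return [key for key in REQUIRED_SINK_FIELDS if not sink_config.get(key)]


# Smallest config that satisfies the requirement (illustrative values).
minimal_sink_config = {
    "name": "my-sink",
    "destination": "storage.googleapis.com/test-log-sink-af",
}

# Missing "destination", so the check should flag it.
incomplete_sink_config = {"name": "my-sink", "filter": "severity>=INFO"}
```

Running a check like this at DAG-definition time surfaces a missing field earlier than waiting for the API call to fail.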

Update a log sink (Protobuf)

You can also provide the sink_config as a google.cloud.logging_v2.types.LogSink Protobuf object, and the update_mask as a google.protobuf.field_mask_pb2.FieldMask.

The following imports are required when using Protobuf objects:

from google.cloud.logging_v2.types import LogSink
from google.protobuf.field_mask_pb2 import FieldMask

CloudLoggingUpdateSinkOperator

update_sink_config = CloudLoggingUpdateSinkOperator(
    task_id="update_sink_config",
    sink_name=SINK_NAME,
    project_id=PROJECT_ID,
    sink_config=LogSink(
        {
            "description": "Update #1: GCE logs only",
            "filter": 'resource.type="gce_instance"',
            "disabled": False,
        }
    ),
    update_mask=FieldMask(paths=["description", "filter", "disabled"]),
    unique_writer_identity=True,
    gcp_conn_id=CONN_ID,
)

When updating a sink, include only the fields you want to change in sink_config, and list those same fields in the paths of update_mask. Fields that are not named in the mask are left unchanged.
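One way to keep the mask and the config in step is to derive the paths from the keys of the fields being changed. This is a minimal sketch, assuming the changes are first held in a plain dict; the helper name mask_paths_for is hypothetical and not part of the provider.

```python
# Hypothetical helper: derive update-mask paths from the fields being changed,
# so the mask cannot drift away from the config.
def mask_paths_for(changes: dict) -> list[str]:
    """Return the top-level field names in `changes`, in insertion order."""
    return list(changes)


changes = {
    "description": "Update #1: GCE logs only",
    "filter": 'resource.type="gce_instance"',
    "disabled": False,
}

paths = mask_paths_for(changes)
# `changes` would then be passed as LogSink(changes)
# and `paths` as FieldMask(paths=paths).
```

Deriving the list this way means a field added to the config is automatically named in the mask as well.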

List log sinks

To list all sinks in a Google Cloud project:

CloudLoggingListSinksOperator

list_sinks_after = CloudLoggingListSinksOperator(
    task_id="list_sinks_after_update",
    project_id=PROJECT_ID,
    gcp_conn_id=CONN_ID,
)

This operator returns a list of sink dictionaries from the project.
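Because the result is a list of dictionaries, a downstream task can filter it with ordinary Python. The following is a minimal sketch, assuming each dictionary carries name and disabled keys that mirror the sink fields shown earlier; the function name and the sample data are illustrative and are not output from a real project.

```python
def enabled_sink_names(sinks: list[dict]) -> list[str]:
    """Return the names of sinks that are not marked as disabled."""
    return [sink["name"] for sink in sinks if not sink.get("disabled", False)]


# Illustrative data shaped like the operator's return value (assumed keys).
sample_sinks = [
    {
        "name": "audit-sink",
        "destination": "storage.googleapis.com/test-log-sink-af",
        "disabled": False,
    },
    {
        "name": "old-sink",
        "destination": "storage.googleapis.com/test-log-sink-af",
        "disabled": True,
    },
]
```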

Delete a log sink

To delete a sink from a Google Cloud project:

CloudLoggingDeleteSinkOperator

delete_sink = CloudLoggingDeleteSinkOperator(
    task_id="delete_sink",
    sink_name=SINK_NAME,
    project_id=PROJECT_ID,
    gcp_conn_id=CONN_ID,
)