Google Cloud Logging Sink Operators¶
Cloud Logging allows you to export log entries outside of Cloud Logging using sinks.
For more information, visit Cloud Logging documentation.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
Create a log sink (Dict-based)¶
You can create a Cloud Logging sink with CloudLoggingCreateSinkOperator by passing the sink configuration as a Python dictionary:
create_sink = CloudLoggingCreateSinkOperator(
    task_id="create_sink",
    project_id=PROJECT_ID,
    sink_config={
        "name": SINK_NAME,
        "destination": "storage.googleapis.com/test-log-sink-af",
        "description": "Create with full sink_config",
        "filter": "severity>=INFO",
        "disabled": False,
        "exclusions": [
            {
                "name": "exclude-debug",
                "description": "Skip debug logs",
                "filter": "severity=DEBUG",
                "disabled": True,
            },
            {
                "name": "exclude-cloudsql",
                "description": "Skip CloudSQL logs",
                "filter": 'resource.type="cloudsql_database"',
                "disabled": False,
            },
        ],
    },
    gcp_conn_id=CONN_ID,
)
Required fields in sink_config (dict):
name: The name of the sink.
destination: The export destination (e.g., storage.googleapis.com/..., bigquery.googleapis.com/...).
Other fields such as description, filter, disabled, and exclusions are optional.
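Because only name and destination are required, a quick check before building the operator can catch an incomplete dictionary early. The following is a minimal sketch, not part of the Airflow provider: the helper missing_sink_fields, the sink name, and the bucket name are all hypothetical and used only for illustration.

```python
# Keys the section above lists as required in sink_config.
REQUIRED_SINK_FIELDS = ("name", "destination")


def missing_sink_fields(sink_config: dict) -> list[str]:
    """Return the required keys that are absent or empty in sink_config.

    Hypothetical helper for illustration; the provider does not ship it.
    """
    return [field for field in REQUIRED_SINK_FIELDS if not sink_config.get(field)]


# Placeholder values; replace with a real sink name and destination.
minimal_config = {
    "name": "my-sink",
    "destination": "storage.googleapis.com/my-example-bucket",
}

# Missing "destination", so the check should flag it.
incomplete_config = {"name": "my-sink", "filter": "severity>=ERROR"}
```

Calling missing_sink_fields(incomplete_config) reports the missing destination key, while the minimal configuration passes with an empty result.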
Update a log sink (Protobuf)¶
You can also provide the sink_config as a google.cloud.logging_v2.types.LogSink Protobuf object, and the update_mask as a google.protobuf.field_mask_pb2.FieldMask.
The following imports are required when using Protobuf objects:
from google.cloud.logging_v2.types import LogSink
from google.protobuf.field_mask_pb2 import FieldMask
Then pass both objects to CloudLoggingUpdateSinkOperator:
update_sink_config = CloudLoggingUpdateSinkOperator(
    task_id="update_sink_config",
    sink_name=SINK_NAME,
    project_id=PROJECT_ID,
    sink_config=LogSink(
        {
            "description": "Update #1: GCE logs only",
            "filter": 'resource.type="gce_instance"',
            "disabled": False,
        }
    ),
    update_mask=FieldMask(paths=["description", "filter", "disabled"]),
    unique_writer_identity=True,
    gcp_conn_id=CONN_ID,
)
When updating a sink, include only the fields you want to change in sink_config, and list those same fields in update_mask["paths"].
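One way to keep sink_config and the update mask in sync is to derive both from a single dictionary of changes. The following is a minimal sketch under two assumptions: the helper build_sink_update is hypothetical (it is not part of the provider), and the operator is assumed to accept plain dictionaries for both arguments, as the update_mask["paths"] notation suggests.

```python
def build_sink_update(changes: dict) -> tuple[dict, dict]:
    """Return (sink_config, update_mask) covering exactly the changed fields.

    Hypothetical helper for illustration only.
    """
    # Copy so later edits to `changes` do not leak into the sink config.
    sink_config = dict(changes)
    # The mask lists every changed field name; sorted for a stable order.
    update_mask = {"paths": sorted(changes)}
    return sink_config, update_mask


sink_config, update_mask = build_sink_update(
    {
        "description": "GCE logs only",
        "filter": 'resource.type="gce_instance"',
    }
)
```

The two return values could then be passed as sink_config and update_mask, so a field can never be changed in one place and forgotten in the other.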
List log sinks¶
To list all sinks in a Google Cloud project, use CloudLoggingListSinksOperator:
list_sinks_after = CloudLoggingListSinksOperator(
    task_id="list_sinks_after_update",
    project_id=PROJECT_ID,
    gcp_conn_id=CONN_ID,
)
This operator returns a list of sink dictionaries from the project.
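A downstream task can post-process that list, for example to pick out the sinks that export to Cloud Storage. The following is a minimal sketch, assuming each dictionary carries name and destination keys (mirroring the LogSink fields). The helper sinks_by_destination_prefix is hypothetical, and sample_sinks is made-up data standing in for the operator's real return value.

```python
def sinks_by_destination_prefix(sinks: list[dict], prefix: str) -> list[str]:
    """Return the names of sinks whose destination starts with prefix.

    Hypothetical helper; assumes "name" and "destination" keys are present.
    """
    return [
        sink["name"]
        for sink in sinks
        if sink.get("destination", "").startswith(prefix)
    ]


# Made-up sample data, not real output from the operator.
sample_sinks = [
    {"name": "audit-to-gcs", "destination": "storage.googleapis.com/example-bucket"},
    {
        "name": "errors-to-bq",
        "destination": "bigquery.googleapis.com/projects/example/datasets/logs",
    },
]

gcs_sinks = sinks_by_destination_prefix(sample_sinks, "storage.googleapis.com/")
```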
Delete a log sink¶
To delete a sink from a Google Cloud project, use CloudLoggingDeleteSinkOperator:
delete_sink = CloudLoggingDeleteSinkOperator(
    task_id="delete_sink",
    sink_name=SINK_NAME,
    project_id=PROJECT_ID,
    gcp_conn_id=CONN_ID,
)