airflow.providers.opensearch.log.os_task_handler

Module Contents

Classes

OpensearchTaskHandler

OpensearchTaskHandler is a Python log handler that reads and writes logs to OpenSearch.

Attributes

USE_PER_RUN_LOG_ID

OsLogMsgType

LOG_LINE_DEFAULTS

airflow.providers.opensearch.log.os_task_handler.USE_PER_RUN_LOG_ID[source]
airflow.providers.opensearch.log.os_task_handler.OsLogMsgType[source]
airflow.providers.opensearch.log.os_task_handler.LOG_LINE_DEFAULTS[source]
airflow.providers.opensearch.log.os_task_handler.get_os_kwargs_from_config()[source]
class airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host, port, username, password, host_field='host', offset_field='offset', index_patterns=conf.get('opensearch', 'index_patterns', fallback='_all'), index_patterns_callable=conf.get('opensearch', 'index_patterns_callable', fallback=''), os_kwargs='default_os_kwargs')[source]

Bases: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

OpensearchTaskHandler is a Python log handler that reads and writes logs to OpenSearch.

Like the ElasticsearchTaskHandler, Airflow itself does not handle the indexing of logs. Instead, logs are flushed to local files, and additional software (e.g., Filebeat, Logstash) may be required to ship logs to OpenSearch. This handler then enables fetching and displaying logs from OpenSearch.

To efficiently query and sort OpenSearch results, this handler assumes each log message has a log_id field composed of the task instance's primary keys: log_id = {dag_id}-{task_id}-{logical_date}-{try_number}. Log messages with a specific log_id are sorted by offset, a unique integer that indicates each log message's order. Timestamps are unreliable for ordering because multiple log messages might share the same timestamp.
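The log_id and offset scheme described above can be sketched in plain Python (a hypothetical illustration of the idea, not the handler's actual implementation):

```python
# Hypothetical sketch of the log_id/offset scheme: each record carries a
# log_id built from the task instance's primary keys, and records sharing
# a log_id are ordered by their integer offset, not by timestamp.

def make_log_id(dag_id: str, task_id: str, logical_date: str, try_number: int) -> str:
    # Mirrors the documented format: {dag_id}-{task_id}-{logical_date}-{try_number}
    return f"{dag_id}-{task_id}-{logical_date}-{try_number}"

records = [
    {"log_id": make_log_id("my_dag", "my_task", "2024-01-01T00:00:00", 1),
     "offset": 2, "message": "second line"},
    {"log_id": make_log_id("my_dag", "my_task", "2024-01-01T00:00:00", 1),
     "offset": 1, "message": "first line"},
]

# Sorting by offset recovers the original emission order, even when
# both records carry the same timestamp.
ordered = sorted(records, key=lambda r: r["offset"])
```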

Parameters
  • base_log_folder (str) – Base folder to store logs locally.

  • end_of_log_mark (str) – A marker string to signify the end of logs.

  • write_stdout (bool) – Whether to also write logs to stdout.

  • json_format (bool) – Whether to format logs as JSON.

  • json_fields (str) – Comma-separated list of fields to include in the JSON log output.

  • host (str) – OpenSearch host name.

  • port (int) – OpenSearch port.

  • username (str) – Username for OpenSearch authentication.

  • password (str) – Password for OpenSearch authentication.

  • host_field (str) – The field name for the host in the logs (default is “host”).

  • offset_field (str) – The field name for the log offset (default is “offset”).

  • index_patterns (str) – Index pattern or template for storing logs.

  • index_patterns_callable (str) – Callable that dynamically generates index patterns based on context.

  • os_kwargs (dict | None | Literal[default_os_kwargs]) – Additional OpenSearch client options. This can be set to “default_os_kwargs” to load the default configuration from Airflow’s settings.
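As a hedged sketch of how these parameters fit together, the handler could be declared in a Python logging-config dictionary by its dotted class path. All values below are illustrative assumptions, not Airflow defaults; in practice the handler is usually enabled through Airflow's remote-logging configuration rather than built by hand:

```python
# Illustrative logging-config fragment for OpensearchTaskHandler.
# Every value here (paths, host, credentials, field list) is an
# assumption chosen for the example, not a documented default.
OPENSEARCH_TASK_HANDLER = {
    "class": "airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler",
    "base_log_folder": "/opt/airflow/logs",        # assumed local log path
    "end_of_log_mark": "end_of_log",               # marker string for end of logs
    "write_stdout": False,
    "json_format": True,
    "json_fields": "asctime,filename,lineno,levelname,message",
    "host": "localhost",                           # assumed OpenSearch host
    "port": 9200,
    "username": "admin",                           # assumed credentials
    "password": "admin",
    "host_field": "host",
    "offset_field": "offset",
    "os_kwargs": "default_os_kwargs",              # load client options from config
}
```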

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = 'Opensearch'[source]
trigger_should_wrap = True[source]
set_context(ti, *, identifier=None)[source]

Provide task_instance context to airflow task handler.

Parameters
  • ti (airflow.models.taskinstance.TaskInstance) – task instance object

  • identifier (str | None) – if set, identifies the Airflow component which is relaying logs from exceptional scenarios related to the task instance

emit(record)[source]

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

close()[source]

Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.
