airflow.providers.elasticsearch.log.es_task_handler

Attributes

LOG_LINE_DEFAULTS

EsLogMsgType

USE_PER_RUN_LOG_ID

VALID_ES_CONFIG_KEYS

Classes

ElasticsearchTaskHandler

ElasticsearchTaskHandler is a Python log handler that reads logs from Elasticsearch.

Functions

get_es_kwargs_from_config()

Module Contents

airflow.providers.elasticsearch.log.es_task_handler.LOG_LINE_DEFAULTS[source]
airflow.providers.elasticsearch.log.es_task_handler.EsLogMsgType[source]
airflow.providers.elasticsearch.log.es_task_handler.USE_PER_RUN_LOG_ID = True[source]
airflow.providers.elasticsearch.log.es_task_handler.VALID_ES_CONFIG_KEYS[source]
airflow.providers.elasticsearch.log.es_task_handler.get_es_kwargs_from_config()[source]
class airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, write_to_es=False, target_index='airflow-logs', host_field='host', offset_field='offset', host='http://localhost:9200', frontend='localhost:5601', index_patterns=conf.get('elasticsearch', 'index_patterns'), index_patterns_callable=conf.get('elasticsearch', 'index_patterns_callable', fallback=''), es_kwargs='default_es_kwargs', **kwargs)[source]

Bases: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

ElasticsearchTaskHandler is a Python log handler that reads logs from Elasticsearch.

Note that Airflow by default does not handle the indexing of logs into Elasticsearch. Instead, Airflow flushes logs into local files. Additional software setup is required to index the logs into Elasticsearch, such as using Filebeat and Logstash.

Airflow can also be configured to write logs directly to Elasticsearch. To enable this feature, set json_format and write_to_es to True.
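
In a deployment these two flags are normally set through Airflow's [elasticsearch] configuration section; purely as a sketch of what they mean, the constructor documented below could also be called directly. All argument values here are placeholders, not recommendations:

    from airflow.providers.elasticsearch.log.es_task_handler import (
        ElasticsearchTaskHandler,
    )

    # Sketch only: placeholder values for the required arguments.
    # json_format=True makes the handler emit structured JSON records, and
    # write_to_es=True ships them straight to the configured Elasticsearch host.
    handler = ElasticsearchTaskHandler(
        base_log_folder="/opt/airflow/logs",
        end_of_log_mark="end_of_log",
        write_stdout=False,
        json_format=True,
        json_fields="asctime, filename, lineno, levelname, message",
        write_to_es=True,
        target_index="airflow-logs",
        host="http://localhost:9200",
    )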

To efficiently query and sort Elasticsearch results, this handler assumes each log message has a log_id field composed of the task instance's primary keys: log_id = {dag_id}-{task_id}-{logical_date}-{try_number}. Log messages with a specific log_id are sorted based on offset, a unique integer that indicates the order of the log messages. Timestamps here are unreliable because multiple log messages might share the same timestamp.
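
As an illustration of that convention (the field names match the defaults listed further down this page; the documents themselves are fabricated):

    # Render the default log_id pattern for one try of one task instance.
    log_id = "{dag_id}-{task_id}-{logical_date}-{try_number}".format(
        dag_id="example_dag",
        task_id="extract",
        logical_date="2024-01-01T00:00:00+00:00",
        try_number=1,
    )

    # Fabricated documents sharing that log_id; ordering relies on the integer
    # offset field rather than on timestamps.
    hits = [
        {"log_id": log_id, "offset": 2, "message": "Task started"},
        {"log_id": log_id, "offset": 1, "message": "Dependencies all met"},
    ]
    for hit in sorted(hits, key=lambda h: h["offset"]):
        print(hit["message"])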

Parameters:
  • base_log_folder (str) – base folder to store logs locally

  • log_id_template – log id template

  • host (str) – Elasticsearch host name

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = 'Elasticsearch'[source]
trigger_should_wrap = True[source]
closed = False[source]
client[source]
frontend = 'localhost:5601'[source]
mark_end_on_close = True[source]
end_of_log_mark[source]
write_stdout[source]
json_format[source]
json_fields[source]
host_field = 'host'[source]
offset_field = 'offset'[source]
index_patterns[source]
index_patterns_callable[source]
context_set = False[source]
write_to_es = False[source]
target_index = 'airflow-logs'[source]
delete_local_copy[source]
formatter: logging.Formatter[source]
handler: logging.FileHandler | logging.StreamHandler[source]
static format_url(host)[source]

Format the given host string to ensure it starts with ‘http’ and check if it represents a valid URL.

Parameters:
  • host (str) – the host string to format and check
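
A small usage sketch; the printed result is an assumption based on the description above, namely that a bare host:port is given an http scheme so it parses as a valid URL:

    from airflow.providers.elasticsearch.log.es_task_handler import (
        ElasticsearchTaskHandler,
    )

    # Assumed result: the bare host gets an "http" scheme prepended.
    print(ElasticsearchTaskHandler.format_url("localhost:9200"))
    # -> http://localhost:9200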

emit(record)[source]

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

set_context(ti, *, identifier=None)[source]

Provide task_instance context to airflow task handler.

Parameters:
  • ti (airflow.models.taskinstance.TaskInstance) – task instance object

  • identifier (str | None) – if set, identifies the Airflow component which is relaying logs from exceptional scenarios related to the task instance
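
A hedged usage sketch, assuming handler is an already configured ElasticsearchTaskHandler and ti is a TaskInstance taken from a live DAG run, so the snippet is not runnable on its own:

    # Attach the task-instance context before the task's log records are emitted.
    handler.set_context(ti)

    # A component relaying logs outside the normal task lifecycle can label
    # itself via identifier; "trigger" here is only a placeholder string.
    handler.set_context(ti, identifier="trigger")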

close()[source]

Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.

property log_name: str[source]

The log name.

get_external_log_url(task_instance, try_number)[source]

Create an address for an external log collecting service.

Parameters:
  • task_instance (airflow.models.taskinstance.TaskInstance) – task instance object

  • try_number (int) – task instance try_number to read logs from

Returns:

URL to the external log collection service

Return type:

str
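
Another hedged sketch under the same assumptions (handler and ti come from a running deployment); the returned string is expected to point at the UI configured through the frontend argument, localhost:5601 by default:

    # Build a link to the external log UI for the first try of this task instance.
    url = handler.get_external_log_url(task_instance=ti, try_number=1)
    print(url)  # a URL under the configured `frontend` host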

property supports_external_link: bool[source]

Whether we can support external links.
