Writing logs to Elasticsearch

Airflow can be configured to read task logs from Elasticsearch and optionally write logs to stdout in standard or JSON format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like Fluentd, Logstash, or others.

Airflow also supports writing logs to Elasticsearch directly, without requiring additional software like Filebeat or Logstash. To enable this feature, set write_to_es and json_format to True and write_stdout to False in airflow.cfg. Please be aware that if you set both write_to_es and delete_local_logs (the latter in the [logging] section) to True, Airflow will delete the local copy of task logs after successfully writing them to Elasticsearch.

You can choose to have all task logs from workers output to the highest parent level process, instead of the standard file locations. This allows for some additional flexibility in container environments like Kubernetes, where container stdout is already being logged to the host nodes. From there a log shipping tool can be used to forward them along to Elasticsearch. To use this feature, set the write_stdout option in airflow.cfg. You can also choose to have the logs output in a JSON format, using the json_format option. Airflow uses the standard Python logging module, and the JSON fields are extracted directly from the LogRecord object. To use this feature, set the json_fields option in airflow.cfg to a comma-delimited list of the LogRecord attributes you want collected for the logs. Documentation on the available attributes can be found in the Python logging module documentation.
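For example, to collect a typical set of LogRecord attributes when JSON output is enabled (the field list below is only a common selection; add or remove attributes as needed):

[elasticsearch]
json_format = True
json_fields = asctime, filename, lineno, levelname, message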

First, to use the handler, airflow.cfg must be configured as follows:

[logging]
remote_logging = True

[elasticsearch]
host = <host>:<port>
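For example, for a cluster reachable on the local machine (the value below is only illustrative; use your own endpoint):

[elasticsearch]
host = localhost:9200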

To output task logs to stdout in JSON format, the following config could be used:

[logging]
remote_logging = True

[elasticsearch]
host = <host>:<port>
write_stdout = True
json_format = True

To write task logs to Elasticsearch directly, the following config could be used (set delete_local_logs to True if you do not want to retain a local copy of the task logs):

[logging]
remote_logging = True
delete_local_logs = False

[elasticsearch]
host = <host>:<port>
write_stdout = False
json_format = True
write_to_es = True
target_index = [name of the index to store logs]
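Once a few tasks have run with this configuration, you can verify that documents are reaching the index. A minimal sketch using the elasticsearch Python client (the host URL and index name are placeholders; substitute the values from your own configuration):

from elasticsearch import Elasticsearch

# Placeholder endpoint and index name for illustration only.
client = Elasticsearch("http://localhost:9200")

# Count the documents written to the configured target index.
print(client.count(index="airflow-task-logs")["count"])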

Writing logs to Elasticsearch over TLS

To add custom configurations to Elasticsearch (e.g. turning on certificate verification, adding a custom self-signed certificate, etc.), use the elasticsearch_configs section in your airflow.cfg.

Note that these configurations also apply when you enable writing logs directly to Elasticsearch (write_to_es = True). For example:

[logging]
remote_logging = True

[elasticsearch_configs]
verify_certs = True
ca_certs = /path/to/CA_certs

Additionally, in the elasticsearch_configs section, you can pass any parameters supported by the Elasticsearch Python client. These parameters will be passed directly into the elasticsearch.Elasticsearch(**kwargs) client. For example:

[elasticsearch_configs]
http_compress = True
ca_certs = /root/ca.pem
api_key = "SOMEAPIKEY"
verify_certs = True
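As a rough illustration of how these options are consumed, the values above end up as keyword arguments to the Elasticsearch client constructor, roughly equivalent to the sketch below (the handler builds the client internally from [elasticsearch] host and [elasticsearch_configs], so you would not write this yourself; the host URL is an assumed placeholder):

from elasticsearch import Elasticsearch

# Approximation of the client the log handler constructs:
# the host comes from [elasticsearch] host, and the remaining keyword
# arguments come from the [elasticsearch_configs] section.
client = Elasticsearch(
    "https://localhost:9200",  # placeholder host for illustration
    http_compress=True,
    ca_certs="/root/ca.pem",
    api_key="SOMEAPIKEY",
    verify_certs=True,
)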

Changes to [elasticsearch] log_id_template

If you ever need to make changes to [elasticsearch] log_id_template, Airflow 2.3.0+ is able to keep track of old values so your existing task run logs can still be fetched. Once you are on Airflow 2.3.0+, in general, you can just change log_id_template at will and Airflow will keep track of the changes.

However, when you are upgrading to 2.3.0+, Airflow may not be able to properly save your previous log_id_template. If after upgrading you find your task logs are no longer accessible, try adding a row in the log_template table with id=0 containing your previous log_id_template. For example, if you used the defaults in 2.2.5:

INSERT INTO log_template (id, filename, elasticsearch_id, created_at) VALUES (0, '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log', '{dag_id}-{task_id}-{execution_date}-{try_number}', NOW());
