Configuration Reference¶
This page contains the list of all available Airflow configurations for the
apache-airflow-providers-openlineage provider that can be set in the airflow.cfg file or using environment variables.
Note
For more information see Setting Configuration Options.
[openlineage]¶
This section applies settings for OpenLineage integration.
config_path¶
Specify the path to the YAML configuration file. This ensures backwards compatibility with passing config through the openlineage.yml file.
- Type: string
- Default: ''
- Environment Variable: AIRFLOW__OPENLINEAGE__CONFIG_PATH
- Example: full/path/to/openlineage.yml
custom_run_facets¶
Added in version 1.10.0.
Register custom run facet functions by passing a string of semicolon separated full import paths.
- Type: string
- Default: ''
- Environment Variable: AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS
- Example: full.path.to.custom_facet_function;full.path.to.another_custom_facet_function
dag_state_change_process_pool_size¶
Added in version 1.8.0.
Number of processes to utilize for processing DAG state changes in an asynchronous manner within the scheduler process.
- Type: integer
- Default: 1
- Environment Variable: AIRFLOW__OPENLINEAGE__DAG_STATE_CHANGE_PROCESS_POOL_SIZE
debug_mode¶
Added in version 1.11.0.
If true, OpenLineage events will include information useful for debugging. This can produce large fields, e.g. all installed packages and their versions.
- Type: boolean
- Default: False
- Environment Variable: AIRFLOW__OPENLINEAGE__DEBUG_MODE
disable_source_code¶
Disable the inclusion of source code in OpenLineage events by setting this to true. By default, several Operators (e.g. Python, Bash) will include their source code in the events unless disabled.
- Type: boolean
- Default: False
- Environment Variable: AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE
disabled¶
Disable sending events without uninstalling the OpenLineage Provider by setting this to true.
- Type: boolean
- Default: False
- Environment Variable: AIRFLOW__OPENLINEAGE__DISABLED
disabled_for_operators¶
Added in version 1.1.0.
Exclude some Operators from emitting OpenLineage events by passing a string of semicolon separated full import paths of Operators to disable.
- Type: string
- Default: ''
- Environment Variable: AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS
- Example: airflow.providers.standard.operators.bash.BashOperator; airflow.providers.standard.operators.python.PythonOperator
execution_timeout¶
Added in version 1.9.0.
Maximum amount of time (in seconds) that OpenLineage can spend executing metadata extraction for task (on worker). Note that other configurations, sometimes with higher priority, such as [core] task_success_overtime, may also affect how much time OpenLineage has for execution.
- Type: integer
- Default: 10
- Environment Variable: AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT
extractors¶
Register custom OpenLineage Extractors by passing a string of semicolon separated full import paths.
- Type: string
- Default: None
- Environment Variable: AIRFLOW__OPENLINEAGE__EXTRACTORS
- Example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
include_full_task_info¶
Added in version 1.10.0.
If true, OpenLineage task events include full serialized task (operator) information. By default, the AirflowRunFacet attached to task events contains only a selected subset of task parameters. With this flag on, all serializable task parameters are sent (excluding known non-serializable elements), which may significantly increase event size.
Warning: With this variable set to true, OpenLineage events can include elements that are megabytes in size or larger, depending on the size of the data you pass to the task.
- Type: boolean
- Default: False
- Environment Variable: AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO
namespace¶
Set namespace that the lineage data belongs to, so that if you use multiple OpenLineage producers, events coming from them will be logically separated.
- Type: string
- Default: None
- Environment Variable: AIRFLOW__OPENLINEAGE__NAMESPACE
- Example: my_airflow_instance_1
selective_enable¶
Added in version 1.7.0.
If this setting is enabled, the OpenLineage integration won't collect and emit metadata unless you explicitly enable it per Dag or Task using the enable_lineage method.
- Type: boolean
- Default: False
- Environment Variable: AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE
spark_inject_parent_job_info¶
Added in version 2.0.0.
Automatically inject OpenLineage’s parent job (namespace, job name, run id) information into Spark application properties for supported Operators.
- Type: boolean
- Default: False
- Environment Variable: AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO
spark_inject_transport_info¶
Added in version 2.1.0.
Automatically inject OpenLineage’s transport information into Spark application properties for supported Operators.
- Type: boolean
- Default: False
- Environment Variable: AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO
transport¶
Pass OpenLineage Client transport configuration as a JSON string, including the transport type and any additional options specific to that type, as described in OpenLineage docs.
- Type: string
- Default: ''
- Environment Variable: AIRFLOW__OPENLINEAGE__TRANSPORT
- Example: {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}
Highlighted configurations¶
Transport setup¶
At minimum, OpenLineage needs a Transport configured to function - the destination where you wish your events to end up, for example Marquez.
Transport as JSON string¶
The transport option in the [openlineage] section of Airflow configuration is used for that purpose.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
The AIRFLOW__OPENLINEAGE__TRANSPORT environment variable is equivalent.
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
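If you set this variable from Python, for example in a deployment script, building the JSON string with json.dumps avoids quoting and escaping mistakes. This is only a convenience sketch, not a provider API:

```python
import json
import os

# Build the transport configuration as a dict and serialize it, instead of
# hand-writing the JSON string (avoids escaping/quoting mistakes).
transport = {
    "type": "http",
    "url": "http://example.com:5000",
    "endpoint": "api/v1/lineage",
}
os.environ["AIRFLOW__OPENLINEAGE__TRANSPORT"] = json.dumps(transport)
```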
If you want to look at OpenLineage events without sending them anywhere, you can set up ConsoleTransport - the events will end up in task logs.
[openlineage]
transport = {"type": "console"}
Note
For the full list of built-in transport types, each transport's options, or instructions on implementing a custom transport, refer to the Python client documentation.
Transport as config file¶
You can also configure the OpenLineage Transport using a YAML file (e.g. openlineage.yml).
Provide the path to the YAML file via the config_path option in Airflow configuration.
[openlineage]
config_path = '/path/to/openlineage.yml'
The AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable is equivalent.
AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'
Example content of the config YAML file:
transport:
  type: http
  url: https://backend:5000
  endpoint: events/receive
  auth:
    type: api_key
    apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e
Note
A detailed description, together with example config files, can be found in the Python client documentation.
Configuration precedence¶
The primary and recommended method of configuring the OpenLineage Airflow Provider is Airflow configuration. As there are multiple possible ways of configuring OpenLineage, it's important to keep in mind the precedence of different configurations. The OpenLineage Airflow Provider looks for the configuration in the following order:
1. Check config_path in airflow.cfg under the openlineage section (or the AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable).
2. Check transport in airflow.cfg under the openlineage section (or the AIRFLOW__OPENLINEAGE__TRANSPORT environment variable).
3. If all the above options are missing, the OpenLineage Python client used underneath looks for configuration in the order described in its documentation.

Please note that using Airflow configuration is encouraged and is the only future-proof solution.
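The lookup order can be sketched in plain Python. The function name and the dict standing in for parsed airflow.cfg contents are illustrative only, not the provider's actual internals:

```python
import os


def resolve_openlineage_source(airflow_cfg: dict) -> str:
    """Return which configuration source wins, per the documented precedence.

    ``airflow_cfg`` stands in for parsed airflow.cfg contents; this is an
    illustrative sketch, not the provider's real resolution code.
    """
    section = airflow_cfg.get("openlineage", {})
    # 1. config_path (option or AIRFLOW__OPENLINEAGE__CONFIG_PATH env var)
    if os.environ.get("AIRFLOW__OPENLINEAGE__CONFIG_PATH") or section.get("config_path"):
        return "config_path"
    # 2. transport (option or AIRFLOW__OPENLINEAGE__TRANSPORT env var)
    if os.environ.get("AIRFLOW__OPENLINEAGE__TRANSPORT") or section.get("transport"):
        return "transport"
    # 3. Fall back to the OpenLineage Python client's own lookup order.
    return "python-client"
```

For example, a config that sets only transport resolves to the transport source, while an empty config defers to the Python client.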
Enabling OpenLineage on Dag/task level¶
One can selectively enable OpenLineage for specific Dags and tasks by using the selective_enable policy.
To enable this policy, set the selective_enable option to True in the [openlineage] section of your Airflow configuration file:
[openlineage]
selective_enable = True
The AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE environment variable is equivalent.
AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true
While selective_enable enables selective control, the disabled option still has precedence.
If you set disabled to True in the configuration, OpenLineage will be disabled for all Dags and tasks regardless of the selective_enable setting.
Once the selective_enable policy is enabled, you can choose to enable OpenLineage
for individual Dags and tasks using the enable_lineage and disable_lineage functions.
Enabling Lineage on a Dag:
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with enable_lineage(DAG(...)):
    # Tasks within this Dag will have lineage tracking enabled
    MyOperator(...)
    AnotherOperator(...)
Enabling Lineage on a Task:
While enabling lineage on a Dag implicitly enables it for all tasks within that Dag, you can still selectively disable it for specific tasks:
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with DAG(...) as dag:
    t1 = MyOperator(...)
    t2 = AnotherOperator(...)

    # Enable lineage for the entire Dag
    enable_lineage(dag)

    # Disable lineage for task t1
    disable_lineage(t1)
Enabling lineage on the Dag level automatically enables it for all tasks within that Dag unless explicitly disabled per task.
Enabling lineage on the task level implicitly enables lineage on its Dag. This is because each emitting task sends a ParentRunFacet, which requires the Dag-level lineage to be enabled in some OpenLineage backend systems. Disabling Dag-level lineage while enabling task-level lineage might cause errors or inconsistencies.
Custom Facets¶
To learn more about facets in OpenLineage, please refer to facet documentation.
The OpenLineage spec might not contain all the facets you need to write your extractor, in which case you will have to make your own custom facets.
You can also inject your own custom facets in the lineage event’s run facet using the custom_run_facets Airflow configuration.
Steps to be taken:

1. Write a function that returns the custom facets. You can write as many custom facet functions as needed.
2. Register the functions using the custom_run_facets Airflow configuration.

The Airflow OpenLineage listener will automatically execute these functions during lineage event generation and append their return values to the run facet in the lineage event.
Writing a custom facet function¶
- Input arguments: The function should accept two input arguments: TaskInstance and TaskInstanceState.
- Function body: Perform the logic needed to generate the custom facets. The custom facets must inherit from RunFacet for the _producer and _schemaURL to be automatically added for the facet.
- Return value: The custom facets to be added to the lineage event. The return type should be dict[str, RunFacet] or None. You may choose to return None if you do not want to add custom facets for certain criteria.
Example custom facet function
import attrs
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.common.compat.openlineage.facet import RunFacet


@attrs.define
class MyCustomRunFacet(RunFacet):
    """Define a custom facet."""

    name: str
    jobState: str
    uniqueName: str
    displayName: str
    dagId: str
    taskId: str
    cluster: str
    custom_metadata: dict


def get_my_custom_facet(
    task_instance: TaskInstance, ti_state: TaskInstanceState
) -> dict[str, RunFacet] | None:
    operator_name = task_instance.task.operator_name
    custom_metadata = {}
    if operator_name == "BashOperator":
        return None
    if ti_state == TaskInstanceState.FAILED:
        custom_metadata["custom_key_failed"] = "custom_value"
    job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
    return {
        "additional_run_facet": MyCustomRunFacet(
            name="test-lineage-namespace",
            jobState=task_instance.state,
            uniqueName=job_unique_name,
            displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
            dagId=task_instance.dag_id,
            taskId=task_instance.task_id,
            cluster="TEST",
            custom_metadata=custom_metadata,
        )
    }
Register the custom facet functions¶
Use the custom_run_facets Airflow configuration to register the custom run facet functions by passing
a string of semicolon separated full import paths to the functions.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
The AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS environment variable is equivalent.
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
Note
- The custom facet functions are executed both at the START and COMPLETE/FAIL of the TaskInstance and added to the corresponding OpenLineage event.
- When creating conditions on TaskInstance state, use the second argument provided (TaskInstanceState), which contains the state the task should be in. It may differ from ti.current_state(), as the OpenLineage listener may get called before the TaskInstance's state is updated in the Airflow database.
- When the path to a single function is registered more than once, it will still be executed only once.
- When duplicate custom facet keys are returned by multiple registered functions, the result of a random one will be added to the lineage event. Avoid duplicate facet keys, as they can produce unexpected behaviour.
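The duplicate-key hazard can be illustrated with a plain dict merge. This is an illustrative sketch, not the listener's actual code: returns from each registered function land in one run-facet dict, so colliding keys silently overwrite one another and which value survives depends on execution order.

```python
def merge_custom_facets(results: list) -> dict:
    """Merge return values of registered facet functions (sketch only)."""
    merged = {}
    for result in results:
        # A function may return None to skip adding facets for this event.
        if result:
            # dict.update overwrites on key collision, so two functions
            # returning the same key leave only one (order-dependent) winner.
            merged.update(result)
    return merged
```

For example, merge_custom_facets([{"a": 1}, None, {"a": 2, "b": 3}]) yields {"a": 2, "b": 3} - the first function's value for "a" is silently lost.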
Backwards compatibility¶
Warning
The variables below should not be used and may be removed in the future. Use Airflow configuration (described above) for a future-proof solution.
For backwards compatibility with openlineage-airflow package, some environment variables are still available:
- OPENLINEAGE_DISABLED is an equivalent of AIRFLOW__OPENLINEAGE__DISABLED.
- OPENLINEAGE_CONFIG is an equivalent of AIRFLOW__OPENLINEAGE__CONFIG_PATH.
- OPENLINEAGE_NAMESPACE is an equivalent of AIRFLOW__OPENLINEAGE__NAMESPACE.
- OPENLINEAGE_EXTRACTORS is an equivalent of AIRFLOW__OPENLINEAGE__EXTRACTORS.
- OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE is an equivalent of AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE.
- OPENLINEAGE_URL can be used to set up a simple http transport. This method has some limitations and may require using other environment variables to achieve the desired output. See docs.