Airflow Summit 2025 is coming October 07-09. Register now for early bird ticket!

tests.system.openlineage.operator

Attributes

log

Classes

OpenLineageTestOperator

This operator is added for system testing purposes.

Functions

any(result)

is_datetime(result)

is_uuid(result)

regex_match(result, pattern)

env_var(var[, default])

Use this jinja method to access the environment variable named 'var'.

not_match(result, pattern)

url_scheme_authority(url)

url_path(url)

setup_jinja()

match(expected, result, env)

Check if result is "equal" to expected value.

Module Contents

tests.system.openlineage.operator.log[source]
tests.system.openlineage.operator.any(result)[source]
tests.system.openlineage.operator.is_datetime(result)[source]
tests.system.openlineage.operator.is_uuid(result)[source]
tests.system.openlineage.operator.regex_match(result, pattern)[source]
tests.system.openlineage.operator.env_var(var, default=None)[source]

Use this jinja method to access the environment variable named ‘var’.

If there is no such environment variable set, return the default. If the default is None, raise an exception for an undefined variable.

tests.system.openlineage.operator.not_match(result, pattern)[source]
tests.system.openlineage.operator.url_scheme_authority(url)[source]
tests.system.openlineage.operator.url_path(url)[source]
tests.system.openlineage.operator.setup_jinja()[source]
tests.system.openlineage.operator.match(expected, result, env)[source]

Check if result is “equal” to expected value.

Omits keys not specified in expected value and resolves any jinja templates found.

class tests.system.openlineage.operator.OpenLineageTestOperator(event_templates=None, file_path=None, env=setup_jinja(), allow_duplicate_events=False, clear_variables=True, **kwargs)[source]

Bases: airflow.models.operator.BaseOperator

This operator is added for system testing purposes.

It compares expected event templates set on initialization with ones emitted by OpenLineage integration and stored in Variables by VariableTransport.

Note:

If clear_variables is True, only the Airflow Variables listed in event_templates (or derived from file_path) will be deleted - those that are supposed to be checked by the Operator. We won’t remove all Airflow Variables to avoid interfering with other instances of this Operator running in parallel on different DAGs. Running continuous system tests without clearing Variables may lead to leftover or growing Variables size. We recommend implementing a process to remove all Airflow Variables after all system tests have run to ensure a clean environment for each test run.

Parameters:
  • event_templates (dict[str, dict] | None) – dictionary where key is the key used by VariableTransport in format of <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>, and value is event template (fragment) that need to be in received events.

  • file_path (str | None) – alternatively, file_path pointing to file with event templates will be used

  • env (jinja2.Environment) – jinja environment used to render event templates

  • allow_duplicate_events (bool) – if set to True, allows multiple events for the same key

  • clear_variables (bool) – if set to True, clears only variables to be checked after all events are checked or if any check fails

Raises:

ValueError if the received events do not match with expected ones.

event_templates = None[source]
file_path = None[source]
env[source]
multiple_events = False[source]
delete = True[source]
execute(context)[source]

Derive when creating an operator.

Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

Was this entry helpful?