Serialization¶
To support data exchange, like arguments, between tasks, Airflow needs to serialize the data to be exchanged and deserialize it again when required in a downstream task. Serialization also happens so that the webserver and the scheduler (as opposed to the DAG processor) do no need to read the DAG file. This is done for security purposes and efficiency.
Serialization is a surprisingly hard job. Python out of the box only has support for serialization of primitives,
like str
and int
and it loops over iterables. When things become more complex, custom serialization is required.
Airflow out of the box supports three ways of custom serialization. Primitives are returned as is, without
additional encoding, e.g. a str
remains a str
. When it is not a primitive (or iterable thereof) Airflow
looks for a registered serializer and deserializer in the namespace of airflow.serialization.serializers
.
If not found it will look in the class for a serialize()
method or in case of deserialization a
deserialize(data, version: int)
method. Finally, if the class is either decorated with @dataclass
or @attr.define
it will use the public methods for those decorators.
If you are looking to extend Airflow with a new serializer, it is good to know when to choose what way of serialization.
Objects that are under the control of Airflow, i.e. residing under the namespace of airflow.*
like
airflow.model.dag.DAG
or under control of the developer e.g. my.company.Foo
should first be examined to see
whether they can be decorated with @attr.define
or @dataclass
. If that is not possible then the serialize
and deserialize
methods should be implemented. The serialize
method should return a primitive or a dict.
It does not need to serialize the values in the dict, that will be taken care of, but the keys should be of a primitive
form.
Objects that are not under control of Airflow, e.g. numpy.int16
will need a registered serializer and deserializer.
Versioning is required. Primitives, excluding bytes
, can be returned as dicts. Again dict
values do not need to be serialized,
but its keys need to be of primitive form. In case you are implementing a registered serializer, take special care
not to have circular imports. Typically, this can be avoided by using str
for populating the list of serializers.
Like so: serializers = ["my.company.Foo"]
instead of serializers = [Foo]
.
Note
Serialization and deserialization is dependent on speed. Use built-in functions like dict
as much as you can and stay away from using classes and other complex structures.
Airflow Object¶
from typing import Any, ClassVar
class Foo:
__version__: ClassVar[int] = 1
def __init__(self, a, v) -> None:
self.a = a
self.b = {"x": v}
def serialize(self) -> dict[str, Any]:
return {
"a": self.a,
"b": self.b,
}
@staticmethod
def deserialize(data: dict[str, Any], version: int):
f = Foo(a=data["a"])
f.b = data["b"]
return f
Registered¶
from __future__ import annotations
from decimal import Decimal
from typing import TYPE_CHECKING
from airflow.utils.module_loading import qualname
if TYPE_CHECKING:
from airflow.serialization.serde import U
serializers = [
Decimal
] # this can be a type or a fully qualified str. Str can be used to prevent circular imports
deserializers = serializers # in some cases you might not have a deserializer (e.g. k8s pod)
__version__ = 1 # required
# the serializer expects output, classname, version, is_serialized?
def serialize(o: object) -> tuple[U, str, int, bool]:
if isinstance(o, Decimal):
name = qualname(o)
_, _, exponent = o.as_tuple()
if exponent >= 0: # No digits after the decimal point.
return int(o), name, __version__, True
# Technically lossy due to floating point errors, but the best we
# can do without implementing a custom encode function.
return float(o), name, __version__, True
return "", "", 0, False
# the deserializer sanitizes the data for you, so you do not need to deserialize values yourself
def deserialize(classname: str, version: int, data: object) -> Decimal:
# always check version compatibility
if version > __version__:
raise TypeError(f"serialized {version} of {classname} > {__version__}")
if classname != qualname(Decimal):
raise TypeError(f"{classname} != {qualname(Decimal)}")
return Decimal(str(data))