Source code for airflow.providers.ydb.hooks._vendor.dbapi

from .connection import AsyncConnection, Connection, IsolationLevel  # noqa: F401
from .cursor import AsyncCursor, Cursor, YdbQuery  # noqa: F401
from .errors import (
    DatabaseError,
    DataError,
    Error,
    IntegrityError,
    InterfaceError,
    InternalError,
    NotSupportedError,
    OperationalError,
    ProgrammingError,
    Warning,
)


[docs]class YdbDBApi: def __init__(self): self.paramstyle = "pyformat" self.threadsafety = 0 self.apilevel = "1.0" self._init_dbapi_attributes() def _init_dbapi_attributes(self): for name, value in { "Warning": Warning, "Error": Error, "InterfaceError": InterfaceError, "DatabaseError": DatabaseError, "DataError": DataError, "OperationalError": OperationalError, "IntegrityError": IntegrityError, "InternalError": InternalError, "ProgrammingError": ProgrammingError, "NotSupportedError": NotSupportedError, }.items(): setattr(self, name, value)
[docs] def connect(self, *args, **kwargs) -> Connection: return Connection(*args, **kwargs)
[docs] def async_connect(self, *args, **kwargs) -> AsyncConnection: return AsyncConnection(*args, **kwargs)

Was this entry helpful?