# Source code for airflow.providers.ydb.hooks._vendor.dbapi.errors
from typing import List, Optional
import ydb
from google.protobuf.message import Message
class Warning(Exception):
    """DB-API ``Warning`` exception.

    Derives directly from :class:`Exception` and defines no attributes or
    behaviour of its own. The name intentionally shadows the builtin
    ``Warning`` inside this module, following the PEP 249 exception names.
    """
class Error(Exception):
    """Base class of the DB-API error hierarchy defined in this module.

    Args:
        message: Human-readable description; also passed to ``Exception``
            so that ``str(err)`` returns it.
        original_error: Optional underlying error being wrapped. When given,
            its ``issues`` and ``status`` attributes are copied onto this
            instance.

    Attributes:
        original_error: The wrapped error, or ``None``.
        issues: ``original_error.issues`` when an error is wrapped,
            otherwise ``None``.
        status: ``original_error.status`` when an error is wrapped,
            otherwise ``None``.
        message: The formatted issue tree when one could be rendered from
            the wrapped error, otherwise the ``message`` argument.
    """

    def __init__(
        self,
        message: str,
        original_error: Optional["ydb.Error"] = None,
    ):
        super().__init__(message)

        self.original_error = original_error
        # Always define these attributes so callers can read them without
        # first checking whether an original error was supplied.
        self.message = message
        self.issues = None
        self.status = None

        if original_error is not None:
            self.issues = original_error.issues
            self.status = original_error.status
            # Prefer the rendered issue tree; fall back to the plain message
            # when nothing could be rendered.
            self.message = _pretty_issues(original_error.issues) or message
class InterfaceError(Error):
    """DB-API ``InterfaceError``; a direct subclass of :class:`Error`.

    Defines no attributes or behaviour of its own: construction is inherited
    unchanged from :class:`Error`. The name follows the PEP 249 hierarchy.
    """
class DatabaseError(Error):
    """DB-API ``DatabaseError``; a direct subclass of :class:`Error`.

    Serves as the common base for the more specific database error classes
    declared in this module and adds no behaviour of its own.
    """
class DataError(DatabaseError):
    """DB-API ``DataError``; a subclass of :class:`DatabaseError`.

    Adds no attributes or behaviour beyond what it inherits. The name
    follows the PEP 249 exception hierarchy.
    """
class OperationalError(DatabaseError):
    """DB-API ``OperationalError``; a subclass of :class:`DatabaseError`.

    Adds no attributes or behaviour beyond what it inherits. The name
    follows the PEP 249 exception hierarchy.
    """
class IntegrityError(DatabaseError):
    """DB-API ``IntegrityError``; a subclass of :class:`DatabaseError`.

    Adds no attributes or behaviour beyond what it inherits. The name
    follows the PEP 249 exception hierarchy.
    """
class InternalError(DatabaseError):
    """DB-API ``InternalError``; a subclass of :class:`DatabaseError`.

    Adds no attributes or behaviour beyond what it inherits. The name
    follows the PEP 249 exception hierarchy.
    """
class ProgrammingError(DatabaseError):
    """DB-API ``ProgrammingError``; a subclass of :class:`DatabaseError`.

    Adds no attributes or behaviour beyond what it inherits. The name
    follows the PEP 249 exception hierarchy.
    """
class NotSupportedError(DatabaseError):
    """DB-API ``NotSupportedError``; a subclass of :class:`DatabaseError`.

    Adds no attributes or behaviour beyond what it inherits. The name
    follows the PEP 249 exception hierarchy.
    """
def _pretty_issues(issues: List[Message]) -> str:
if issues is None:
return None
children_messages = [_get_messages(issue, root=True) for issue in issues]
if None in children_messages:
return None
return "\n" + "\n".join(children_messages)
def _get_messages(issue: Message, max_depth: int = 100, indent: int = 2, depth: int = 0, root: bool = False) -> str:
if depth >= max_depth:
return None
margin_str = " " * depth * indent
pre_message = ""
children = ""
if issue.issues:
collapsed_messages = []
while not root and len(issue.issues) == 1:
collapsed_messages.append(issue.message)
issue = issue.issues[0]
if collapsed_messages:
pre_message = f"{margin_str}{', '.join(collapsed_messages)}\n"
depth += 1
margin_str = " " * depth * indent
children_messages = [
_get_messages(iss, max_depth=max_depth, indent=indent, depth=depth + 1) for iss in issue.issues
]
if None in children_messages:
return None
children = "\n".join(children_messages)
return (
f"{pre_message}{margin_str}{issue.message}\n{margin_str}"
f"severity level: {issue.severity}\n{margin_str}"
f"issue code: {issue.issue_code}\n{children}"
)