Source code for airflow.providers.google.cloud.utils.field_validator
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Validator for body fields sent via Google Cloud API.

The validator performs validation of the body (a dictionary of fields) that
is sent in the API request to Google Cloud (usually via the ``googleclient`` API).

Context
-------
The specification mostly focuses on helping Airflow DAG developers in the development
phase. You can build your own Google Cloud operator (such as GcfDeployOperator, for example) which
can have a built-in validation specification for the particular API. This is very helpful
when a developer experiments with different fields and their values in the initial phase of
DAG development. Most of the Google Cloud APIs perform their own validation on the
server side, but most of the requests are asynchronous and you need to wait for the result
of the operation. This takes precious time and slows
down iteration over the API.
GcpBodyFieldValidator is meant to be used on the client side
and it should therefore provide instant feedback to the developer on misspelled
parameters or parameters of the wrong type.

The validation should be performed in the "execute()" method call in order to allow
template parameters to be expanded before validation is performed.

Types of fields
---------------

The specification is an array of dictionaries. Each dictionary describes a field: its type,
validation, optionality, supported api_version and nested fields (for unions and dicts).

Typically (for clarity and in order to aid syntax highlighting) the array of
dicts should be defined as a series of dict() calls. A fragment of an example
specification might look as follows::

    SPECIFICATION = [
        dict(name="an_union", type="union", optional=True, fields=[
            dict(name="variant_1", type="dict"),
            dict(name="variant_2", regexp=r'^.+$', api_version='v1beta2'),
        ]),
        dict(name="a_dict", type="dict", fields=[
            dict(name="field_1", type="dict"),
            dict(name="field_2", regexp=r'^.+$'),
        ]),
        ...
    ]

Each field should have key = "name" indicating the field name. The field can be of one of the
following types:

* Dict fields: (key = "type", value="dict"):
  A field of this type should contain nested fields in the form of an array of dicts.
  Each of the fields in the array is then expected (unless marked as optional)
  and validated recursively. If an extra field is present in the dictionary, a warning is
  printed in the log file (but the validation succeeds - see the Forward-compatibility notes).
* List fields: (key = "type", value="list"):
  A field of this type should be a list. Only the type correctness is validated.
  The contents of a list are not subject to validation.
* Union fields (key = "type", value="union"): a field of this type should contain nested
  fields in the form of an array of dicts. One of the fields (and only one) should be
  present (unless the union is marked as optional). If more than one union field is
  present, GcpFieldValidationException is raised.
  If none of the union fields is present, a warning is printed in the log (see the
  Forward-compatibility notes below).
* Fields validated for non-emptiness: (key = "allow_empty") - this applies only to
  fields whose value is a string, and it allows checking for non-emptiness of
  the field (allow_empty=False).
* Regexp-validated fields: (key = "regexp") - fields of this type are assumed to be
  strings and they are validated with the regexp specified. Remember that the regexps
  should ideally contain ^ at the beginning and $ at the end to make sure that
  the whole field content is validated. Such regexp
  validations should be used carefully and sparingly (see the Forward-compatibility
  notes below).
* Custom-validated fields: (key = "custom_validation") - fields of this type are validated
  using the callable specified via the custom_validation key. Any exception raised in the custom
  validation will be turned into GcpFieldValidationException and will cause validation to
  fail. Such custom validations might be used to check numeric fields (including
  ranges of values), booleans or any other types of fields.
* API version: (key="api_version") if an API version is specified, then the field will only
  be validated when the api_version used at field validator initialization matches exactly
  the version specified. If you want to declare fields that are available in several
  versions of the API, you should specify the field once per supported API version
  (each time with a different API version).
* If none of the keys ("type", "regexp", "custom_validation") is specified, the field is
  not validated.

You can see some of the field examples in EXAMPLE_VALIDATION_SPECIFICATION.


Forward-compatibility notes
---------------------------
Certain decisions are crucial to allow the client APIs to work also with future API
versions.
Since the attached body is passed to the API call, it is entirely
possible to pass through any new fields in the body (for future API versions),
albeit without validation on the client side. They can and usually will still be validated
on the server side.

Here are the guidelines that you should follow to make validation forward-compatible:

* Most of the fields are not validated for their content. It's possible to use regexp
  in some specific cases that are guaranteed not to change in the future, but for most
  fields regexp validation should be r'^.+$', indicating a check for non-emptiness.
* api_version is not validated - the user can pass any future version of the API here. The API
  version is only used to filter parameters that are marked as present in this API version.
* Any new fields in the body (not present in the specification) are allowed and not verified.
  For dictionaries, new fields can be added by future calls. However, if an unknown
  field is added to a dictionary, a warning is logged by the client (but validation remains
  successful). This is a useful feature to protect against typos in names.
* For unions, newly added union variants can be added by future calls and they will
  pass validation, however the content or presence of those fields will not be validated.
  This means that it's possible to send a new non-validated union field together with an
  old validated field and this problem will not be detected by the client. In such a case
  a warning will be printed.
* When you add a validator to an operator, you should also add a ``validate_body``
  parameter (default = True) to __init__ of such operators - when it is set to False,
  no validation should be performed. This is a safeguard for totally unpredicted and
  backwards-incompatible changes that might sometimes occur in the APIs.

"""

from __future__ import annotations

import re
from collections.abc import Sequence
from typing import Callable

from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin

# Field types that are allowed to declare nested fields.
COMPOSITE_FIELD_TYPES = ["union", "dict", "list"]


class GcpFieldValidationException(AirflowException):
    """Thrown when validation finds dictionary field not valid according to specification."""


class GcpValidationSpecificationException(AirflowException):
    """
    Thrown when validation specification is wrong.

    This should only happen during development as ideally
    specification itself should not be invalid ;) .
    """


def _int_greater_than_zero(value):
    if int(value) <= 0:
        raise GcpFieldValidationException("The available memory has to be greater than 0")


class GcpBodyFieldValidator(LoggingMixin):
    """
    Validates correctness of request body according to specification.

    The specification can describe various types of
    fields including custom validation, and union of fields. This validator is
    to be reusable by various operators. See the EXAMPLE_VALIDATION_SPECIFICATION
    for some examples and explanations of how to create specification.

    :param validation_specs: sequence of dictionaries describing the validation specification
    :param api_version: Version of the api used (for example v1)

    """

    def __init__(self, validation_specs: Sequence[dict], api_version: str) -> None:
        super().__init__()
        self._validation_specs = validation_specs
        self._api_version = api_version

    @staticmethod
    def _get_field_name_with_parent(field_name, parent):
        if parent:
            return parent + "." + field_name
        return field_name

    @staticmethod
    def _sanity_checks(
        children_validation_specs: dict,
        field_type: str,
        full_field_path: str,
        regexp: str,
        allow_empty: bool,
        custom_validation: Callable | None,
        value,
    ) -> None:
        if value is None and field_type != "union":
            raise GcpFieldValidationException(
                f"The required body field '{full_field_path}' is missing. Please add it."
            )
        if regexp and field_type:
            raise GcpValidationSpecificationException(
                f"The validation specification entry '{full_field_path}' has both type and regexp. "
                "The regexp is only allowed without type (i.e. assume type is 'str' that can be "
                "validated with regexp)"
            )
        if allow_empty is not None and field_type:
            raise GcpValidationSpecificationException(
                f"The validation specification entry '{full_field_path}' has both type and allow_empty. "
                "The allow_empty is only allowed without type (i.e. assume type is 'str' that can "
                "be validated with allow_empty)"
            )
        if children_validation_specs and field_type not in COMPOSITE_FIELD_TYPES:
            raise GcpValidationSpecificationException(
                f"Nested fields are specified in field '{full_field_path}' of type '{field_type}'. "
                f"Nested fields are only allowed for fields of those types: ('{COMPOSITE_FIELD_TYPES}')."
            )
        if custom_validation and field_type:
            raise GcpValidationSpecificationException(
                f"The validation specification field '{full_field_path}' has both type and "
                "custom_validation. Custom validation is only allowed without type."
            )

    @staticmethod
    def _validate_regexp(full_field_path: str, regexp: str, value: str) -> None:
        if not re.match(regexp, value):
            # Note: re.match only anchors at the beginning; the regexps are assumed to be
            # all-or-nothing (i.e. to end with $).
            raise GcpFieldValidationException(
                f"The body field '{full_field_path}' of value '{value}' does not match the field "
                f"specification regexp: '{regexp}'."
            )

    @staticmethod
    def _validate_is_empty(full_field_path: str, value: str) -> None:
        if not value:
            raise GcpFieldValidationException(
                f"The body field '{full_field_path}' can't be empty. Please provide a value."
            )

    def _validate_dict(self, children_validation_specs: dict, full_field_path: str, value: dict) -> None:
        for child_validation_spec in children_validation_specs:
            self._validate_field(
                validation_spec=child_validation_spec, dictionary_to_validate=value, parent=full_field_path
            )
        all_dict_keys = {spec["name"] for spec in children_validation_specs}
        for field_name in value:
            if field_name not in all_dict_keys:
                self.log.warning(
                    "The field '%s' is in the body, but is not specified in the "
                    "validation specification '%s'. "
                    "This might be because you are using newer API version and "
                    "new field names defined for that version. Then the warning "
                    "can be safely ignored, or you might want to upgrade the operator "
                    "to the version that supports the new API version.",
                    self._get_field_name_with_parent(field_name, full_field_path),
                    children_validation_specs,
                )

    def _validate_union(
        self, children_validation_specs: dict, full_field_path: str, dictionary_to_validate: dict
    ) -> None:
        field_found = False
        found_field_name = None
        for child_validation_spec in children_validation_specs:
            # Forcing optional so that we do not have to type optional = True
            # in specification for all union fields
            new_field_found = self._validate_field(
                validation_spec=child_validation_spec,
                dictionary_to_validate=dictionary_to_validate,
                parent=full_field_path,
                force_optional=True,
            )
            field_name = child_validation_spec["name"]
            if new_field_found and field_found:
                raise GcpFieldValidationException(
                    f"The mutually exclusive fields '{field_name}' and '{found_field_name}' belonging to "
                    f"the union '{full_field_path}' are both present. Please remove one"
                )
            if new_field_found:
                field_found = True
                found_field_name = field_name
        if not field_found:
            self.log.warning(
                "There is no '%s' union defined in the body %s. "
                "Validation expected one of '%s' but could not find any. It's possible "
                "that you are using newer API version and there is another union variant "
                "defined for that version. Then the warning can be safely ignored, "
                "or you might want to upgrade the operator to the version that "
                "supports the new API version.",
                full_field_path,
                dictionary_to_validate,
                [field["name"] for field in children_validation_specs],
            )

    def _validate_field(self, validation_spec, dictionary_to_validate, parent=None, force_optional=False):
        """
        Validate if field is OK.

        :param validation_spec: specification of the field
        :param dictionary_to_validate: dictionary where the field should be present
        :param parent: full path of parent field
        :param force_optional: forces the field to be optional
            (all union fields have force_optional set to True)
        :return: True if the field is present
        """
        field_name = validation_spec["name"]
        field_type = validation_spec.get("type")
        optional = validation_spec.get("optional")
        regexp = validation_spec.get("regexp")
        allow_empty = validation_spec.get("allow_empty")
        children_validation_specs = validation_spec.get("fields")
        required_api_version = validation_spec.get("api_version")
        custom_validation = validation_spec.get("custom_validation")

        full_field_path = self._get_field_name_with_parent(field_name=field_name, parent=parent)
        if required_api_version and required_api_version != self._api_version:
            self.log.debug(
                "Skipping validation of the field '%s' for API version '%s' "
                "as it is only valid for API version '%s'",
                field_name,
                self._api_version,
                required_api_version,
            )
            return False
        value = dictionary_to_validate.get(field_name)

        if (optional or force_optional) and value is None:
            self.log.debug("The optional field '%s' is missing. That's perfectly OK.", full_field_path)
            return False

        # From here on the field is present (value is not None),
        # so we should only return True from now on
        self._sanity_checks(
            children_validation_specs=children_validation_specs,
            field_type=field_type,
            full_field_path=full_field_path,
            regexp=regexp,
            allow_empty=allow_empty,
            custom_validation=custom_validation,
            value=value,
        )

        if allow_empty is False:
            self._validate_is_empty(full_field_path, value)
        if regexp:
            self._validate_regexp(full_field_path, regexp, value)
        elif field_type == "dict":
            if not isinstance(value, dict):
                raise GcpFieldValidationException(
                    f"The field '{full_field_path}' should be of dictionary type according to "
                    f"the specification '{validation_spec}' but it is '{value}'"
                )
            if children_validation_specs is None:
                self.log.debug(
                    "The dict field '%s' has no nested fields defined in the "
                    "specification '%s'. That's perfectly ok - its content will "
                    "not be validated.",
                    full_field_path,
                    validation_spec,
                )
            else:
                self._validate_dict(children_validation_specs, full_field_path, value)
        elif field_type == "union":
            if not children_validation_specs:
                raise GcpValidationSpecificationException(
                    f"The union field '{full_field_path}' has no nested fields defined in "
                    f"specification '{validation_spec}'. "
                    "Unions should have at least one nested field defined."
                )
            self._validate_union(children_validation_specs, full_field_path, dictionary_to_validate)
        elif field_type == "list":
            if not isinstance(value, list):
                raise GcpFieldValidationException(
                    f"The field '{full_field_path}' should be of list type according to "
                    f"the specification '{validation_spec}' but it is '{value}'"
                )
        elif custom_validation:
            try:
                custom_validation(value)
            except Exception as e:
                raise GcpFieldValidationException(
                    f"Error while validating custom field '{full_field_path}' "
                    f"specified by '{validation_spec}': '{e}'"
                )
        elif field_type is None:
            self.log.debug(
                "The type of field '%s' is not specified in '%s'. Not validating its content.",
                full_field_path,
                validation_spec,
            )
        else:
            raise GcpValidationSpecificationException(
                f"The field '{full_field_path}' is of type '{field_type}' in "
                f"specification '{validation_spec}'. This type is unknown to validation!"
            )
        return True

    def validate(self, body_to_validate: dict) -> None:
        """
        Validate if the body (dictionary) follows specification that the validator was instantiated with.

        Raises GcpValidationSpecificationException or GcpFieldValidationException in case of problems
        with the specification or the body not conforming to the specification, respectively.

        :param body_to_validate: body that must follow the specification
        :return: None
        """
        if body_to_validate is None:
            raise RuntimeError("The body to validate is `None`. Please provide a dictionary to validate.")
        try:
            for validation_spec in self._validation_specs:
                self._validate_field(validation_spec=validation_spec, dictionary_to_validate=body_to_validate)
        except GcpFieldValidationException as e:
            raise GcpFieldValidationException(
                f"There was an error when validating: body '{body_to_validate}': '{e}'"
            )
        all_field_names = {
            spec["name"]
            for spec in self._validation_specs
            if spec.get("type") != "union" and spec.get("api_version") != self._api_version
        }
        all_union_fields = [spec for spec in self._validation_specs if spec.get("type") == "union"]
        for union_field in all_union_fields:
            all_field_names.update(
                nested_union_spec["name"]
                for nested_union_spec in union_field["fields"]
                if nested_union_spec.get("type") != "union"
                and nested_union_spec.get("api_version") != self._api_version
            )
        for field_name in body_to_validate:
            if field_name not in all_field_names:
                self.log.warning(
                    "The field '%s' is in the body, but is not specified in the "
                    "validation specification '%s'. "
                    "This might be because you are using newer API version and "
                    "new field names defined for that version. Then the warning "
                    "can be safely ignored, or you might want to upgrade the operator "
                    "to the version that supports the new API version.",
                    field_name,
                    self._validation_specs,
                )
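
To illustrate the rules described in the module docstring (required fields, regexp checks, recursive dict validation and mutually exclusive union variants), here is a minimal, self-contained sketch. It does not import Airflow and is not the module's implementation: `FieldValidationError`, `validate_field`, `is_valid` and the field names in `SPEC` are hypothetical names chosen for this example. The real `GcpBodyFieldValidator` additionally handles `api_version`, `custom_validation`, list fields and logging.

```python
import re


class FieldValidationError(Exception):
    """Hypothetical stand-in for GcpFieldValidationException."""


def validate_field(spec, body, parent=None, force_optional=False):
    """Return True if the field is present; raise if its value is invalid."""
    name = spec["name"]
    path = f"{parent}.{name}" if parent else name
    field_type = spec.get("type")
    value = body.get(name)

    # Missing optional fields (and missing union variants) are simply skipped.
    if value is None and (spec.get("optional") or force_optional):
        return False
    # A union has no key of its own: its variants live in the same dictionary.
    if value is None and field_type != "union":
        raise FieldValidationError(f"The required body field '{path}' is missing.")
    if spec.get("allow_empty") is False and not value:
        raise FieldValidationError(f"The body field '{path}' can't be empty.")
    if spec.get("regexp") and not re.match(spec["regexp"], value):
        raise FieldValidationError(f"'{path}' does not match {spec['regexp']!r}.")

    if field_type == "dict":
        if not isinstance(value, dict):
            raise FieldValidationError(f"'{path}' should be a dictionary.")
        for child in spec.get("fields", []):
            validate_field(child, value, parent=path)
    elif field_type == "union":
        # Variants are forced optional; at most one of them may be present.
        present = [
            child["name"]
            for child in spec["fields"]
            if validate_field(child, body, parent=path, force_optional=True)
        ]
        if len(present) > 1:
            raise FieldValidationError(f"Union '{path}': {present} are mutually exclusive.")
    return True


# Hypothetical specification, written in the dict() style the docstring recommends.
SPEC = [
    dict(name="name", regexp=r"^.+$"),
    dict(name="labels", type="dict", optional=True),
    dict(name="source", type="union", fields=[
        dict(name="source_archive_url", regexp=r"^.+$"),
        dict(name="source_upload_url", regexp=r"^.+$"),
    ]),
]


def is_valid(body):
    """Validate the body against SPEC and report the outcome as a boolean."""
    try:
        for spec in SPEC:
            validate_field(spec, body)
    except FieldValidationError:
        return False
    return True


if __name__ == "__main__":
    print(is_valid({"name": "fn", "source_archive_url": "gs://bucket/object"}))
    print(is_valid({"name": "fn", "source_archive_url": "a", "source_upload_url": "b"}))
```

As in the module above, a body in which no union variant is present still passes in this sketch; the real validator only logs a warning in that case, in line with the forward-compatibility notes.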