Source code for airflow.providers.amazon.aws.hooks.elasticache_replication_group
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttimefromairflow.exceptionsimportAirflowExceptionfromairflow.providers.amazon.aws.hooks.base_awsimportAwsBaseHook
[docs]classElastiCacheReplicationGroupHook(AwsBaseHook):""" Interact with Amazon ElastiCache. Provide thick wrapper around :external+boto3:py:class:`boto3.client("elasticache") <ElastiCache.Client>`. :param max_retries: Max retries for checking availability of and deleting replication group If this is not supplied then this is defaulted to 10 :param exponential_back_off_factor: Multiplication factor for deciding next sleep time If this is not supplied then this is defaulted to 1 :param initial_poke_interval: Initial sleep time in seconds If this is not supplied then this is defaulted to 60 seconds Additional arguments (such as ``aws_conn_id``) may be specified and are passed down to the underlying AwsBaseHook. .. seealso:: - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` """
[docs]defcreate_replication_group(self,config:dict)->dict:""" Create a Redis (cluster mode disabled) or a Redis (cluster mode enabled) replication group. .. seealso:: - :external+boto3:py:meth:`ElastiCache.Client.create_replication_group` :param config: Configuration for creating the replication group :return: Response from ElastiCache create replication group API """returnself.conn.create_replication_group(**config)
[docs]defdelete_replication_group(self,replication_group_id:str)->dict:""" Delete an existing replication group. .. seealso:: - :external+boto3:py:meth:`ElastiCache.Client.delete_replication_group` :param replication_group_id: ID of replication group to delete :return: Response from ElastiCache delete replication group API """returnself.conn.delete_replication_group(ReplicationGroupId=replication_group_id)
[docs]defdescribe_replication_group(self,replication_group_id:str)->dict:""" Get information about a particular replication group. .. seealso:: - :external+boto3:py:meth:`ElastiCache.Client.describe_replication_groups` :param replication_group_id: ID of replication group to describe :return: Response from ElastiCache describe replication group API """returnself.conn.describe_replication_groups(ReplicationGroupId=replication_group_id)
[docs]defget_replication_group_status(self,replication_group_id:str)->str:""" Get current status of replication group. .. seealso:: - :external+boto3:py:meth:`ElastiCache.Client.describe_replication_groups` :param replication_group_id: ID of replication group to check for status :return: Current status of replication group """returnself.describe_replication_group(replication_group_id)["ReplicationGroups"][0]["Status"]
[docs]defis_replication_group_available(self,replication_group_id:str)->bool:""" Check if replication group is available or not. :param replication_group_id: ID of replication group to check for availability :return: True if available else False """returnself.get_replication_group_status(replication_group_id)=="available"
[docs]defwait_for_availability(self,replication_group_id:str,initial_sleep_time:float|None=None,exponential_back_off_factor:float|None=None,max_retries:int|None=None,)->bool:""" Check if replication group is available or not by performing a describe over it. :param replication_group_id: ID of replication group to check for availability :param initial_sleep_time: Initial sleep time in seconds If this is not supplied then this is defaulted to class level value :param exponential_back_off_factor: Multiplication factor for deciding next sleep time If this is not supplied then this is defaulted to class level value :param max_retries: Max retries for checking availability of replication group If this is not supplied then this is defaulted to class level value :return: True if replication is available else False """sleep_time=initial_sleep_timeorself.initial_poke_intervalexponential_back_off_factor=exponential_back_off_factororself.exponential_back_off_factormax_retries=max_retriesorself.max_retriesnum_tries=0status="not-found"stop_poking=Falsewhilenotstop_pokingandnum_tries<=max_retries:status=self.get_replication_group_status(replication_group_id=replication_group_id)stop_poking=statusinself.TERMINAL_STATESself.log.info("Current status of replication group with ID %s is %s",replication_group_id,status)ifnotstop_poking:num_tries+=1# No point in sleeping if all tries have exhaustedifnum_tries>max_retries:breakself.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...",num_tries,sleep_time)time.sleep(sleep_time)sleep_time*=exponential_back_off_factorifstatus!="available":self.log.warning('Replication group is not available. Current status is "%s"',status)returnFalsereturnTrue
[docs]defwait_for_deletion(self,replication_group_id:str,initial_sleep_time:float|None=None,exponential_back_off_factor:float|None=None,max_retries:int|None=None,):""" Delete a replication group ensuring it is either deleted or can't be deleted. :param replication_group_id: ID of replication to delete :param initial_sleep_time: Initial sleep time in second If this is not supplied then this is defaulted to class level value :param exponential_back_off_factor: Multiplication factor for deciding next sleep time If this is not supplied then this is defaulted to class level value :param max_retries: Max retries for checking availability of replication group If this is not supplied then this is defaulted to class level value :return: Response from ElastiCache delete replication group API and flag to identify if deleted or not """deleted=Falsesleep_time=initial_sleep_timeorself.initial_poke_intervalexponential_back_off_factor=exponential_back_off_factororself.exponential_back_off_factormax_retries=max_retriesorself.max_retriesnum_tries=0response=Nonewhilenotdeletedandnum_tries<=max_retries:try:status=self.get_replication_group_status(replication_group_id=replication_group_id)self.log.info("Current status of replication group with ID %s is %s",replication_group_id,status)# Can only delete if status is `available`# Status becomes `deleting` after this call so this will only run onceifstatus=="available":self.log.info("Initiating delete and then wait for it to finish")response=self.delete_replication_group(replication_group_id=replication_group_id)exceptself.conn.exceptions.ReplicationGroupNotFoundFault:self.log.info("Replication group with ID '%s' does not exist",replication_group_id)deleted=True# This should never occur as we only issue a delete request when status is `available`# which is a valid status for deletion. Still handling for safety.exceptself.conn.exceptions.InvalidReplicationGroupStateFaultasexp:# status Error Response# creating - Cache cluster <cluster_id> is not in a valid state to be deleted.# deleting - Replication group <replication_group_id> has status deleting which is not valid# for deletion.# modifying - Replication group <replication_group_id> has status deleting which is not valid# for deletion.message=exp.response["Error"]["Message"]self.log.warning("Received error message from AWS ElastiCache API : %s",message)ifnotdeleted:num_tries+=1# No point in sleeping if all tries have exhaustedifnum_tries>max_retries:breakself.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...",num_tries,sleep_time)time.sleep(sleep_time)sleep_time*=exponential_back_off_factorreturnresponse,deleted
[docs]defensure_delete_replication_group(self,replication_group_id:str,initial_sleep_time:float|None=None,exponential_back_off_factor:float|None=None,max_retries:int|None=None,)->dict:""" Delete a replication group ensuring it is either deleted or can't be deleted. :param replication_group_id: ID of replication to delete :param initial_sleep_time: Initial sleep time in second If this is not supplied then this is defaulted to class level value :param exponential_back_off_factor: Multiplication factor for deciding next sleep time If this is not supplied then this is defaulted to class level value :param max_retries: Max retries for checking availability of replication group If this is not supplied then this is defaulted to class level value :return: Response from ElastiCache delete replication group API :raises AirflowException: If replication group is not deleted """self.log.info("Deleting replication group with ID %s",replication_group_id)response,deleted=self.wait_for_deletion(replication_group_id=replication_group_id,initial_sleep_time=initial_sleep_time,exponential_back_off_factor=exponential_back_off_factor,max_retries=max_retries,)ifnotdeleted:raiseAirflowException(f'Replication group could not be deleted. Response "{response}"')returnresponse