#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow module for email backend using sendgrid."""
from __future__ import annotations
import base64
import logging
import mimetypes
import os
import warnings
from typing import Iterable, Union
import sendgrid
from sendgrid.helpers.mail import (
Attachment,
Category,
Content,
CustomArg,
Email,
Mail,
MailSettings,
Personalization,
SandBoxMode,
)
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
from airflow.utils.email import get_email_address_list
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Accepted shape for address-like arguments: a single string or an iterable of strings.
AddressesType = Union[str, Iterable[str]]
def send_email(
    to: AddressesType,
    subject: str,
    html_content: str,
    files: AddressesType | None = None,
    cc: AddressesType | None = None,
    bcc: AddressesType | None = None,
    sandbox_mode: bool = False,
    conn_id: str = "sendgrid_default",
    **kwargs,
) -> None:
    """
    Send an email with html content using `Sendgrid <https://sendgrid.com/>`__.

    .. note::
        For more information, see :ref:`email-configuration-sendgrid`

    :param to: Recipient address(es); normalised with ``get_email_address_list``.
    :param subject: Subject line of the message.
    :param html_content: Message body, added as ``text/html`` content.
    :param files: Path, or iterable of paths, of files to attach. Each file is read in
        binary mode and base64-encoded. A single string is treated as one path.
    :param cc: Optional carbon-copy address(es); normalised like ``to``.
    :param bcc: Optional blind-carbon-copy address(es); normalised like ``to``.
    :param sandbox_mode: When true, the SendGrid sandbox-mode mail setting is enabled.
    :param conn_id: Airflow connection id forwarded to ``_post_sendgrid_mail``.
    :param kwargs: Optional extras read by this function:

        * ``from_email`` / ``from_name`` -- sender address and display name; when
          missing or empty, the ``SENDGRID_MAIL_FROM`` / ``SENDGRID_MAIL_SENDER``
          environment variables are used instead.
        * ``personalization_custom_args`` -- dict of custom args added to the
          personalization; ignored unless it is a ``dict``.
        * ``categories`` -- iterable of category names added to the mail.
    """
    if files is None:
        files = []
    elif isinstance(files, str):
        # The annotation allows a bare string; without this a lone path would be
        # iterated character by character and each character opened as a file.
        files = [files]

    mail = Mail()
    from_email = kwargs.get("from_email") or os.environ.get("SENDGRID_MAIL_FROM")
    from_name = kwargs.get("from_name") or os.environ.get("SENDGRID_MAIL_SENDER")
    mail.from_email = Email(from_email, from_name)
    mail.subject = subject
    mail.mail_settings = MailSettings()

    if sandbox_mode:
        mail.mail_settings.sandbox_mode = SandBoxMode(enable=True)

    # Collect every recipient (to / cc / bcc) into a single personalization.
    personalization = Personalization()
    for to_address in get_email_address_list(to):
        personalization.add_to(Email(to_address))
    if cc:
        for cc_address in get_email_address_list(cc):
            personalization.add_cc(Email(cc_address))
    if bcc:
        for bcc_address in get_email_address_list(bcc):
            personalization.add_bcc(Email(bcc_address))

    # Add custom_args to the personalization if a dict was supplied.
    pers_custom_args = kwargs.get("personalization_custom_args")
    if isinstance(pers_custom_args, dict):
        for key, val in pers_custom_args.items():
            personalization.add_custom_arg(CustomArg(key, val))

    mail.add_personalization(personalization)
    mail.add_content(Content("text/html", html_content))

    # ``or []`` also covers an explicit ``categories=None`` from the caller.
    categories = kwargs.get("categories") or []
    for cat in categories:
        mail.add_category(Category(cat))

    # Add email attachments.
    for fname in files:
        basename = os.path.basename(fname)

        with open(fname, "rb") as file:
            content = base64.b64encode(file.read()).decode("utf-8")

        attachment = Attachment(
            file_content=content,
            file_type=mimetypes.guess_type(basename)[0],
            file_name=basename,
            disposition="attachment",
            content_id=f"<{basename}>",
        )
        mail.add_attachment(attachment)

    _post_sendgrid_mail(mail.get(), conn_id)
def _post_sendgrid_mail(mail_data: dict, conn_id: str = "sendgrid_default") -> None:
    """
    Post a prepared mail payload to SendGrid and log the outcome.

    The API key is taken from the password of the Airflow connection ``conn_id``.
    If that connection cannot be loaded, or it carries no password, the
    ``SENDGRID_API_KEY`` environment variable is used instead and an
    ``AirflowProviderDeprecationWarning`` is emitted.

    A response status outside the 2xx range is logged as an error; this function
    does not itself raise for it.

    :param mail_data: Request body for the send call. Its ``subject`` and
        ``personalizations`` entries are read for the log messages.
    :param conn_id: Airflow connection id holding the SendGrid API key as password.
    """
    api_key = None
    try:
        api_key = BaseHook.get_connection(conn_id).password
    except AirflowException:
        # Best effort: a missing connection is tolerated because the key may still
        # come from the environment, but leave a trace for troubleshooting.
        log.debug(
            "Could not load connection %s; falling back to environment variables.",
            conn_id,
        )

    # Treat an empty password like a missing one so the fallback still applies.
    if not api_key:
        warnings.warn(
            "Fetching Sendgrid credentials from environment variables will be deprecated in a future "
            "release. Please set credentials using a connection instead.",
            AirflowProviderDeprecationWarning,
            stacklevel=2,
        )
        api_key = os.environ.get("SENDGRID_API_KEY")

    sendgrid_client = sendgrid.SendGridAPIClient(api_key=api_key)
    response = sendgrid_client.client.mail.send.post(request_body=mail_data)

    # 2xx status code.
    if 200 <= response.status_code < 300:
        log.info(
            "Email with subject %s is successfully sent to recipients: %s",
            mail_data["subject"],
            mail_data["personalizations"],
        )
    else:
        log.error(
            "Failed to send out email with subject %s, status code: %s",
            mail_data["subject"],
            response.status_code,
        )