在数据工作流中设置 SMTP 服务器

注意

数据工作流由 Apache Airflow 提供支持。
Apache Airflow 是一个开放源代码平台,用于以编程方式创建、计划和监视复杂的数据工作流。 它允许定义一组称为运算器的任务,这些任务可以组合成有向无环图 (DAG) 以表示数据管道。

Airflow 的一项功能是当任务失败、成功或重试时发送电子邮件通知和警报。 此功能可帮助你跟踪工作流并排查任何问题。

若要使用电子邮件通知和警报,需要设置可以代表 Airflow 发送电子邮件的(简单邮件传输协议)SMTP 服务器。 SMTP 代表简单邮件传输协议,它是通过 Internet 发送和接收电子邮件的标准。 可以使用自己的 SMTP 服务器,或使用第三方服务,例如 Gmail、SendGrid 或 Mailgun。 本文介绍如何使用 Gmail 通过数据工作流来设置 SMTP 服务器。 若要使用电子邮件通知和警报,需要设置可以代表 Airflow 发送电子邮件的(简单邮件传输协议)SMTP 服务器。 SMTP 代表简单邮件传输协议,它是通过 Internet 发送和接收电子邮件的标准。 可以使用自己的 SMTP 服务器,或使用第三方服务,例如 Gmail、SendGrid 或 Mailgun。 本文介绍如何使用 Gmail 通过数据工作流来设置 SMTP 服务器。

先决条件

  • 可用于发送电子邮件的 SMTP 服务器或服务。 需要服务器或服务的 SMTP 主机、端口、用户名和密码。 如果使用 Gmail,请为帐户创建应用密码。

  • 要用作通知和警报发件人的电子邮件地址。 此电子邮件地址可以与 SMTP 用户名相同,如果 SMTP 服务允许,该电子邮件地址也可以是其他电子邮件地址。

  • 想要接收通知和警报的一个或多个电子邮件地址。 这些电子邮件地址可以是你自己的电子邮件地址,也可以是团队成员、利益干系人或客户的电子邮件地址。

设置环境变量

  • 满足先决条件后,可以将数据工作流配置为使用 SMTP 服务器或服务。 编辑 Airflow configurations 部分中的以下字段:

    • AIRFLOW__SMTP__SMTP_HOST:SMTP 服务器或服务的主机名或 IP 地址。
    • AIRFLOW__SMTP__SMTP_STARTTLS:连接到 SMTP 服务器或服务时是否使用 TLS(传输层安全性)加密。 如果 SMTP 服务器或服务支持 TLS,则将此配置设置为 True,否则为 False。
    • AIRFLOW__SMTP__SMTP_SSL:连接到 SMTP 服务器或服务时是否使用 SSL(安全套接字层)加密。 如果 SMTP 服务器或服务需要 SSL,则将此配置设置为 True,否则为 False。
    • AIRFLOW__SMTP__SMTP_USER:SMTP 服务器或服务的用户名。 此用户名通常是电子邮件地址,如果使用 SendGrid,则为 API 密钥。
    • AIRFLOW__SMTP__SMTP_PASSWORD:SMTP 服务器或服务的密码。 此密码通常是电子邮件密码,如果使用 Gmail,则为应用密码。
    • AIRFLOW__SMTP__SMTP_PORT:SMTP 服务器或服务的端口号。 此端口通常为 25、465 或 587,具体取决于加密方法和 SMTP 服务。
    • AIRFLOW__SMTP__SMTP_MAIL_FROM:要用作通知和警报发件人的电子邮件地址。 此电子邮件地址可以与 SMTP 用户名相同,如果 SMTP 服务允许,该电子邮件地址也可以是其他电子邮件地址。

    如果使用 Gmail,请参阅以下值。

    Airflow 配置 Gmail
    AIRFLOW__SMTP__SMTP_HOST smtp.gmail.com
    AIRFLOW__SMTP__SMTP_STARTTLS True
    AIRFLOW__SMTP__SMTP_SSL False
    AIRFLOW__SMTP__SMTP_USER your_email@gmail.com
    AIRFLOW__SMTP__SMTP_PASSWORD your_app_password
    AIRFLOW__SMTP__SMTP_PORT 587
    AIRFLOW__SMTP__SMTP_MAIL_FROM your_email@gmail.com

    屏幕截图显示了 SMTP 的 Airflow 配置。

示例:在 DAG 失败时发送电子邮件的 DAG

   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.email import send_emailfrom datetime import datetime


   default_args = {
   "owner": "airflow",
   "start_date": datetime(2024, 3, 1),
   "email": ["your_email@gmail.com"], # The email address that you want to receive the notifications and alerts
   "email_on_failure": True,
   "email_on_retry": False,
   "retries": 0
   }

   with DAG(
       "email_callback_test",
       default_args=default_args,
       schedule_interval=None
   ) as dag:

   fail_task = BashOperator(
           task_id="fail_task",
           bash_command="cd fail",
       )

   fail_task

从 SMTP 服务器接收的示例电子邮件

电子邮件中包含以下信息:

  • 尝试次数:0

  • 错误

  • 日志:重定向到失败的任务日志的链接。

  • 主机:数据工作流的主机名

  • 主机:数据工作流的主机名

  • 标记成功:重定向到失败的 DAG 状态的链接。

    该屏幕截图显示了按要求添加的专用包。

快速入门:创建数据工作流