Руководство. Оператор отправителя электронной почты Gmail

Important

Эта функция доступна в общедоступной предварительной версии.

В этом руководстве пошагово показано, как создать оператор python-run-function для Lakeflow Designer, который отправляет содержимое DataFrame через Gmail в виде CSV-вложения. Используйте этот пример, чтобы узнать, как создавать операторы на основе YAML, которые выполняют побочные эффекты, такие как отправка уведомлений или запись во внешние системы. Дополнительные сведения см. в разделе "Определяемые пользователем операторы" в конструкторе Lakeflow.

Requirements

  • Рабочая область Azure Databricks с доступом к созданию областей секретов.
  • Учетная запись Gmail с паролем Google App (требуется, если включена многофакторная проверка подлинности( MFA).
  • Интерфейс командной строки Databricks, установленный в локальной среде разработки.

Шаг 1. Настройка секретов

Сохраните учетные данные Gmail в области секрета Azure Databricks, чтобы оператор смог получить их во время выполнения.

  1. Создайте область секрета с помощью интерфейса командной строки Azure Databricks:

    databricks secrets create-scope my_email_scope
    
  2. Сохраните пароль приложения Gmail в этой области видимости:

    databricks secrets put-secret my_email_scope gmail_app_password
    

    Вам будет предложено ввести значение секрета. Вставьте пароль приложения Gmail и сохраните его.

Шаг 2: Напишите функцию run()

Для python-run-function типа оператора требуется функция с этой сигнатурой run() :

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: значения конфигурации, предоставленные пользователем в пользовательском интерфейсе конструктора Lakeflow.
  • inputs: Входные фреймы данных, где ключом является имя порта.
  • spark: активный сеанс Spark.

Функция должна возвращать словарь выходных объектов DataFrame, где ключами являются имена выходных портов.

Определите и проверьте функцию в ячейке записной книжки:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    input_df = inputs["data"]

    # Skip side effects during Designer preview
    if config.get("is_preview", False):
        return {"data": input_df}

    import smtplib
    import os
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.mime.base import MIMEBase
    from email import encoders

    sender_email = config.get("sender_email", "")
    secret_scope = config.get("secret_scope", "")
    secret_key = config.get("secret_key", "")
    recipients_raw = config.get("recipients", "")
    subject = config.get("subject", "")
    body = config.get("body", "")

    if not sender_email:
        raise ValueError("Sender Email is required.")
    if not secret_scope or not secret_key:
        raise ValueError("Secret Scope and Secret Key are required.")
    if not recipients_raw:
        raise ValueError("At least one recipient is required.")

    recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
    if not recipients:
        raise ValueError("At least one valid recipient email is required.")

    # Retrieve password from Databricks secrets
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark)
    sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

    # Convert DataFrame to CSV
    pdf = input_df.toPandas()
    file_path = "/tmp/designer_email_attachment.csv"
    pdf.to_csv(file_path, index=False)

    # Send email to each recipient
    for recipient in recipients:
        msg = MIMEMultipart()
        msg["From"] = sender_email
        msg["To"] = recipient
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))

        with open(file_path, "rb") as attachment:
            part = MIMEBase("application", "octet-stream")
            part.set_payload(attachment.read())
            encoders.encode_base64(part)
            part.add_header(
                "Content-Disposition",
                f"attachment; filename={os.path.basename(file_path)}",
            )
            msg.attach(part)

        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(sender_email, sender_password)
            server.send_message(msg)

    # Clean up temp file
    if os.path.exists(file_path):
        os.remove(file_path)

    return {"data": input_df}

Шаг 3. Проверка функции

Проверьте функцию с помощью примера кадра данных:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

# Test in preview mode (no email sent)
result = run(
    config={
        "is_preview": True,
        "sender_email": "you@gmail.com",
        "secret_scope": "my_email_scope",
        "secret_key": "gmail_app_password",
        "recipients": "alice@example.com",
        "subject": "Test",
        "body": "Test body"
    },
    inputs={"data": test_df},
    spark=spark
)

result["data"].show()
# Expected: the original DataFrame, unchanged

Note

Значения secret_scope и secret_key в конфигурации — это имена области действия секрета и ключа, созданных на шаге 1, а не сам пароль. Оператор использует эти имена для извлечения пароля из секретов Azure Databricks во время выполнения.

Important

Сначала выполните проверку, установив is_preview в значение True, чтобы проверить поведение сквозной передачи, не отправляя никаких электронных писем. Когда вы будете готовы протестировать фактическое сообщение электронной почты, задайте для этого значения is_previewFalse.

Шаг 4. Создание определения YAML

Создайте файл с именем gmail_email_sender.yaml со следующим содержимым:

schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false
    sender_email:
      type: string
      title: Sender Email
      default: ''
      examples:
        - 'you@gmail.com'
      x-ui:
        widget: input
    secret_scope:
      type: string
      title: Secret Scope
      default: ''
      examples:
        - 'my_email_scope'
      x-ui:
        widget: input
    secret_key:
      type: string
      title: Secret Key
      default: ''
      examples:
        - 'gmail_app_password'
      x-ui:
        widget: input
    recipients:
      type: string
      title: Recipients
      default: ''
      examples:
        - 'alice@example.com, bob@example.com'
      x-ui:
        widget: textarea
        rows: 2
    subject:
      type: string
      title: Subject
      default: ''
      examples:
        - 'Designer Output Data'
      x-ui:
        widget: input
    body:
      type: string
      title: Email Body
      default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
      x-ui:
        widget: textarea
        rows: 6
  required:
    - sender_email
    - secret_scope
    - secret_key
    - recipients
    - subject
  additionalProperties: false

ports:
  input:
    - name: data
      title: Input Data
      mime: application/vnd.databricks.dataframe
  output:
    - name: data
      title: Output Data
      mime: application/vnd.databricks.dataframe

run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        input_df = inputs["data"]

        if config.get("is_preview", False):
            return {"data": input_df}

        import smtplib
        import os
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.mime.base import MIMEBase
        from email import encoders

        sender_email = config.get("sender_email", "")
        secret_scope = config.get("secret_scope", "")
        secret_key = config.get("secret_key", "")
        recipients_raw = config.get("recipients", "")
        subject = config.get("subject", "")
        body = config.get("body", "")

        if not sender_email:
            raise ValueError("Sender Email is required.")
        if not secret_scope or not secret_key:
            raise ValueError("Secret Scope and Secret Key are required.")
        if not recipients_raw:
            raise ValueError("At least one recipient is required.")

        recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
        if not recipients:
            raise ValueError("At least one valid recipient email is required.")

        from pyspark.dbutils import DBUtils
        dbutils = DBUtils(spark)
        sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

        pdf = input_df.toPandas()
        file_path = "/tmp/designer_email_attachment.csv"
        pdf.to_csv(file_path, index=False)

        for recipient in recipients:
            msg = MIMEMultipart()
            msg["From"] = sender_email
            msg["To"] = recipient
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain"))

            with open(file_path, "rb") as attachment:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(attachment.read())
                encoders.encode_base64(part)
                part.add_header(
                    "Content-Disposition",
                    f"attachment; filename={os.path.basename(file_path)}",
                )
                msg.attach(part)

            with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
                server.login(sender_email, sender_password)
                server.send_message(msg)

        if os.path.exists(file_path):
            os.remove(file_path)

        return {"data": input_df}

Шаг 5. Сохранение и регистрация оператора

  1. Сохраните ФАЙЛ YAML в рабочей области Azure Databricks. Рассмотрим пример.

    /Workspace/Users/<user-name>/gmail_email_sender.yaml
    
  2. Добавьте оператор в свой файл .user_defined_operators.yaml:

    operators:
      - /Workspace/Users/<user-name>/gmail_email_sender.yaml
    

Дополнительные сведения о вариантах регистрации см. в разделе Сделать оператор доступным для поиска.

Permissions

Пользователям, выполняющим рабочий процесс, содержащий этот оператор, требуется READ доступ к области секрета или они могут предоставлять собственные значения области секрета и ключи в конфигурации оператора. Пользователям также нужен доступ на чтение к файлу YAML в рабочей области.

Чтобы предоставить доступ к области секрета, выполните следующие действия.

databricks secrets put-acl my_email_scope <user-or-group> READ

Использование оператора в конструкторе Lakeflow

После регистрации оператор появится в Конструкторе Lakeflow с входным портом для вашего источника данных и полями конфигурации для адреса электронной почты отправителя, области секретов, секретного ключа, получателей, темы и текста сообщения.

При запуске рабочего процесса оператор преобразует входной кадр данных в CSV-файл, присоединяет его к электронной почте и отправляет его каждому получателю. DataFrame передаётся на выходной порт без изменений, поэтому можно добавить далее в цепочку дополнительные операторы. Во время предварительной версии рабочего процесса электронная почта не отправляется.