Lernprogramm: Gmail-E-Mail-Absenderoperator

Important

Dieses Feature befindet sich in der Public Preview.

Dieses Lernprogramm führt Sie durch das Erstellen eines python-run-function Operators für Lakeflow Designer, der den Inhalt eines DataFrame als CSV-Anlage über Gmail sendet. In diesem Beispiel erfahren Sie, wie Sie YAML-basierte Operatoren erstellen, die Nebenwirkungen ausführen, z. B. das Senden von Benachrichtigungen oder das Schreiben an externe Systeme. Weitere Informationen finden Sie unter Benutzerdefinierte Operatoren in Lakeflow Designer.

Requirements

  • Ein Azure Databricks Arbeitsbereich mit Zugriff zum Erstellen geheimer Bereiche.
  • Ein Gmail-Konto mit einem Google App-Kennwort (erforderlich, wenn die mehrstufige Authentifizierung (MFA) aktiviert ist).
  • Die Databricks CLI wurde auf Ihrem lokalen Entwicklungscomputer installiert.

Schritt 1: Geheimnisse einrichten

Speichern Sie Ihre Gmail-Anmeldeinformationen in einem Azure Databricks geheimen Bereich, damit der Operator sie zur Laufzeit abrufen kann.

  1. Erstellen Sie einen geheimen Bereich mithilfe der Azure Databricks CLI:

    databricks secrets create-scope my_email_scope
    
  2. Speichern Sie Ihr Gmail-App-Kennwort im Bereich:

    databricks secrets put-secret my_email_scope gmail_app_password
    

    Sie werden aufgefordert, den geheimen Wert einzugeben. Fügen Sie Ihr Gmail-App-Kennwort ein, und speichern Sie es.

Schritt 2: Schreiben der run() Funktion

Der python-run-function Operatortyp erfordert eine run() Funktion mit dieser Signatur:

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: Konfigurationswerte, die vom Benutzer in der Lakeflow Designer-Benutzeroberfläche bereitgestellt werden.
  • inputs: Eingabe-DataFrames, indiziert nach Portnamen.
  • spark: Die aktive Spark-Sitzung.

Die Funktion muss ein Wörterbuch von Ausgabe-DataFrames zurückgeben, dessen Schlüssel die Namen der Ausgabeports sind.

Definieren und Testen der Funktion in einer Notizbuchzelle:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    input_df = inputs["data"]

    # Skip side effects during Designer preview
    if config.get("is_preview", False):
        return {"data": input_df}

    import smtplib
    import os
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.mime.base import MIMEBase
    from email import encoders

    sender_email = config.get("sender_email", "")
    secret_scope = config.get("secret_scope", "")
    secret_key = config.get("secret_key", "")
    recipients_raw = config.get("recipients", "")
    subject = config.get("subject", "")
    body = config.get("body", "")

    if not sender_email:
        raise ValueError("Sender Email is required.")
    if not secret_scope or not secret_key:
        raise ValueError("Secret Scope and Secret Key are required.")
    if not recipients_raw:
        raise ValueError("At least one recipient is required.")

    recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
    if not recipients:
        raise ValueError("At least one valid recipient email is required.")

    # Retrieve password from Databricks secrets
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark)
    sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

    # Convert DataFrame to CSV
    pdf = input_df.toPandas()
    file_path = "/tmp/designer_email_attachment.csv"
    pdf.to_csv(file_path, index=False)

    # Send email to each recipient
    for recipient in recipients:
        msg = MIMEMultipart()
        msg["From"] = sender_email
        msg["To"] = recipient
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))

        with open(file_path, "rb") as attachment:
            part = MIMEBase("application", "octet-stream")
            part.set_payload(attachment.read())
            encoders.encode_base64(part)
            part.add_header(
                "Content-Disposition",
                f"attachment; filename={os.path.basename(file_path)}",
            )
            msg.attach(part)

        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(sender_email, sender_password)
            server.send_message(msg)

    # Clean up temp file
    if os.path.exists(file_path):
        os.remove(file_path)

    return {"data": input_df}

Schritt 3: Testen der Funktion

Testen Sie die Funktion mit einem Beispiel-DataFrame:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

# Test in preview mode (no email sent)
result = run(
    config={
        "is_preview": True,
        "sender_email": "you@gmail.com",
        "secret_scope": "my_email_scope",
        "secret_key": "gmail_app_password",
        "recipients": "alice@example.com",
        "subject": "Test",
        "body": "Test body"
    },
    inputs={"data": test_df},
    spark=spark
)

result["data"].show()
# Expected: the original DataFrame, unchanged

Note

Die secret_scope Werte und secret_key Werte in der Konfiguration sind die Namen des geheimen Bereichs und schlüssels, den Sie in Schritt 1 erstellt haben – nicht das tatsächliche Kennwort. Der Operator verwendet diese Namen, um das Kennwort zur Laufzeit aus den Azure Databricks-Secrets abzurufen.

Important

Testen Sie zunächst mit is_preview, das auf True gesetzt ist, um das Pass-Through-Verhalten zu überprüfen, ohne E-Mails zu versenden. Wenn Sie bereit sind, die eigentliche E-Mail zu testen, setzen Sie is_preview auf False.

Schritt 4: Erstellen der YAML-Definition

Erstellen Sie eine Datei mit dem Namen gmail_email_sender.yaml und dem folgenden Inhalt:

schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false
    sender_email:
      type: string
      title: Sender Email
      default: ''
      examples:
        - 'you@gmail.com'
      x-ui:
        widget: input
    secret_scope:
      type: string
      title: Secret Scope
      default: ''
      examples:
        - 'my_email_scope'
      x-ui:
        widget: input
    secret_key:
      type: string
      title: Secret Key
      default: ''
      examples:
        - 'gmail_app_password'
      x-ui:
        widget: input
    recipients:
      type: string
      title: Recipients
      default: ''
      examples:
        - 'alice@example.com, bob@example.com'
      x-ui:
        widget: textarea
        rows: 2
    subject:
      type: string
      title: Subject
      default: ''
      examples:
        - 'Designer Output Data'
      x-ui:
        widget: input
    body:
      type: string
      title: Email Body
      default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
      x-ui:
        widget: textarea
        rows: 6
  required:
    - sender_email
    - secret_scope
    - secret_key
    - recipients
    - subject
  additionalProperties: false

ports:
  input:
    - name: data
      title: Input Data
      mime: application/vnd.databricks.dataframe
  output:
    - name: data
      title: Output Data
      mime: application/vnd.databricks.dataframe

run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        input_df = inputs["data"]

        if config.get("is_preview", False):
            return {"data": input_df}

        import smtplib
        import os
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.mime.base import MIMEBase
        from email import encoders

        sender_email = config.get("sender_email", "")
        secret_scope = config.get("secret_scope", "")
        secret_key = config.get("secret_key", "")
        recipients_raw = config.get("recipients", "")
        subject = config.get("subject", "")
        body = config.get("body", "")

        if not sender_email:
            raise ValueError("Sender Email is required.")
        if not secret_scope or not secret_key:
            raise ValueError("Secret Scope and Secret Key are required.")
        if not recipients_raw:
            raise ValueError("At least one recipient is required.")

        recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
        if not recipients:
            raise ValueError("At least one valid recipient email is required.")

        from pyspark.dbutils import DBUtils
        dbutils = DBUtils(spark)
        sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

        pdf = input_df.toPandas()
        file_path = "/tmp/designer_email_attachment.csv"
        pdf.to_csv(file_path, index=False)

        for recipient in recipients:
            msg = MIMEMultipart()
            msg["From"] = sender_email
            msg["To"] = recipient
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain"))

            with open(file_path, "rb") as attachment:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(attachment.read())
                encoders.encode_base64(part)
                part.add_header(
                    "Content-Disposition",
                    f"attachment; filename={os.path.basename(file_path)}",
                )
                msg.attach(part)

            with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
                server.login(sender_email, sender_password)
                server.send_message(msg)

        if os.path.exists(file_path):
            os.remove(file_path)

        return {"data": input_df}

Schritt 5: Speichern und Registrieren des Operators

  1. Speichern Sie die YAML-Datei in Ihrem Azure Databricks Arbeitsbereich. Beispiel:

    /Workspace/Users/<user-name>/gmail_email_sender.yaml
    
  2. Fügen Sie den Operator Ihrer .user_defined_operators.yaml-Datei hinzu:

    operators:
      - /Workspace/Users/<user-name>/gmail_email_sender.yaml
    

Weitere Informationen zu Registrierungsoptionen finden Sie unter Make your operator discoverable.

Erlaubnisse

Benutzer, die einen Workflow ausführen, der diesen Operator enthält, benötigen READ Zugriff auf den geheimen Bereich, oder sie können ihren eigenen geheimen Bereich und Schlüsselwerte in der Operatorkonfiguration bereitstellen. Benutzer benötigen auch Lesezugriff auf die YAML-Datei im Arbeitsbereich.

So gewähren Sie Zugriff auf den geheimen Geltungsbereich:

databricks secrets put-acl my_email_scope <user-or-group> READ

Den Operator in Lakeflow Designer verwenden

Nach der Registrierung wird der Operator im Lakeflow Designer mit einem Eingabeport für Ihre Datenquellen- und Konfigurationsfelder für Absender-E-Mails, geheimer Bereich, geheimer Schlüssel, Empfänger, Betreff und Text angezeigt.

Wenn der Workflow ausgeführt wird, konvertiert der Operator den Eingabedatenframe in CSV, fügt ihn an eine E-Mail an und sendet ihn an jeden Empfänger. Der DataFrame wird unverändert an den Ausgabeport weitergeleitet, sodass Sie nachgelagert weitere Operatoren anschließen können. Während der Workflowvorschau wird keine E-Mail gesendet.