Tutoriel : Opérateur d’expéditeur de courrier Gmail

Important

Cette fonctionnalité est disponible en préversion publique.

Ce tutoriel décrit la création d’un python-run-function opérateur pour Lakeflow Designer qui envoie le contenu d’un DataFrame en tant que pièce jointe CSV via Gmail. Utilisez cet exemple pour apprendre à créer des opérateurs YAML qui effectuent des effets secondaires, tels que l’envoi de notifications ou l’écriture dans des systèmes externes. Pour en savoir plus, consultez les opérateurs définis par l’utilisateur dans Lakeflow Designer.

Spécifications

  • Un espace de travail Azure Databricks avec accès pour créer des étendues secrètes.
  • Un compte Gmail avec un mot de passe d’application Google (obligatoire lorsque l’authentification multifacteur (MFA) est activée).
  • L’interface CLI Databricks installée sur votre ordinateur de développement local.

Étape 1 : Configurer les secrets

Stockez vos informations d’identification Gmail dans une étendue secrète Azure Databricks afin que l’opérateur puisse les récupérer au moment de l’exécution.

  1. Créez une portée de secret à l’aide de l’interface de ligne de commande Azure Databricks :

    databricks secrets create-scope my_email_scope
    
  2. Enregistrez votre mot de passe d’application Gmail dans la portée :

    databricks secrets put-secret my_email_scope gmail_app_password
    

    Vous êtes invité à entrer la valeur secrète. Collez votre mot de passe d’application Gmail et enregistrez-le.

Étape 2 : Écrire la run() fonction

Le python-run-function type d’opérateur nécessite une run() fonction avec cette signature :

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: valeurs de configuration fournies par l’utilisateur dans l’interface utilisateur du Concepteur Lakeflow.
  • inputs: DataFrames d’entrée indexées par le nom du port.
  • spark : La session Spark active.

La fonction doit renvoyer un dictionnaire de DataFrames de sortie dont les clés sont les noms des ports de sortie.

Définissez et testez la fonction dans une cellule de notebook :

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    input_df = inputs["data"]

    # Skip side effects during Designer preview
    if config.get("is_preview", False):
        return {"data": input_df}

    import smtplib
    import os
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.mime.base import MIMEBase
    from email import encoders

    sender_email = config.get("sender_email", "")
    secret_scope = config.get("secret_scope", "")
    secret_key = config.get("secret_key", "")
    recipients_raw = config.get("recipients", "")
    subject = config.get("subject", "")
    body = config.get("body", "")

    if not sender_email:
        raise ValueError("Sender Email is required.")
    if not secret_scope or not secret_key:
        raise ValueError("Secret Scope and Secret Key are required.")
    if not recipients_raw:
        raise ValueError("At least one recipient is required.")

    recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
    if not recipients:
        raise ValueError("At least one valid recipient email is required.")

    # Retrieve password from Databricks secrets
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark)
    sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

    # Convert DataFrame to CSV
    pdf = input_df.toPandas()
    file_path = "/tmp/designer_email_attachment.csv"
    pdf.to_csv(file_path, index=False)

    # Send email to each recipient
    for recipient in recipients:
        msg = MIMEMultipart()
        msg["From"] = sender_email
        msg["To"] = recipient
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))

        with open(file_path, "rb") as attachment:
            part = MIMEBase("application", "octet-stream")
            part.set_payload(attachment.read())
            encoders.encode_base64(part)
            part.add_header(
                "Content-Disposition",
                f"attachment; filename={os.path.basename(file_path)}",
            )
            msg.attach(part)

        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(sender_email, sender_password)
            server.send_message(msg)

    # Clean up temp file
    if os.path.exists(file_path):
        os.remove(file_path)

    return {"data": input_df}

Étape 3 : Tester la fonction

Testez la fonction avec un exemple de DataFrame :

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

# Test in preview mode (no email sent)
result = run(
    config={
        "is_preview": True,
        "sender_email": "you@gmail.com",
        "secret_scope": "my_email_scope",
        "secret_key": "gmail_app_password",
        "recipients": "alice@example.com",
        "subject": "Test",
        "body": "Test body"
    },
    inputs={"data": test_df},
    spark=spark
)

result["data"].show()
# Expected: the original DataFrame, unchanged

Note

Les valeurs secret_scope et secret_key de la configuration sont les noms du périmètre de secret et de la clé que vous avez créés à l’étape 1 -- et non le mot de passe lui-même. L’opérateur utilise ces noms pour récupérer le mot de passe de Azure Databricks secrets au moment de l’exécution.

Important

Testez d’abord avec is_preview défini sur True afin de vérifier le comportement de transmission sans envoyer d’e-mail. Lorsque vous êtes prêt à tester l’e-mail réel, définissez is_preview sur False.

Étape 4 : Générer la définition YAML

Créez un fichier appelé gmail_email_sender.yaml avec le contenu suivant :

schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false
    sender_email:
      type: string
      title: Sender Email
      default: ''
      examples:
        - 'you@gmail.com'
      x-ui:
        widget: input
    secret_scope:
      type: string
      title: Secret Scope
      default: ''
      examples:
        - 'my_email_scope'
      x-ui:
        widget: input
    secret_key:
      type: string
      title: Secret Key
      default: ''
      examples:
        - 'gmail_app_password'
      x-ui:
        widget: input
    recipients:
      type: string
      title: Recipients
      default: ''
      examples:
        - 'alice@example.com, bob@example.com'
      x-ui:
        widget: textarea
        rows: 2
    subject:
      type: string
      title: Subject
      default: ''
      examples:
        - 'Designer Output Data'
      x-ui:
        widget: input
    body:
      type: string
      title: Email Body
      default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
      x-ui:
        widget: textarea
        rows: 6
  required:
    - sender_email
    - secret_scope
    - secret_key
    - recipients
    - subject
  additionalProperties: false

ports:
  input:
    - name: data
      title: Input Data
      mime: application/vnd.databricks.dataframe
  output:
    - name: data
      title: Output Data
      mime: application/vnd.databricks.dataframe

run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        input_df = inputs["data"]

        if config.get("is_preview", False):
            return {"data": input_df}

        import smtplib
        import os
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.mime.base import MIMEBase
        from email import encoders

        sender_email = config.get("sender_email", "")
        secret_scope = config.get("secret_scope", "")
        secret_key = config.get("secret_key", "")
        recipients_raw = config.get("recipients", "")
        subject = config.get("subject", "")
        body = config.get("body", "")

        if not sender_email:
            raise ValueError("Sender Email is required.")
        if not secret_scope or not secret_key:
            raise ValueError("Secret Scope and Secret Key are required.")
        if not recipients_raw:
            raise ValueError("At least one recipient is required.")

        recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
        if not recipients:
            raise ValueError("At least one valid recipient email is required.")

        from pyspark.dbutils import DBUtils
        dbutils = DBUtils(spark)
        sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

        pdf = input_df.toPandas()
        file_path = "/tmp/designer_email_attachment.csv"
        pdf.to_csv(file_path, index=False)

        for recipient in recipients:
            msg = MIMEMultipart()
            msg["From"] = sender_email
            msg["To"] = recipient
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain"))

            with open(file_path, "rb") as attachment:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(attachment.read())
                encoders.encode_base64(part)
                part.add_header(
                    "Content-Disposition",
                    f"attachment; filename={os.path.basename(file_path)}",
                )
                msg.attach(part)

            with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
                server.login(sender_email, sender_password)
                server.send_message(msg)

        if os.path.exists(file_path):
            os.remove(file_path)

        return {"data": input_df}

Étape 5 : Enregistrer et inscrire l’opérateur

  1. Enregistrez le fichier YAML dans votre espace de travail Azure Databricks. Par exemple:

    /Workspace/Users/<user-name>/gmail_email_sender.yaml
    
  2. Ajoutez l’opérateur au fichier .user_defined_operators.yaml :

    operators:
      - /Workspace/Users/<user-name>/gmail_email_sender.yaml
    

Pour plus d’informations sur les options d’inscription, consultez Rendre votre opérateur détectable.

Permissions

Les utilisateurs qui exécutent un flux de travail contenant cet opérateur ont besoin READ d’accéder à l’étendue secrète, ou ils peuvent fournir leur propre étendue secrète et leurs propres valeurs de clé dans la configuration de l’opérateur. Les utilisateurs ont également besoin d’un accès en lecture au fichier YAML dans l’espace de travail.

Pour accorder l’accès au périmètre secret :

databricks secrets put-acl my_email_scope <user-or-group> READ

Utilisation de l’opérateur dans Lakeflow Designer

Après l’inscription, l’opérateur apparaît dans Lakeflow Designer avec un port d’entrée pour vos champs de source de données et de configuration pour l’e-mail de l’expéditeur, l’étendue secrète, la clé secrète, les destinataires, l’objet et le corps.

Lorsque le flux de travail s’exécute, l’opérateur convertit le DataFrame d’entrée au format CSV, l’attache à un e-mail et l’envoie à chaque destinataire. Le DataFrame est transmis sans modification vers le port de sortie, ce qui vous permet de chaîner des opérateurs supplémentaires en aval. Pendant la préversion du flux de travail, aucun e-mail n’est envoyé.