Tutorial: Operador de remetente de email do Gmail

Importante

Esse recurso está em Visualização Pública.

Este tutorial explica como criar um python-run-function operador para o Lakeflow Designer que envia o conteúdo de um DataFrame como um anexo CSV por meio do Gmail. Use este exemplo para aprender a criar operadores baseados em YAML que executam efeitos colaterais, como enviar notificações ou gravar em sistemas externos. Para saber mais, consulte operadores definidos pelo usuário no Lakeflow Designer.

Requirements

  • Um espaço de trabalho do Azure Databricks com permissão para criar escopos de segredos.
  • Uma conta do Gmail com uma Senha do Google App (necessária quando a MFA (autenticação multifator) está habilitada).
  • A CLI do Databricks instalada no computador de desenvolvimento local.

Etapa 1: Configurar segredos

Armazene suas credenciais do Gmail em um escopo de segredos do Azure Databricks para que o operador possa recuperá-las em tempo de execução.

  1. Crie um escopo secreto usando a CLI do Azure Databricks:

    databricks secrets create-scope my_email_scope
    
  2. Armazene sua senha de aplicativo do Gmail no escopo:

    databricks secrets put-secret my_email_scope gmail_app_password
    

    Você será solicitado a inserir o valor secreto. Cole a senha do aplicativo Gmail e salve-a.

Etapa 2: Gravar a run() função

O python-run-function tipo de operador requer uma run() função com esta assinatura:

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: valores de configuração fornecidos pelo usuário na interface do usuário do Lakeflow Designer.
  • inputs: DataFrames de entrada chaveados pelo nome da porta.
  • spark: a sessão ativa do Spark.

A função deve retornar um dicionário de DataFrames de saída chaveado pelo nome da porta de saída.

Defina e teste a função em uma célula do notebook:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    input_df = inputs["data"]

    # Skip side effects during Designer preview
    if config.get("is_preview", False):
        return {"data": input_df}

    import smtplib
    import os
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.mime.base import MIMEBase
    from email import encoders

    sender_email = config.get("sender_email", "")
    secret_scope = config.get("secret_scope", "")
    secret_key = config.get("secret_key", "")
    recipients_raw = config.get("recipients", "")
    subject = config.get("subject", "")
    body = config.get("body", "")

    if not sender_email:
        raise ValueError("Sender Email is required.")
    if not secret_scope or not secret_key:
        raise ValueError("Secret Scope and Secret Key are required.")
    if not recipients_raw:
        raise ValueError("At least one recipient is required.")

    recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
    if not recipients:
        raise ValueError("At least one valid recipient email is required.")

    # Retrieve password from Databricks secrets
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark)
    sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

    # Convert DataFrame to CSV
    pdf = input_df.toPandas()
    file_path = "/tmp/designer_email_attachment.csv"
    pdf.to_csv(file_path, index=False)

    # Send email to each recipient
    for recipient in recipients:
        msg = MIMEMultipart()
        msg["From"] = sender_email
        msg["To"] = recipient
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))

        with open(file_path, "rb") as attachment:
            part = MIMEBase("application", "octet-stream")
            part.set_payload(attachment.read())
            encoders.encode_base64(part)
            part.add_header(
                "Content-Disposition",
                f"attachment; filename={os.path.basename(file_path)}",
            )
            msg.attach(part)

        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(sender_email, sender_password)
            server.send_message(msg)

    # Clean up temp file
    if os.path.exists(file_path):
        os.remove(file_path)

    return {"data": input_df}

Etapa 3: Testar a função

Teste a função com um DataFrame de exemplo:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

# Test in preview mode (no email sent)
result = run(
    config={
        "is_preview": True,
        "sender_email": "you@gmail.com",
        "secret_scope": "my_email_scope",
        "secret_key": "gmail_app_password",
        "recipients": "alice@example.com",
        "subject": "Test",
        "body": "Test body"
    },
    inputs={"data": test_df},
    spark=spark
)

result["data"].show()
# Expected: the original DataFrame, unchanged

Note

Os valores de secret_scope e secret_key na configuração são os nomes do escopo de segredo e da chave que você criou na etapa 1 — e não a senha em si. O operador usa esses nomes para recuperar a senha dos segredos do Azure Databricks em tempo de execução.

Importante

Teste com is_preview definido como True primeiro para verificar o comportamento de passagem sem enviar nenhum email. Quando estiver pronto para testar o email real, defina is_preview como False.

Etapa 4: Criar a definição de YAML

Crie um arquivo chamado gmail_email_sender.yaml com o seguinte conteúdo:

schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false
    sender_email:
      type: string
      title: Sender Email
      default: ''
      examples:
        - 'you@gmail.com'
      x-ui:
        widget: input
    secret_scope:
      type: string
      title: Secret Scope
      default: ''
      examples:
        - 'my_email_scope'
      x-ui:
        widget: input
    secret_key:
      type: string
      title: Secret Key
      default: ''
      examples:
        - 'gmail_app_password'
      x-ui:
        widget: input
    recipients:
      type: string
      title: Recipients
      default: ''
      examples:
        - 'alice@example.com, bob@example.com'
      x-ui:
        widget: textarea
        rows: 2
    subject:
      type: string
      title: Subject
      default: ''
      examples:
        - 'Designer Output Data'
      x-ui:
        widget: input
    body:
      type: string
      title: Email Body
      default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
      x-ui:
        widget: textarea
        rows: 6
  required:
    - sender_email
    - secret_scope
    - secret_key
    - recipients
    - subject
  additionalProperties: false

ports:
  input:
    - name: data
      title: Input Data
      mime: application/vnd.databricks.dataframe
  output:
    - name: data
      title: Output Data
      mime: application/vnd.databricks.dataframe

run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        input_df = inputs["data"]

        if config.get("is_preview", False):
            return {"data": input_df}

        import smtplib
        import os
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.mime.base import MIMEBase
        from email import encoders

        sender_email = config.get("sender_email", "")
        secret_scope = config.get("secret_scope", "")
        secret_key = config.get("secret_key", "")
        recipients_raw = config.get("recipients", "")
        subject = config.get("subject", "")
        body = config.get("body", "")

        if not sender_email:
            raise ValueError("Sender Email is required.")
        if not secret_scope or not secret_key:
            raise ValueError("Secret Scope and Secret Key are required.")
        if not recipients_raw:
            raise ValueError("At least one recipient is required.")

        recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
        if not recipients:
            raise ValueError("At least one valid recipient email is required.")

        from pyspark.dbutils import DBUtils
        dbutils = DBUtils(spark)
        sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

        pdf = input_df.toPandas()
        file_path = "/tmp/designer_email_attachment.csv"
        pdf.to_csv(file_path, index=False)

        for recipient in recipients:
            msg = MIMEMultipart()
            msg["From"] = sender_email
            msg["To"] = recipient
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain"))

            with open(file_path, "rb") as attachment:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(attachment.read())
                encoders.encode_base64(part)
                part.add_header(
                    "Content-Disposition",
                    f"attachment; filename={os.path.basename(file_path)}",
                )
                msg.attach(part)

            with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
                server.login(sender_email, sender_password)
                server.send_message(msg)

        if os.path.exists(file_path):
            os.remove(file_path)

        return {"data": input_df}

Etapa 5: Salvar e registrar o operador

  1. Salve o arquivo YAML no workspace Azure Databricks. Por exemplo:

    /Workspace/Users/<user-name>/gmail_email_sender.yaml
    
  2. Adicione o operador ao seu .user_defined_operators.yaml arquivo:

    operators:
      - /Workspace/Users/<user-name>/gmail_email_sender.yaml
    

Para obter mais informações sobre as opções de registro, consulte Tornar seu operador detectável.

Permissions

Os usuários que executam um workflow que contém esse operador precisam de acesso READ ao escopo de segredo, ou podem fornecer seus próprios valores de escopo de segredo e chave na configuração do operador. Os usuários também precisam de acesso de leitura ao arquivo YAML no workspace.

Para conceder acesso ao escopo do segredo:

databricks secrets put-acl my_email_scope <user-or-group> READ

Usando o operador no Lakeflow Designer

Após o registro, o operador aparece no Lakeflow Designer com uma porta de entrada para sua fonte de dados e campos de configuração para e-mail do remetente, escopo do segredo, chave do segredo, destinatários, assunto e corpo.

Quando o fluxo de trabalho é executado, o operador converte o DataFrame de entrada em CSV, anexa-o a um email e envia-o a cada destinatário. O DataFrame passa inalterado para a porta de saída, para que você possa encadear operadores adicionais downstream. Durante a visualização do fluxo de trabalho, nenhum email é enviado.