Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
Esse recurso está em Visualização Pública.
Este tutorial explica como criar um python-run-function operador para o Lakeflow Designer que envia o conteúdo de um DataFrame como um anexo CSV por meio do Gmail. Use este exemplo para aprender a criar operadores baseados em YAML que executam efeitos colaterais, como enviar notificações ou gravar em sistemas externos. Para saber mais, consulte operadores definidos pelo usuário no Lakeflow Designer.
Requirements
- Um espaço de trabalho do Azure Databricks com permissão para criar escopos de segredos.
- Uma conta do Gmail com uma Senha do Google App (necessária quando a MFA (autenticação multifator) está habilitada).
- A CLI do Databricks instalada no computador de desenvolvimento local.
Etapa 1: Configurar segredos
Armazene suas credenciais do Gmail em um escopo de segredos do Azure Databricks para que o operador possa recuperá-las em tempo de execução.
Crie um escopo secreto usando a CLI do Azure Databricks:
databricks secrets create-scope my_email_scopeArmazene sua senha de aplicativo do Gmail no escopo:
databricks secrets put-secret my_email_scope gmail_app_passwordVocê será solicitado a inserir o valor secreto. Cole a senha do aplicativo Gmail e salve-a.
Etapa 2: Gravar a run() função
O python-run-function tipo de operador requer uma run() função com esta assinatura:
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config: valores de configuração fornecidos pelo usuário na interface do usuário do Lakeflow Designer. -
inputs: DataFrames de entrada chaveados pelo nome da porta. -
spark: a sessão ativa do Spark.
A função deve retornar um dicionário de DataFrames de saída chaveado pelo nome da porta de saída.
Defina e teste a função em uma célula do notebook:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
# Skip side effects during Designer preview
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
# Retrieve password from Databricks secrets
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
# Convert DataFrame to CSV
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
# Send email to each recipient
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
# Clean up temp file
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
Etapa 3: Testar a função
Teste a função com um DataFrame de exemplo:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
# Test in preview mode (no email sent)
result = run(
config={
"is_preview": True,
"sender_email": "you@gmail.com",
"secret_scope": "my_email_scope",
"secret_key": "gmail_app_password",
"recipients": "alice@example.com",
"subject": "Test",
"body": "Test body"
},
inputs={"data": test_df},
spark=spark
)
result["data"].show()
# Expected: the original DataFrame, unchanged
Note
Os valores de secret_scope e secret_key na configuração são os nomes do escopo de segredo e da chave que você criou na etapa 1 — e não a senha em si. O operador usa esses nomes para recuperar a senha dos segredos do Azure Databricks em tempo de execução.
Importante
Teste com is_preview definido como True primeiro para verificar o comportamento de passagem sem enviar nenhum email. Quando estiver pronto para testar o email real, defina is_preview como False.
Etapa 4: Criar a definição de YAML
Crie um arquivo chamado gmail_email_sender.yaml com o seguinte conteúdo:
schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
sender_email:
type: string
title: Sender Email
default: ''
examples:
- 'you@gmail.com'
x-ui:
widget: input
secret_scope:
type: string
title: Secret Scope
default: ''
examples:
- 'my_email_scope'
x-ui:
widget: input
secret_key:
type: string
title: Secret Key
default: ''
examples:
- 'gmail_app_password'
x-ui:
widget: input
recipients:
type: string
title: Recipients
default: ''
examples:
- 'alice@example.com, bob@example.com'
x-ui:
widget: textarea
rows: 2
subject:
type: string
title: Subject
default: ''
examples:
- 'Designer Output Data'
x-ui:
widget: input
body:
type: string
title: Email Body
default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
x-ui:
widget: textarea
rows: 6
required:
- sender_email
- secret_scope
- secret_key
- recipients
- subject
additionalProperties: false
ports:
input:
- name: data
title: Input Data
mime: application/vnd.databricks.dataframe
output:
- name: data
title: Output Data
mime: application/vnd.databricks.dataframe
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
Etapa 5: Salvar e registrar o operador
Salve o arquivo YAML no workspace Azure Databricks. Por exemplo:
/Workspace/Users/<user-name>/gmail_email_sender.yamlAdicione o operador ao seu
.user_defined_operators.yamlarquivo:operators: - /Workspace/Users/<user-name>/gmail_email_sender.yaml
Para obter mais informações sobre as opções de registro, consulte Tornar seu operador detectável.
Permissions
Os usuários que executam um workflow que contém esse operador precisam de acesso READ ao escopo de segredo, ou podem fornecer seus próprios valores de escopo de segredo e chave na configuração do operador. Os usuários também precisam de acesso de leitura ao arquivo YAML no workspace.
Para conceder acesso ao escopo do segredo:
databricks secrets put-acl my_email_scope <user-or-group> READ
Usando o operador no Lakeflow Designer
Após o registro, o operador aparece no Lakeflow Designer com uma porta de entrada para sua fonte de dados e campos de configuração para e-mail do remetente, escopo do segredo, chave do segredo, destinatários, assunto e corpo.
Quando o fluxo de trabalho é executado, o operador converte o DataFrame de entrada em CSV, anexa-o a um email e envia-o a cada destinatário. O DataFrame passa inalterado para a porta de saída, para que você possa encadear operadores adicionais downstream. Durante a visualização do fluxo de trabalho, nenhum email é enviado.