Remarque
L’accès à cette page requiert une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page requiert une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
Cette fonctionnalité est disponible en préversion publique.
Ce tutoriel décrit la création d’un python-run-function opérateur pour Lakeflow Designer qui envoie le contenu d’un DataFrame en tant que pièce jointe CSV via Gmail. Utilisez cet exemple pour apprendre à créer des opérateurs YAML qui effectuent des effets secondaires, tels que l’envoi de notifications ou l’écriture dans des systèmes externes. Pour en savoir plus, consultez les opérateurs définis par l’utilisateur dans Lakeflow Designer.
Spécifications
- Un espace de travail Azure Databricks avec accès pour créer des étendues secrètes.
- Un compte Gmail avec un mot de passe d’application Google (obligatoire lorsque l’authentification multifacteur (MFA) est activée).
- L’interface CLI Databricks installée sur votre ordinateur de développement local.
Étape 1 : Configurer les secrets
Stockez vos informations d’identification Gmail dans une étendue secrète Azure Databricks afin que l’opérateur puisse les récupérer au moment de l’exécution.
Créez une portée de secret à l’aide de l’interface de ligne de commande Azure Databricks :
databricks secrets create-scope my_email_scopeEnregistrez votre mot de passe d’application Gmail dans la portée :
databricks secrets put-secret my_email_scope gmail_app_passwordVous êtes invité à entrer la valeur secrète. Collez votre mot de passe d’application Gmail et enregistrez-le.
Étape 2 : Écrire la run() fonction
Le python-run-function type d’opérateur nécessite une run() fonction avec cette signature :
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config: valeurs de configuration fournies par l’utilisateur dans l’interface utilisateur du Concepteur Lakeflow. -
inputs: DataFrames d’entrée indexées par le nom du port. -
spark: La session Spark active.
La fonction doit renvoyer un dictionnaire de DataFrames de sortie dont les clés sont les noms des ports de sortie.
Définissez et testez la fonction dans une cellule de notebook :
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
# Skip side effects during Designer preview
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
# Retrieve password from Databricks secrets
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
# Convert DataFrame to CSV
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
# Send email to each recipient
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
# Clean up temp file
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
Étape 3 : Tester la fonction
Testez la fonction avec un exemple de DataFrame :
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
# Test in preview mode (no email sent)
result = run(
config={
"is_preview": True,
"sender_email": "you@gmail.com",
"secret_scope": "my_email_scope",
"secret_key": "gmail_app_password",
"recipients": "alice@example.com",
"subject": "Test",
"body": "Test body"
},
inputs={"data": test_df},
spark=spark
)
result["data"].show()
# Expected: the original DataFrame, unchanged
Note
Les valeurs secret_scope et secret_key de la configuration sont les noms du périmètre de secret et de la clé que vous avez créés à l’étape 1 -- et non le mot de passe lui-même. L’opérateur utilise ces noms pour récupérer le mot de passe de Azure Databricks secrets au moment de l’exécution.
Important
Testez d’abord avec is_preview défini sur True afin de vérifier le comportement de transmission sans envoyer d’e-mail. Lorsque vous êtes prêt à tester l’e-mail réel, définissez is_preview sur False.
Étape 4 : Générer la définition YAML
Créez un fichier appelé gmail_email_sender.yaml avec le contenu suivant :
schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
sender_email:
type: string
title: Sender Email
default: ''
examples:
- 'you@gmail.com'
x-ui:
widget: input
secret_scope:
type: string
title: Secret Scope
default: ''
examples:
- 'my_email_scope'
x-ui:
widget: input
secret_key:
type: string
title: Secret Key
default: ''
examples:
- 'gmail_app_password'
x-ui:
widget: input
recipients:
type: string
title: Recipients
default: ''
examples:
- 'alice@example.com, bob@example.com'
x-ui:
widget: textarea
rows: 2
subject:
type: string
title: Subject
default: ''
examples:
- 'Designer Output Data'
x-ui:
widget: input
body:
type: string
title: Email Body
default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
x-ui:
widget: textarea
rows: 6
required:
- sender_email
- secret_scope
- secret_key
- recipients
- subject
additionalProperties: false
ports:
input:
- name: data
title: Input Data
mime: application/vnd.databricks.dataframe
output:
- name: data
title: Output Data
mime: application/vnd.databricks.dataframe
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
Étape 5 : Enregistrer et inscrire l’opérateur
Enregistrez le fichier YAML dans votre espace de travail Azure Databricks. Par exemple:
/Workspace/Users/<user-name>/gmail_email_sender.yamlAjoutez l’opérateur au fichier
.user_defined_operators.yaml:operators: - /Workspace/Users/<user-name>/gmail_email_sender.yaml
Pour plus d’informations sur les options d’inscription, consultez Rendre votre opérateur détectable.
Permissions
Les utilisateurs qui exécutent un flux de travail contenant cet opérateur ont besoin READ d’accéder à l’étendue secrète, ou ils peuvent fournir leur propre étendue secrète et leurs propres valeurs de clé dans la configuration de l’opérateur. Les utilisateurs ont également besoin d’un accès en lecture au fichier YAML dans l’espace de travail.
Pour accorder l’accès au périmètre secret :
databricks secrets put-acl my_email_scope <user-or-group> READ
Utilisation de l’opérateur dans Lakeflow Designer
Après l’inscription, l’opérateur apparaît dans Lakeflow Designer avec un port d’entrée pour vos champs de source de données et de configuration pour l’e-mail de l’expéditeur, l’étendue secrète, la clé secrète, les destinataires, l’objet et le corps.
Lorsque le flux de travail s’exécute, l’opérateur convertit le DataFrame d’entrée au format CSV, l’attache à un e-mail et l’envoie à chaque destinataire. Le DataFrame est transmis sans modification vers le port de sortie, ce qui vous permet de chaîner des opérateurs supplémentaires en aval. Pendant la préversion du flux de travail, aucun e-mail n’est envoyé.