Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Important
Dieses Feature befindet sich in der Public Preview.
Dieses Lernprogramm führt Sie durch das Erstellen eines python-run-function Operators für Lakeflow Designer, der den Inhalt eines DataFrame als CSV-Anlage über Gmail sendet. In diesem Beispiel erfahren Sie, wie Sie YAML-basierte Operatoren erstellen, die Nebenwirkungen ausführen, z. B. das Senden von Benachrichtigungen oder das Schreiben an externe Systeme. Weitere Informationen finden Sie unter Benutzerdefinierte Operatoren in Lakeflow Designer.
Requirements
- Ein Azure Databricks Arbeitsbereich mit Zugriff zum Erstellen geheimer Bereiche.
- Ein Gmail-Konto mit einem Google App-Kennwort (erforderlich, wenn die mehrstufige Authentifizierung (MFA) aktiviert ist).
- Die Databricks CLI wurde auf Ihrem lokalen Entwicklungscomputer installiert.
Schritt 1: Geheimnisse einrichten
Speichern Sie Ihre Gmail-Anmeldeinformationen in einem Azure Databricks geheimen Bereich, damit der Operator sie zur Laufzeit abrufen kann.
Erstellen Sie einen geheimen Bereich mithilfe der Azure Databricks CLI:
databricks secrets create-scope my_email_scopeSpeichern Sie Ihr Gmail-App-Kennwort im Bereich:
databricks secrets put-secret my_email_scope gmail_app_passwordSie werden aufgefordert, den geheimen Wert einzugeben. Fügen Sie Ihr Gmail-App-Kennwort ein, und speichern Sie es.
Schritt 2: Schreiben der run() Funktion
Der python-run-function Operatortyp erfordert eine run() Funktion mit dieser Signatur:
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config: Konfigurationswerte, die vom Benutzer in der Lakeflow Designer-Benutzeroberfläche bereitgestellt werden. -
inputs: Eingabe-DataFrames, indiziert nach Portnamen. -
spark: Die aktive Spark-Sitzung.
Die Funktion muss ein Wörterbuch von Ausgabe-DataFrames zurückgeben, dessen Schlüssel die Namen der Ausgabeports sind.
Definieren und Testen der Funktion in einer Notizbuchzelle:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
# Skip side effects during Designer preview
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
# Retrieve password from Databricks secrets
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
# Convert DataFrame to CSV
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
# Send email to each recipient
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
# Clean up temp file
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
Schritt 3: Testen der Funktion
Testen Sie die Funktion mit einem Beispiel-DataFrame:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
# Test in preview mode (no email sent)
result = run(
config={
"is_preview": True,
"sender_email": "you@gmail.com",
"secret_scope": "my_email_scope",
"secret_key": "gmail_app_password",
"recipients": "alice@example.com",
"subject": "Test",
"body": "Test body"
},
inputs={"data": test_df},
spark=spark
)
result["data"].show()
# Expected: the original DataFrame, unchanged
Note
Die secret_scope Werte und secret_key Werte in der Konfiguration sind die Namen des geheimen Bereichs und schlüssels, den Sie in Schritt 1 erstellt haben – nicht das tatsächliche Kennwort. Der Operator verwendet diese Namen, um das Kennwort zur Laufzeit aus den Azure Databricks-Secrets abzurufen.
Important
Testen Sie zunächst mit is_preview, das auf True gesetzt ist, um das Pass-Through-Verhalten zu überprüfen, ohne E-Mails zu versenden. Wenn Sie bereit sind, die eigentliche E-Mail zu testen, setzen Sie is_preview auf False.
Schritt 4: Erstellen der YAML-Definition
Erstellen Sie eine Datei mit dem Namen gmail_email_sender.yaml und dem folgenden Inhalt:
schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
sender_email:
type: string
title: Sender Email
default: ''
examples:
- 'you@gmail.com'
x-ui:
widget: input
secret_scope:
type: string
title: Secret Scope
default: ''
examples:
- 'my_email_scope'
x-ui:
widget: input
secret_key:
type: string
title: Secret Key
default: ''
examples:
- 'gmail_app_password'
x-ui:
widget: input
recipients:
type: string
title: Recipients
default: ''
examples:
- 'alice@example.com, bob@example.com'
x-ui:
widget: textarea
rows: 2
subject:
type: string
title: Subject
default: ''
examples:
- 'Designer Output Data'
x-ui:
widget: input
body:
type: string
title: Email Body
default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
x-ui:
widget: textarea
rows: 6
required:
- sender_email
- secret_scope
- secret_key
- recipients
- subject
additionalProperties: false
ports:
input:
- name: data
title: Input Data
mime: application/vnd.databricks.dataframe
output:
- name: data
title: Output Data
mime: application/vnd.databricks.dataframe
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
Schritt 5: Speichern und Registrieren des Operators
Speichern Sie die YAML-Datei in Ihrem Azure Databricks Arbeitsbereich. Beispiel:
/Workspace/Users/<user-name>/gmail_email_sender.yamlFügen Sie den Operator Ihrer
.user_defined_operators.yaml-Datei hinzu:operators: - /Workspace/Users/<user-name>/gmail_email_sender.yaml
Weitere Informationen zu Registrierungsoptionen finden Sie unter Make your operator discoverable.
Erlaubnisse
Benutzer, die einen Workflow ausführen, der diesen Operator enthält, benötigen READ Zugriff auf den geheimen Bereich, oder sie können ihren eigenen geheimen Bereich und Schlüsselwerte in der Operatorkonfiguration bereitstellen. Benutzer benötigen auch Lesezugriff auf die YAML-Datei im Arbeitsbereich.
So gewähren Sie Zugriff auf den geheimen Geltungsbereich:
databricks secrets put-acl my_email_scope <user-or-group> READ
Den Operator in Lakeflow Designer verwenden
Nach der Registrierung wird der Operator im Lakeflow Designer mit einem Eingabeport für Ihre Datenquellen- und Konfigurationsfelder für Absender-E-Mails, geheimer Bereich, geheimer Schlüssel, Empfänger, Betreff und Text angezeigt.
Wenn der Workflow ausgeführt wird, konvertiert der Operator den Eingabedatenframe in CSV, fügt ihn an eine E-Mail an und sendet ihn an jeden Empfänger. Der DataFrame wird unverändert an den Ausgabeport weitergeleitet, sodass Sie nachgelagert weitere Operatoren anschließen können. Während der Workflowvorschau wird keine E-Mail gesendet.