Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Important
Эта функция доступна в общедоступной предварительной версии.
В этом руководстве пошагово показано, как создать оператор python-run-function для Lakeflow Designer, который отправляет содержимое DataFrame через Gmail в виде CSV-вложения. Используйте этот пример, чтобы узнать, как создавать операторы на основе YAML, которые выполняют побочные эффекты, такие как отправка уведомлений или запись во внешние системы. Дополнительные сведения см. в разделе "Определяемые пользователем операторы" в конструкторе Lakeflow.
Requirements
- Рабочая область Azure Databricks с доступом к созданию областей секретов.
- Учетная запись Gmail с паролем Google App (требуется, если включена многофакторная проверка подлинности( MFA).
- Интерфейс командной строки Databricks, установленный в локальной среде разработки.
Шаг 1. Настройка секретов
Сохраните учетные данные Gmail в области секрета Azure Databricks, чтобы оператор смог получить их во время выполнения.
Создайте область секрета с помощью интерфейса командной строки Azure Databricks:
databricks secrets create-scope my_email_scopeСохраните пароль приложения Gmail в этой области видимости:
databricks secrets put-secret my_email_scope gmail_app_passwordВам будет предложено ввести значение секрета. Вставьте пароль приложения Gmail и сохраните его.
Шаг 2: Напишите функцию run()
Для python-run-function типа оператора требуется функция с этой сигнатурой run() :
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config: значения конфигурации, предоставленные пользователем в пользовательском интерфейсе конструктора Lakeflow. -
inputs: Входные фреймы данных, где ключом является имя порта. -
spark: активный сеанс Spark.
Функция должна возвращать словарь выходных объектов DataFrame, где ключами являются имена выходных портов.
Определите и проверьте функцию в ячейке записной книжки:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
# Skip side effects during Designer preview
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
# Retrieve password from Databricks secrets
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
# Convert DataFrame to CSV
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
# Send email to each recipient
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
# Clean up temp file
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
Шаг 3. Проверка функции
Проверьте функцию с помощью примера кадра данных:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
# Test in preview mode (no email sent)
result = run(
config={
"is_preview": True,
"sender_email": "you@gmail.com",
"secret_scope": "my_email_scope",
"secret_key": "gmail_app_password",
"recipients": "alice@example.com",
"subject": "Test",
"body": "Test body"
},
inputs={"data": test_df},
spark=spark
)
result["data"].show()
# Expected: the original DataFrame, unchanged
Note
Значения secret_scope и secret_key в конфигурации — это имена области действия секрета и ключа, созданных на шаге 1, а не сам пароль. Оператор использует эти имена для извлечения пароля из секретов Azure Databricks во время выполнения.
Important
Сначала выполните проверку, установив is_preview в значение True, чтобы проверить поведение сквозной передачи, не отправляя никаких электронных писем. Когда вы будете готовы протестировать фактическое сообщение электронной почты, задайте для этого значения is_previewFalse.
Шаг 4. Создание определения YAML
Создайте файл с именем gmail_email_sender.yaml со следующим содержимым:
schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
sender_email:
type: string
title: Sender Email
default: ''
examples:
- 'you@gmail.com'
x-ui:
widget: input
secret_scope:
type: string
title: Secret Scope
default: ''
examples:
- 'my_email_scope'
x-ui:
widget: input
secret_key:
type: string
title: Secret Key
default: ''
examples:
- 'gmail_app_password'
x-ui:
widget: input
recipients:
type: string
title: Recipients
default: ''
examples:
- 'alice@example.com, bob@example.com'
x-ui:
widget: textarea
rows: 2
subject:
type: string
title: Subject
default: ''
examples:
- 'Designer Output Data'
x-ui:
widget: input
body:
type: string
title: Email Body
default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
x-ui:
widget: textarea
rows: 6
required:
- sender_email
- secret_scope
- secret_key
- recipients
- subject
additionalProperties: false
ports:
input:
- name: data
title: Input Data
mime: application/vnd.databricks.dataframe
output:
- name: data
title: Output Data
mime: application/vnd.databricks.dataframe
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
Шаг 5. Сохранение и регистрация оператора
Сохраните ФАЙЛ YAML в рабочей области Azure Databricks. Рассмотрим пример.
/Workspace/Users/<user-name>/gmail_email_sender.yamlДобавьте оператор в свой файл
.user_defined_operators.yaml:operators: - /Workspace/Users/<user-name>/gmail_email_sender.yaml
Дополнительные сведения о вариантах регистрации см. в разделе Сделать оператор доступным для поиска.
Permissions
Пользователям, выполняющим рабочий процесс, содержащий этот оператор, требуется READ доступ к области секрета или они могут предоставлять собственные значения области секрета и ключи в конфигурации оператора. Пользователям также нужен доступ на чтение к файлу YAML в рабочей области.
Чтобы предоставить доступ к области секрета, выполните следующие действия.
databricks secrets put-acl my_email_scope <user-or-group> READ
Использование оператора в конструкторе Lakeflow
После регистрации оператор появится в Конструкторе Lakeflow с входным портом для вашего источника данных и полями конфигурации для адреса электронной почты отправителя, области секретов, секретного ключа, получателей, темы и текста сообщения.
При запуске рабочего процесса оператор преобразует входной кадр данных в CSV-файл, присоединяет его к электронной почте и отправляет его каждому получателю. DataFrame передаётся на выходной порт без изменений, поэтому можно добавить далее в цепочку дополнительные операторы. Во время предварительной версии рабочего процесса электронная почта не отправляется.