Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Это важно
Регистрация определяемых пользователем табличных функций Python в Unity Catalog доступна в стадии общественного предварительного просмотра.
Определяемая пользователем функция таблицы каталога Unity регистрирует функции, возвращающие полные таблицы вместо скалярных значений. В отличие от скалярных функций, возвращающих одно результирующее значение из каждого вызова, функции, определённые пользователем, вызываются в запросе SQL FROM и могут возвращать несколько строк и столбцов.
UDTF особенно полезны для:
- Преобразование массивов или сложных структур данных в несколько строк
- Интеграция внешних API или служб в рабочие процессы SQL
- Реализация пользовательской логики создания или обогащения данных
- Обработка данных, требующих операций с отслеживанием состояния между строками
Каждый вызов UDTF принимает ноль или больше аргументов. Эти аргументы могут быть скалярными выражениями или аргументами таблицы, представляющими всю входную таблицу.
Пользовательские табличные функции (UDTF) можно зарегистрировать двумя способами:
- Каталог Unity: зарегистрируйте UDTF в качестве управляемого объекта в каталоге Unity.
- Область действия сеанса: регистрация в локальном
SparkSession, изолированном для текущего ноутбука или задачи. См. сведения о определяемых пользователем функциях таблиц Python (UDTFs).
Требования
UDTF каталога Unity для Python поддерживаются в следующих типах вычислений:
- Бессерверные записные книжки и задания
- Классические вычисления с стандартным режимом доступа (Databricks Runtime 17.1 и более поздних версий)
- Хранилище SQL (бессерверный или профессиональный)
Создание UDTF в каталоге Unity
Используйте DDL SQL для создания управляемого UDTF в каталоге Unity. Определяемые пользователем функции вызываются с помощью предложения инструкции FROM SQL.
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;
SELECT * FROM square_numbers(1, 5);
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+
Azure Databricks реализует определяемые пользователем табличные функции (UDTFs) Python в качестве классов Python с обязательным eval методом, который возвращает выходные строки.
Аргументы таблицы
Замечание
TABLE аргументы поддерживаются в Databricks Runtime 17.2 и выше.
Пользовательские функции работы с таблицами могут принимать целые таблицы в качестве входных аргументов, что позволяет выполнять сложные преобразования и агрегирование с отслеживанием состояния.
eval() и terminate() методы жизненного цикла
Аргументы таблицы в определяемых пользователем табличных функциях используют следующие функции для обработки каждой строки.
-
eval(): вызывается один раз для каждой строки в входной таблице. Это основной метод обработки и является обязательным. -
terminate(): вызывается один раз в конце каждого раздела, после того как все строки были обработаныeval(). Используйте этот метод для получения окончательных статистических результатов или выполнения операций очистки. Этот метод является необязательным, но важным для операций с сохранением состояния, таких как агрегации, подсчет или пакетная обработка.
Дополнительные сведения о eval() и terminate() методах см. в документации по Apache Spark: Python UDTF.
Шаблоны доступа к строкам
eval() получает строки из аргументов TABLE как объекты pyspark.sql.Row. Доступ к значениям можно получить по имени столбца (row['id'], row['name']) или по индексу (row[0], row[1]).
-
Гибкость схемы: объявление TABLE аргументов без определений схемы (например,
data TABLE,t TABLE). Функция принимает любую структуру таблицы, поэтому код должен проверить наличие обязательных столбцов.
См. пример: сопоставление IP-адресов с сетевыми блоками CIDR и пример: пакетное создание подписей к изображениям с использованием визуальных конечных точек Azure Databricks.
Изоляция среды
Замечание
Для общих сред изоляции требуется Databricks Runtime 17.2 и более поздних версий. В более ранних версиях все UDTF каталога Unity выполняются в строгом режиме изоляции.
UDTF каталога Unity с тем же владельцем и сеансом по умолчанию может совместно использовать среду изоляции. Это повышает производительность и уменьшает использование памяти, уменьшая количество отдельных сред, которые необходимо запустить.
Строгая изоляция
Чтобы убедиться, что UDTF всегда выполняется в собственной, полностью изолированной среде, добавьте STRICT ISOLATION предложение характеристики.
Большинству определяемых пользователем типов не требуется строгая изоляция. Стандартные определяемые пользователем функции обработки данных используют общую среду изоляции по умолчанию и выполняются быстрее с меньшим потреблением памяти.
Добавьте предложение характеристики в определяемые STRICT ISOLATION пользователем функции:
- Выполнение входных данных в качестве кода с помощью
eval()exec()функций или аналогичных функций. - Запись файлов в локальную файловую систему.
- Изменение глобальных переменных или состояния системы.
- Доступ или изменение переменных среды.
Следующий пример UDTF задает пользовательскую переменную среды, считывает переменную обратно и умножает набор чисел с помощью переменной. Так как UDTF мутирует среду процесса, запустите ее в STRICT ISOLATION. В противном случае может возникнуть утечка или переопределение переменных среды для других определяемых пользователем пользователей или определяемых пользователем объектов в той же среде, что приводит к неправильному поведению.
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os
class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor
# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))
# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;
SELECT * FROM multiply_numbers("3");
Задать, DETERMINISTIC если функция создает согласованные результаты
Добавьте DETERMINISTIC в определение функции, если он создает те же выходные данные для одних и того же входных данных. Это позволяет оптимизировать запросы для повышения производительности.
По умолчанию пользовательские Python UDTF каталога Batch Unity считаются недетерминированными, если явно не объявлены. Примеры недетерминированных функций: создание случайных значений, доступ к текущим времени или датам или вызовы внешних API.
См.
Практические примеры
В следующих примерах показаны реальные случаи использования определяемых пользователем табличных функций (UDTF) Python в Unity Catalog, демонстрирующие переход от простых преобразований данных к сложным внешним интеграциям.
Пример: повторная реализация explode
Хотя Spark предоставляет встроенную explode функцию, создание собственной версии демонстрирует базовый шаблон UDTF для принятия одного входного и создания нескольких выходных строк.
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;
Используйте функцию непосредственно в SQL-запросе:
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+
Или примените его к существующим данным таблицы с соединениемLATERAL:
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;
Пример: геолокация IP-адресов через REST API
В этом примере демонстрируется, как пользовательские табличные функции (UDTFs) могут интегрировать внешние API непосредственно в рабочий процесс SQL. Аналитики могут дополнять данные вызовами API в режиме реального времени с помощью знакомого синтаксиса SQL, не требуя отдельных процессов ETL.
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
Замечание
UDTFs Python разрешает сетевой трафик TCP/UDP через порты 80, 443 и 53 при использовании бессерверных вычислений или вычислений, настроенных в стандартном режиме доступа.
Используйте функцию для обогащения данных веб-журнала географическими сведениями:
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;
Этот подход позволяет выполнять географический анализ в режиме реального времени, не требуя предварительно обработанных таблиц поиска или отдельных потоков данных. UDTF обрабатывает HTTP-запросы, анализ JSON и обработку ошибок, что делает внешние источники данных доступными через стандартные запросы SQL.
Пример. Сопоставление IP-адресов с сетевыми блоками CIDR
В этом примере показано сопоставление IP-адресов с сетевыми блоками CIDR, общая задача проектирования данных, требующая сложной логики SQL.
Сначала создайте примеры данных с IPv4 и IPv6-адресами:
-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);
Затем определите и зарегистрируйте UDTF. Обратите внимание на структуру классов Python:
- Параметр
t TABLEпринимает входную таблицу с любой схемой. UDTF автоматически адаптируется к обработке любых столбцов. Эта гибкость означает, что вы можете использовать одну и ту же функцию в разных таблицах без изменения сигнатуры функции. Однако необходимо тщательно проверить схему строк, чтобы обеспечить совместимость. - Этот
__init__метод используется для интенсивной одноразовой настройки, например загрузки большого списка сети. Эта работа выполняется один раз для каждого раздела входной таблицы. - Метод
evalобрабатывает каждую строку и содержит основную логику сопоставления. Этот метод выполняется ровно один раз для каждойIpMatcherстроки во входной секции, и каждое выполнение выполняется соответствующим экземпляром класса UDTF для этой секции. - Предложение
HANDLERуказывает имя класса Python, реализующего логику UDTF.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
def __init__(self):
import ipaddress
# Heavy initialization - load networks once per partition
self.nets = []
cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
'2001:db8::/32', 'fe80::/10', '::1/128']
for cidr in cidrs:
self.nets.append(ipaddress.ip_network(cidr))
def eval(self, row):
import ipaddress
# Validate that required fields exist
required_fields = ['log_id', 'ip_address']
for field in required_fields:
if field not in row:
raise ValueError(f"Missing required field: {field}")
try:
ip = ipaddress.ip_address(row['ip_address'])
for net in self.nets:
if ip in net:
yield (row['log_id'], row['ip_address'], str(net), ip.version)
return
yield (row['log_id'], row['ip_address'], None, ip.version)
except ValueError:
yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;
Теперь, когда ip_cidr_matcher зарегистрирован в каталоге Unity, его можно вызывать напрямую из SQL, используя синтаксис TABLE().
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+
Пример: пакетное создание подписей к изображениям с использованием конечных точек обработки изображений Azure Databricks
В этом примере демонстрируется пакетное подписывание изображений с использованием конечной точки обслуживания модели распознавания изображений Azure Databricks. Он демонстрирует использование terminate() для пакетной обработки и выполнения на основе разделов.
Создайте таблицу с URL-адресами общедоступных образов:
CREATE OR REPLACE TEMPORARY VIEW sample_images AS VALUES ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery') images(image_url, category);Создайте Python UDTF для каталога Unity, чтобы генерировать подписи к изображениям.
- Инициализируйте UDTF с использованием конфигурации, включая размер пакета, токен API Azure Databricks, эндпоинт модели компьютерного зрения и URL рабочей области.
- В методе
evalсоберите URL-адреса изображения в буфер. Когда буфер достигает размера пакета, активируйте пакетную обработку. Это гарантирует, что в одном вызове API обрабатываются несколько образов, а не отдельные вызовы на изображение. - В методе пакетной обработки скачайте все буферные изображения, закодируйте их как base64 и отправьте их в один запрос API в Databricks VisionModel. Модель обрабатывает все изображения одновременно и возвращает подписи для всего пакета.
- Метод
terminateвыполняется ровно один раз в конце каждой секции. В методе завершения работы обработайте все оставшиеся изображения в буфере и выдайте все собранные подписи в виде результатов.
Замечание
Замените <workspace-url> фактическим URL-адресом рабочей области Azure Databricks (https://your-workspace.cloud.databricks.com).
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
def __init__(self):
self.batch_size = 3
self.vision_endpoint = "databricks-claude-sonnet-4-5"
self.workspace_url = "<workspace-url>"
self.image_buffer = []
self.results = []
def eval(self, row, api_token):
self.image_buffer.append((str(row[0]), api_token))
if len(self.image_buffer) >= self.batch_size:
self._process_batch()
def terminate(self):
if self.image_buffer:
self._process_batch()
for caption in self.results:
yield (caption,)
def _process_batch(self):
batch_data = self.image_buffer.copy()
self.image_buffer.clear()
import base64
import httpx
import requests
# API request timeout in seconds
api_timeout = 60
# Maximum tokens for vision model response
max_response_tokens = 300
# Temperature controls randomness (lower = more deterministic)
model_temperature = 0.3
# create a batch for the images
batch_images = []
api_token = batch_data[0][1] if batch_data else None
for image_url, _ in batch_data:
image_response = httpx.get(image_url, timeout=15)
image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
batch_images.append(image_data)
content_items = [{
"type": "text",
"text": "Provide brief captions for these images, one per line."
}]
for img_data in batch_images:
content_items.append({
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64," + img_data
}
})
payload = {
"messages": [{
"role": "user",
"content": content_items
}],
"max_tokens": max_response_tokens,
"temperature": model_temperature
}
response = requests.post(
self.workspace_url + "/serving-endpoints/" +
self.vision_endpoint + "/invocations",
headers={
'Authorization': 'Bearer ' + api_token,
'Content-Type': 'application/json'
},
json=payload,
timeout=api_timeout
)
result = response.json()
batch_response = result['choices'][0]['message']['content'].strip()
lines = batch_response.split('\n')
captions = [line.strip() for line in lines if line.strip()]
while len(captions) < len(batch_data):
captions.append(batch_response)
self.results.extend(captions[:len(batch_data)])
$$;
Чтобы использовать UDTF-заголовка пакетного образа, вызовите его с помощью таблицы образов примера:
Замечание
Замените your_secret_scope и api_token на фактические секретную область и имя ключа для токена API Databricks.
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+
Вы также можете создать категории подписей изображений по категориям:
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+
Пример: кривая ROC и вычисление AUC для оценки модели машинного обучения
В этом примере показаны кривые операционных характеристик приемника вычислений (ROC) и области под кривой (AUC) для оценки модели двоичной классификации с помощью scikit-learn.
В этом примере демонстрируется несколько важных шаблонов:
- Использование внешней библиотеки: интегрирует scikit-learn для вычисления кривой ROC
- Агрегирование с отслеживанием состояния: накапливает прогнозы во всех строках перед вычислением метрик
-
terminate()Использование метода: обрабатывает полный набор данных и дает результаты только после оценки всех строк. - Обработка ошибок. Проверка необходимых столбцов в входной таблице
UDTF накапливает все прогнозы в памяти с помощью eval() метода, а затем вычисляет и дает полную кривую ROC в методе terminate() . Этот шаблон полезен для метрик, требующих полного набора данных для вычисления.
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
def __init__(self):
from sklearn import metrics
self._roc_curve = metrics.roc_curve
self._roc_auc_score = metrics.roc_auc_score
self._true_labels = []
self._predicted_scores = []
def eval(self, row):
if 'y_true' not in row or 'y_score' not in row:
raise KeyError("Required columns 'y_true' and 'y_score' not found")
true_label = row['y_true']
predicted_score = row['y_score']
label = float(true_label)
self._true_labels.append(label)
self._predicted_scores.append(float(predicted_score))
def terminate(self):
false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
self._true_labels,
self._predicted_scores,
drop_intermediate=False
)
auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))
for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
yield float(threshold), float(tpr), float(fpr), auc_score
$$;
Создание примеров данных двоичной классификации с помощью прогнозов:
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);
Вычислить кривую ROC и AUC:
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+
Ограничения
Следующие ограничения применяются к Python UDTF в каталоге Unity.
- Полиморфные функции таблицы не поддерживаются.
- Учетные данные службы каталога Unity не поддерживаются.
- Пользовательские зависимости не поддерживаются.