Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważne
Rejestrowanie funkcji UDTF języka Python w Unity Catalog znajduje się w publicznej wersji zapoznawczej.
Funkcja tabeli zdefiniowana przez użytkownika (UDTF) w katalogu Unity rejestruje funkcje zwracające pełne tabele zamiast wartości skalarnych. W przeciwieństwie do funkcji skalarnych, które zwracają pojedynczą wartość wyniku z każdego wywołania, funkcje UDF są wywoływane w klauzuli FROM instrukcji SQL i mogą zwracać wiele wierszy i kolumn.
Funkcje tabel zdefiniowane przez użytkownika (UDTF) są szczególnie przydatne do:
- Przekształcanie tablic lub złożonych struktur danych w wiele wierszy
- Integrowanie zewnętrznych interfejsów API lub usług z przepływami pracy SQL
- Implementowanie niestandardowej logiki generowania lub wzbogacania danych
- Przetwarzanie danych wymagających operacji stanowych w wierszach
Każde wywołanie UDTF akceptuje zero lub więcej argumentów. Te argumenty mogą być wyrażeniami skalarnych lub argumentami tabeli reprezentującymi całe tabele wejściowe.
Funkcje zdefiniowane przez użytkownika można zarejestrować na dwa sposoby:
- Unity Catalog: zarejestruj UDTF jako obiekt zarządzany w Unity Catalog.
- Zakres sesji: zarejestruj się w lokalnym
SparkSession, odizolowanym od bieżącego notesu lub zadania. Zobacz Funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDTFs).
Requirements
Funkcje tabel zdefiniowane przez użytkownika w języku Python w katalogu Unity są obsługiwane na następujących typach środowisk obliczeniowych:
- Notesy i zadania bezserwerowe
- Klasyczne obliczenia ze standardowym trybem dostępu (Databricks Runtime 17.1 lub nowszy)
- SQL Warehouse (bezserwerowe lub pro)
Utwórz funkcję UDTF w katalogu Unity
Użyj SQL DDL, aby utworzyć zarządzaną funkcję UDTF w Unity Catalog. Funkcje tabelaryczne zdefiniowane przez użytkownika (UDTF) są wywoływane przy użyciu klauzuli w instrukcji FROM SQL.
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;
SELECT * FROM square_numbers(1, 5);
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+
Usługa Azure Databricks implementuje funkcje UDTF języka Python jako klasy języka Python z obowiązkową eval metodą, która generuje wiersze wyjściowe.
Argumenty tabeli
Uwaga / Notatka
TABLE Argumenty są obsługiwane w środowisku Databricks Runtime 17.2 lub nowszym.
Funkcje tabel zdefiniowane przez użytkownika (UDTF) mogą akceptować całe tabele jako argumenty wejściowe, umożliwiając złożone transformacje zachowujące stan i agregacje.
eval() i terminate() metody cyklu życia
Argumenty tabeli w funkcjach UDF używają następujących funkcji do przetwarzania każdego wiersza:
-
eval(): wywoływany raz dla każdego wiersza w tabeli wejściowej. Jest to główna metoda przetwarzania i jest wymagana. -
terminate(): wywoływany raz na końcu każdej partycji, po przetworzeniu wszystkich wierszy przezeval()element. Użyj tej metody, aby uzyskać końcowe zagregowane wyniki lub wykonać operacje oczyszczania. Ta metoda jest opcjonalna, ale niezbędna dla operacji stanowych, takich jak agregacje, zliczanie lub przetwarzanie wsadowe.
Aby uzyskać więcej informacji na temat eval() metod i terminate() metod, zobacz dokumentację platformy Apache Spark: Python UDTF.
Wzorce dostępu do wierszy
eval() odbiera wiersze z TABLE argumentów jako obiekty pyspark.sql.Row . Możesz uzyskać dostęp do wartości według nazwy kolumny (row['id'], row['name']) lub indeksu (row[0], row[1]).
-
Elastyczność schematu: deklarowanie TABLE argumentów bez definicji schematu (na przykład
data TABLE,t TABLE). Funkcja akceptuje dowolną strukturę tabeli, więc kod powinien sprawdzić, czy istnieją wymagane kolumny.
Zobacz Przykład: Dopasowywanie adresów IP do bloków sieciowych CIDR i Przykład: grupowe opisywanie obrazów za pomocą punktów końcowych przetwarzania obrazu usługi Azure Databricks.
Obliczanie dynamicznego schematu wyjściowego (wielomorficzne funkcje UDTF)
Uwaga / Notatka
Polymorficzne UC UDTFs wymagają środowiska Databricks Runtime 18.1 lub nowszego.
Wielomorficzny UDTF określa schemat danych wyjściowych dynamicznie w czasie zapytania przy użyciu metody statycznej analyze(), zamiast deklarować kolumny z góry. Aby go utworzyć, użyj RETURNS TABLE bez definicji kolumn i zdefiniuj metodę analyze() w klasie obsługi.
Poniższy przykład wyodrębnia pola określone przez obiekt wywołujący z ciągu JSON, zwracając różne kolumny w zależności od argumentu fields :
CREATE OR REPLACE FUNCTION extract_fields(json_str STRING, fields STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ExtractFields'
AS $$
class ExtractFields:
@staticmethod
def analyze(json_str, fields):
# Build the output schema from the requested field names
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.udtf import AnalyzeResult
col_names = [f.strip() for f in fields.value.split(",")]
return AnalyzeResult(
StructType([StructField(name, StringType()) for name in col_names])
)
def eval(self, json_str: str, fields: str):
# Parse the JSON and yield only the requested fields
import json
data = json.loads(json_str)
col_names = [f.strip() for f in fields.split(",")]
yield tuple(data.get(name) for name in col_names)
$$;
-- Extract the name and city
SELECT * FROM extract_fields(
'{"name": "Alice", "age": 30, "city": "Seattle"}',
'name, city'
);
+-------+---------+
| name | city |
+-------+---------+
| Alice | Seattle |
+-------+---------+
Definiowanie analyze metody
Klasa obsługi musi zawierać metodę @staticmethod o nazwie analyze , która akceptuje te same argumenty co udTF i zwraca AnalyzeResult opisujący schemat wyjściowy. Azure Databricks wywołuje analyze() w czasie planowania zapytania w celu rozwiązania schematu przed wykonaniem funkcji.
Każdy parametr analyze klasy to wystąpienie AnalyzeArgument klasy:
| Pole | Description |
|---|---|
dataType |
Typ argumentu wejściowego jako DataType. W przypadku argumentów tabeli wejściowej, StructType stanowi reprezentację kolumn tabeli. |
value |
Wartość argumentu wejściowego jako Optional[Any].
None Dotyczy to argumentów tabeli lub wyrażeń niestałych. |
isTable |
Czy argument wejściowy jest argumentem tabeli jako .BooleanType |
isConstantExpression |
Czy argument wejściowy jest wyrażeniem składanym stałym jako BooleanType. |
Metoda analyze zwraca wystąpienie AnalyzeResult klasy:
| Pole | Description |
|---|---|
schema |
Schemat tabeli wyników jako StructType. |
withSinglePartition |
Jeśli True jest użyty, wszystkie wiersze wejściowe są wysyłane do tego samego wystąpienia klasy UDTF. |
partitionBy |
Jeśli nie jest puste, partycjonuje wiersze wejściowe według określonych wyrażeń, aby każda unikatowa kombinacja została przetworzona przez oddzielne wystąpienie UDTF. |
orderBy |
Jeśli nie jest puste, określa kolejność wierszy w ramach każdej partycji. |
select |
Jeśli wartość nie jest pusta, określa, które kolumny z argumentu wejściowego TABLE odbiera UDTF. |
Ostrzeżenie
W przypadku polimorficznych wielokrotnych funkcji użytkownika (UDTF) katalogu Unity należy umieścić wszystkie importy wewnątrz treści analyze() metody. Importy najwyższego poziomu nie są dostępne w środowisku piaskownicy Unity Catalog.
Przesuń stan z analyze do eval
Metoda analyze jest uruchamiana raz w czasie planowania zapytań, dzięki czemu można jej użyć do wstępnego przetwarzania argumentów stałych, parsowania konfiguracji lub tworzenia wyszukiwań. Aby przekazać te wyniki do eval, utwórz podklasę @dataclassAnalyzeResult z polami niestandardowymi, zwróć ją z analyze, i zaakceptuj ją w metodzie __init__. To unika powtarzania kosztownych operacji dla każdego wiersza.
Poniższy przykład zamienia kod języka na pełną nazwę języka w analyze i przekazuje ją dalej, by eval mógł oznaczać każdy wiersz bez powtarzania odnośnika:
CREATE OR REPLACE FUNCTION tag_language(t TABLE, lang_code STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'TagLanguage'
AS $$
class TagLanguage:
@staticmethod
def analyze(t, lang_code):
from dataclasses import dataclass
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.udtf import AnalyzeResult
@dataclass
class LangResult(AnalyzeResult):
language: str = ""
# Resolve the language code to a full name once during planning
languages = {"en": "English", "es": "Spanish", "fr": "French", "de": "German"}
return LangResult(
schema=StructType([
StructField("text", StringType()),
StructField("language", StringType())
]),
language=languages.get(lang_code.value, "Unknown")
)
def __init__(self, result):
self._language = result.language
def eval(self, row, lang_code: str):
# Tag each row with the pre-resolved language name
yield (row['text'], self._language)
$$;
SELECT * FROM tag_language(
TABLE(VALUES ('Hola mundo'), ('Buenos días') t(text)),
'es'
);
+-------------+----------+
| text | language |
+-------------+----------+
| Hola mundo | Spanish |
| Buenos días | Spanish |
+-------------+----------+
Aby uzyskać więcej wzorców i szczegółów dotyczących stanu przekazywania, zobacz Przekazywanie stanu do przyszłych eval wywołań.
Określanie partycjonowania z analyze metody
Gdy wielomorficzny UDTF akceptuje argument tabeli, metoda analyze może kontrolować sposób dystrybucji wierszy wejściowych w wystąpieniach UDTF przez ustawienie partitionBy, orderBy, withSinglePartition, i select na AnalyzeResult. Eliminuje to konieczność określania PARTITION BY lub ORDER BY w języku SQL przez osoby wywołujące.
Aby uzyskać pełny interfejs API partycjonowania i przykłady, zobacz jak określić partycjonowanie wierszy wejściowych przy użyciu metody analyze.
Izolacja środowiskowa
Uwaga / Notatka
Współdzielone środowiska izolacyjne wymagają Databricks Runtime 17.2 lub nowszego. We wcześniejszych wersjach wszystkie funkcje UDTF języka Python w Unity Catalog działają w trybie ścisłej izolacji.
Funkcje zdefiniowane przez użytkownika w języku Python w Unity Catalog, które mają tego samego właściciela i sesję, mogą domyślnie współdzielić środowisko izolacji. Zwiększa to wydajność i zmniejsza użycie pamięci przez zmniejszenie liczby oddzielnych środowisk, które należy uruchomić.
Ścisła izolacja
Aby zapewnić, że funkcja UDTF zawsze działa we własnym, w pełni izolowanym środowisku, dodaj klauzulę charakterystyczną STRICT ISOLATION .
Większość UDTF nie wymaga ścisłej izolacji. Standardowe funkcje tabelaryczne definiowane przez użytkownika (UDTFs) korzystają z domyślnego współdzielonego środowiska izolacyjnego i działają szybciej przy niższym zużyciu pamięci.
Dodaj klauzulę charakterystyczną STRICT ISOLATION do UDTF-ów, które:
- Uruchom dane wejściowe jako kod przy użyciu funkcji
eval(),exec()lub podobnych. - Zapisywanie plików w lokalnym systemie plików.
- Modyfikowanie zmiennych globalnych lub stanu systemu.
- Uzyskaj dostęp do zmiennych środowiskowych lub zmodyfikuj je.
Poniższy przykład UDTF ustawia niestandardową zmienną środowiskową, odczytuje tę zmienną i mnoży zestaw liczb przy jej użyciu. Ponieważ funkcja UDTF modyfikuje środowisko procesowe, uruchom ją w STRICT ISOLATION. W przeciwnym razie może to spowodować wyciek lub zastąpienie zmiennych środowiskowych dla innych funkcji zdefiniowanych przez użytkownika (UDF) lub tabel funkcji zdefiniowanych przez użytkownika (UDTF) w tym samym środowisku, co powoduje nieprawidłowe zachowanie.
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os
class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor
# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))
# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;
SELECT * FROM multiply_numbers("3");
Ustaw DETERMINISTIC , czy funkcja generuje spójne wyniki
Dodaj DETERMINISTIC do definicji funkcji, jeśli generuje te same dane wyjściowe dla tych samych danych wejściowych. Dzięki temu optymalizacje zapytań mogą zwiększyć wydajność.
Domyślnie przyjmuje się, że funkcje UDTF języka Python w Unity Catalog usługi Batch są niedeterministyczne, chyba że zostanie to jawnie zadeklarowane. Przykłady funkcji niedeterministycznych to: generowanie wartości losowych, uzyskiwanie dostępu do bieżących godzin lub dat lub wykonywanie zewnętrznych wywołań interfejsu API.
Zobacz CREATE FUNCTION (SQL i Python).
Przykłady praktyczne
W poniższych przykładach przedstawiono rzeczywiste przypadki użycia dla funkcji UDTF języka Python w Unity Catalog, które rozwijają się od prostych przekształceń danych do złożonych integracji zewnętrznych.
Przykład: Ponowne implementowanie explode
Chociaż platforma Spark udostępnia wbudowaną explode funkcję, stworzenie własnej wersji demonstruje podstawowy wzorzec UDTF, polegający na przyjmowaniu pojedynczego wejścia i generowaniu wielu wierszy wyjściowych.
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;
Użyj funkcji bezpośrednio w zapytaniu SQL:
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+
Możesz też zastosować je do istniejących danych tabeli za pomocą sprzężeniaLATERAL:
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;
Przykład: geolokalizacja adresu IP za pośrednictwem interfejsu API REST
W tym przykładzie pokazano, jak UDTF mogą integrować zewnętrzne interfejsy API bezpośrednio z przepływem pracy SQL. Analitycy mogą wzbogacać dane przy użyciu wywołań interfejsu API w czasie rzeczywistym przy użyciu znanej składni SQL bez konieczności oddzielnych procesów ETL.
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
Uwaga / Notatka
Funkcje zdefiniowane przez użytkownika w Pythonie (UDTF) pozwalają na ruch sieciowy TCP/UDP przez porty 80, 443 i 53 podczas korzystania z obliczeń bezserwerowych lub zasobów obliczeniowych skonfigurowanych w standardowym trybie dostępu.
Użyj funkcji , aby wzbogacić dane dziennika internetowego o informacje geograficzne:
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;
Takie podejście umożliwia analizę geograficzną w czasie rzeczywistym bez konieczności użycia wstępnie przetworzonych tablic przeglądowych lub oddzielnych przepływów danych. Funkcja UDTF obsługuje żądania HTTP, analizowanie JSON i obsługę błędów, dzięki czemu zewnętrzne źródła danych są dostępne za pośrednictwem standardowych zapytań SQL.
Przykład: porównanie adresów IP z blokami sieci CIDR
W tym przykładzie pokazano dopasowywanie adresów IP do bloków sieciowych CIDR, co jest typowym zadaniem inżynierii danych, wymagającym złożonej logiki SQL.
Najpierw utwórz przykładowe dane z adresami IPv4 i IPv6:
-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);
Następnie zdefiniuj i zarejestruj UDTF. Zwróć uwagę na strukturę klas języka Python:
- Parametr
t TABLEakceptuje tabelę wejściową z dowolnym schematem. Funkcja UDTF automatycznie dostosowuje się do przetwarzania wszystkich podanych kolumn. Ta elastyczność oznacza, że można użyć tej samej funkcji w różnych tabelach bez modyfikowania podpisu funkcji. Należy jednak dokładnie sprawdzić schemat wierszy, aby zapewnić zgodność. - Metoda
__init__jest używana do rozległej jednorazowej konfiguracji, takiej jak ładowanie dużej listy sieci. Praca ta odbywa się raz na każdą partycję tabeli wejściowej. - Metoda
evalprzetwarza każdy wiersz i zawiera podstawową logikę dopasowania. Ta metoda wykonywana jest dokładnie raz dla każdego wiersza w partycji wejściowej i jest realizowana przez stosowne wystąpienie klasyIpMatcherUDTF dla tej partycji. - Klauzula
HANDLERokreśla nazwę klasy języka Python, która implementuje logikę UDTF.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
def __init__(self):
import ipaddress
# Heavy initialization - load networks once per partition
self.nets = []
cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
'2001:db8::/32', 'fe80::/10', '::1/128']
for cidr in cidrs:
self.nets.append(ipaddress.ip_network(cidr))
def eval(self, row):
import ipaddress
# Validate that required fields exist
required_fields = ['log_id', 'ip_address']
for field in required_fields:
if field not in row:
raise ValueError(f"Missing required field: {field}")
try:
ip = ipaddress.ip_address(row['ip_address'])
for net in self.nets:
if ip in net:
yield (row['log_id'], row['ip_address'], str(net), ip.version)
return
yield (row['log_id'], row['ip_address'], None, ip.version)
except ValueError:
yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;
Teraz, gdy ip_cidr_matcher jest zarejestrowany w katalogu Unity, wywołaj go bezpośrednio z SQL przy użyciu składni TABLE().
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+
Przykład: transkrybowanie obrazów w usłudze Batch przy użyciu punktów końcowych przetwarzania obrazów usługi Azure Databricks
W tym przykładzie pokazano generowanie podpisów dla obrazów wsadowych przy użyciu modelu wizji usługi Azure Databricks obsługiwanego przez punkt końcowy. Prezentuje użycie terminate() do przetwarzania wsadowego i wykonywania opartego na partycjach.
Utwórz tabelę z publicznymi adresami URL obrazów:
CREATE OR REPLACE TEMPORARY VIEW sample_images AS VALUES ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery') images(image_url, category);Utwórz Unity Catalog w Pythonie, używając UDTF, aby wygenerować opisy obrazów:
- Zainicjuj UDTF przy użyciu konfiguracji, w tym rozmiaru partii, tokenu API Azure Databricks, punktu końcowego modelu wizyjnego i adresu URL obszaru roboczego.
- W metodzie
evalzbierz adresy URL obrazów do buforu. Gdy bufor osiągnie wielkość partii, uruchom przetwarzanie wsadowe. Dzięki temu wiele obrazów jest przetwarzanych razem w jednym wywołaniu interfejsu API, a nie w poszczególnych wywołaniach obrazu. - W metodzie przetwarzania wsadowego pobierz wszystkie buforowane obrazy, zakoduj je jako base64 i wyślij je do pojedynczego żądania interfejsu API do usługi Databricks VisionModel. Model przetwarza wszystkie obrazy jednocześnie i zwraca podpisy dla całej partii.
- Metoda
terminatejest wykonywana dokładnie raz na końcu każdej partycji. W metodzie terminate przetwarzaj wszystkie pozostałe obrazy w buforze i zwracaj wszystkie zebrane opisy jako wyniki.
Uwaga / Notatka
Zamień <workspace-url> na rzeczywisty adres URL swojego obszaru roboczego usługi Azure Databricks (https://your-workspace.cloud.databricks.com).
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
def __init__(self):
self.batch_size = 3
self.vision_endpoint = "databricks-claude-sonnet-4-5"
self.workspace_url = "<workspace-url>"
self.image_buffer = []
self.results = []
def eval(self, row, api_token):
self.image_buffer.append((str(row[0]), api_token))
if len(self.image_buffer) >= self.batch_size:
self._process_batch()
def terminate(self):
if self.image_buffer:
self._process_batch()
for caption in self.results:
yield (caption,)
def _process_batch(self):
batch_data = self.image_buffer.copy()
self.image_buffer.clear()
import base64
import httpx
import requests
# API request timeout in seconds
api_timeout = 60
# Maximum tokens for vision model response
max_response_tokens = 300
# Temperature controls randomness (lower = more deterministic)
model_temperature = 0.3
# create a batch for the images
batch_images = []
api_token = batch_data[0][1] if batch_data else None
for image_url, _ in batch_data:
image_response = httpx.get(image_url, timeout=15)
image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
batch_images.append(image_data)
content_items = [{
"type": "text",
"text": "Provide brief captions for these images, one per line."
}]
for img_data in batch_images:
content_items.append({
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64," + img_data
}
})
payload = {
"messages": [{
"role": "user",
"content": content_items
}],
"max_tokens": max_response_tokens,
"temperature": model_temperature
}
response = requests.post(
self.workspace_url + "/serving-endpoints/" +
self.vision_endpoint + "/invocations",
headers={
'Authorization': 'Bearer ' + api_token,
'Content-Type': 'application/json'
},
json=payload,
timeout=api_timeout
)
result = response.json()
batch_response = result['choices'][0]['message']['content'].strip()
lines = batch_response.split('\n')
captions = [line.strip() for line in lines if line.strip()]
while len(captions) < len(batch_data):
captions.append(batch_response)
self.results.extend(captions[:len(batch_data)])
$$;
Aby użyć wsadowego podpisywania obrazów UDTF, wywołaj go przy użyciu przykładowej tabeli obrazów.
Uwaga / Notatka
Zastąp your_secret_scope i api_token rzeczywistym zakresem tajnym i nazwą klucza dla tokenu interfejsu API Databricks.
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+
Możesz również wygenerować kategorię podpisów obrazów według kategorii:
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+
Przykład: krzywa ROC i obliczenia AUC na potrzeby oceny modelu uczenia maszynowego
W tym przykładzie pokazano, jak obliczać krzywe ROC i pole pod krzywą (AUC) dla oceny modeli klasyfikacji binarnej przy użyciu biblioteki scikit-learn.
W tym przykładzie przedstawiono kilka ważnych wzorców:
- Użycie biblioteki zewnętrznej: integruje bibliotekę scikit-learn z obliczeniami krzywej ROC
- Agregacja stanowa: gromadzi przewidywania we wszystkich wierszach przed obliczeniami metryk
-
terminate()użycie metody: przetwarza kompletny zestaw danych i daje wyniki dopiero po ocenie wszystkich wierszy - Obsługa błędów: Sprawdza, czy wymagane kolumny istnieją w tabeli wejściowej
Funkcja UDTF gromadzi wszystkie przewidywania w pamięci za pomocą metody eval(), a następnie oblicza i zwraca pełną krzywą ROC przy użyciu metody terminate(). Ten wzorzec jest przydatny w przypadku metryk, które wymagają kompletnego zestawu danych do obliczenia.
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
def __init__(self):
from sklearn import metrics
self._roc_curve = metrics.roc_curve
self._roc_auc_score = metrics.roc_auc_score
self._true_labels = []
self._predicted_scores = []
def eval(self, row):
if 'y_true' not in row or 'y_score' not in row:
raise KeyError("Required columns 'y_true' and 'y_score' not found")
true_label = row['y_true']
predicted_score = row['y_score']
label = float(true_label)
self._true_labels.append(label)
self._predicted_scores.append(float(predicted_score))
def terminate(self):
false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
self._true_labels,
self._predicted_scores,
drop_intermediate=False
)
auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))
for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
yield float(threshold), float(tpr), float(fpr), auc_score
$$;
Utwórz przykładowe dane klasyfikacji binarnej z przewidywaniami:
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);
Oblicz krzywą ROC i AUC:
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+
Przykład: dynamiczne projekcje kolumn z argumentu tabeli
Ten przykład łączy wielomorficzne funkcje UDF z argumentami tabeli. Funkcja UDTF akceptuje tabelę i rozdzielaną przecinkami listę nazw kolumn, a następnie projektuje tylko te kolumny z danych wejściowych. Metoda analyze sprawdza schemat tabeli wejściowej i tworzy schemat wyjściowy zawierający tylko żądane kolumny.
CREATE OR REPLACE FUNCTION project_columns(t TABLE, columns STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ProjectColumns'
AS $$
class ProjectColumns:
@staticmethod
def analyze(t, columns):
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeResult
requested = [c.strip() for c in columns.value.split(",")]
input_schema = t.dataType
output_fields = []
for field in input_schema.fields:
if field.name in requested:
output_fields.append(field)
if not output_fields:
raise ValueError(
f"None of the requested columns {requested} "
f"exist in the input table"
)
return AnalyzeResult(schema=StructType(output_fields))
def eval(self, row, columns: str):
requested = [c.strip() for c in columns.split(",")]
yield tuple(row[col] for col in requested if col in row)
$$;
Użyj funkcji , aby wybrać określone kolumny z tabeli:
SELECT * FROM project_columns(
TABLE(SELECT * FROM samples.nyctaxi.trips LIMIT 5),
'pickup_zip, dropoff_zip, fare_amount'
);
Ograniczenia
Następujące ograniczenia dotyczą funkcji UDTF Python w Unity Catalog:
- Poświadczenia usługi Unity Catalog nie są obsługiwane.
- Zależności niestandardowe nie są obsługiwane.