Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Důležité
Registrace UDTF Pythonu v katalogu Unity je ve veřejné náhledové verzi.
Uživatelsky definovaná funkce tabulky v Unity Catalog (UDTF) registruje funkce, které místo skalárních hodnot vrací úplné tabulky. Na rozdíl od skalárních funkcí, které vrací jednu výslednou hodnotu z každého volání, se funkce UDTFs vyvolávají v klauzuli FROM příkazu SQL a můžou vracet více řádků a sloupců.
Funkce definované uživatelem jsou užitečné zejména pro:
- Transformace polí nebo složitých datových struktur na více řádků
- Integrace externích rozhraní API nebo služeb do pracovních postupů SQL
- Implementace vlastní logiky generování nebo rozšiřování dat
- Zpracování dat, která vyžadují stavové operace napříč řádky
Každé volání UDTF přijímá nula nebo více argumentů. Tyto argumenty mohou být skalární výrazy nebo argumenty tabulky představující celé vstupní tabulky.
Funkce definované uživatelem je možné zaregistrovat dvěma způsoby:
- Katalog Unity: Zaregistrujte UDTF jako řídicí objekt v katalogu Unity.
- Řízené relací: Zaregistrujte se v rámci místního
SparkSession, izolovaného do aktuálního poznámkového bloku nebo úlohy. Viz uživatelem definované funkce tabulek v Pythonu (UDTFs).
Požadavky
Python UDTF Unity Catalogu jsou podporovány u následujících typů výpočetních prostředků:
- Bezserverové poznámkové bloky a úlohy
- Klasické výpočetní prostředky se standardním režimem přístupu (Databricks Runtime 17.1 a novější)
- SQL Warehouse (bez serveru nebo pro)
Vytvoření UDTF v katalogu Unity
K vytvoření řízeného UDTF v katalogu Unity použijte SQL DDL . UDTF se vyvolávají pomocí klauzule FROM v SQL příkazu.
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;
SELECT * FROM square_numbers(1, 5);
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+
Azure Databricks implementuje uživatelsky definované funkce tabulky v Pythonu jako třídy Pythonu s povinnou metodou eval, která generuje výstupní řádky.
Argumenty tabulky
Poznámka:
TABLE Argumenty jsou podporovány v Databricks Runtime 17.2 a vyšší.
Funkce definované uživatelem pro tabulky (UDTF) mohou přijímat celé tabulky jako vstupní argumenty, což umožňuje provádět komplexní stavové transformace a agregace.
eval() a terminate() metody životního cyklu
Argumenty tabulky ve funkcích definovaných uživatelem používají následující funkce ke zpracování jednotlivých řádků.
-
eval(): Volá se jednou pro každý řádek ve vstupní tabulce. Toto je hlavní metoda zpracování a vyžaduje se. -
terminate(): Volá se jednou na konci každého oddílu, poté, coeval()zpracoval všechny řádky. Tato metoda slouží k dosažení konečných agregovaných výsledků nebo provádění operací čištění. Tato metoda je volitelná, ale nezbytná pro stavové operace, jako jsou agregace, počítání nebo dávkové zpracování.
Další informace o metodách eval() a terminate() najdete v dokumentaci k Apache Sparku: Python UDTF.
Vzory přístupu k řádkům
eval() přijímá jako objekty TABLE řádky z argumentů. K hodnotám můžete přistupovat podle názvu sloupce (row['id'], row['name']) nebo podle indexu (row[0], row[1]).
-
Flexibilita schématu: Deklarujte TABLE argumenty bez definic schématu (například
data TABLE,t TABLE). Funkce přijme libovolnou strukturu tabulky, takže váš kód by měl ověřit, že existují požadované sloupce.
Viz Příklad: Porovnání IP adres se síťovými bloky CIDR a příklad: Titulkování obrázků služby Batch pomocí koncových bodů zpracování obrazu Azure Databricks
Výpočet dynamického výstupního schématu (polymorfní funkce definované uživatelem)
Poznámka:
Polymorfní UC UDTF vyžadují Databricks Runtime 18.1 a vyšší.
Polymorfní UDTF určuje jeho výstupní schéma dynamicky v době dotazu pomocí statické analyze() metody, nikoli deklarování sloupců předem. Chcete-li ji vytvořit, použijte RETURNS TABLE bez definic sloupců a definujte metodu ve třídě obslužné rutiny analyze().
Následující příklad extrahuje pole zadaná volajícím z řetězce JSON a vrací různé sloupce v závislosti na argumentu fields :
CREATE OR REPLACE FUNCTION extract_fields(json_str STRING, fields STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ExtractFields'
AS $$
class ExtractFields:
@staticmethod
def analyze(json_str, fields):
# Build the output schema from the requested field names
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.udtf import AnalyzeResult
col_names = [f.strip() for f in fields.value.split(",")]
return AnalyzeResult(
StructType([StructField(name, StringType()) for name in col_names])
)
def eval(self, json_str: str, fields: str):
# Parse the JSON and yield only the requested fields
import json
data = json.loads(json_str)
col_names = [f.strip() for f in fields.split(",")]
yield tuple(data.get(name) for name in col_names)
$$;
-- Extract the name and city
SELECT * FROM extract_fields(
'{"name": "Alice", "age": 30, "city": "Seattle"}',
'name, city'
);
+-------+---------+
| name | city |
+-------+---------+
| Alice | Seattle |
+-------+---------+
analyze Definování metody
Třída obslužné rutiny musí obsahovat metodu @staticmethod s názvem analyze , která přijímá stejné argumenty jako UDTF a vrací AnalyzeResult popisující výstupní schéma. Azure Databricks volá analyze() při plánování dotazu, aby se schéma vyřešilo před spuštěním funkce.
Každý parametr analyze třídy je instance AnalyzeArgument :
| Pole | Description |
|---|---|
dataType |
Typ vstupního argumentu jako DataType. U argumentů vstupní tabulky se jedná o StructType reprezentaci sloupců tabulky. |
value |
Hodnota vstupního argumentu jako Optional[Any]. Toto je None pro argumenty tabulky nebo ne-konstantní výrazy. |
isTable |
Zda vstupní argument je argument tabulky jako BooleanType. |
isConstantExpression |
Zda vstupní argument je výraz, který lze složit jako konstantu jako BooleanType. |
Metoda analyze vrátí instanci AnalyzeResult třídy:
| Pole | Description |
|---|---|
schema |
Schéma výsledné tabulky jako StructType. |
withSinglePartition |
Pokud True, odešle všechny vstupní řádky do stejné UDTF třídy instance. |
partitionBy |
Pokud nejsou prázdné, rozdělí vstupní řádky zadanými výrazy tak, aby každá jedinečná kombinace byla zpracována samostatnou instancí UDTF. |
orderBy |
Pokud to není prázdné, určuje pořadí řádků v rámci každé oblasti. |
select |
Pokud není prázdné, určuje, které sloupce ze vstupního TABLE argumentu funkce UDTF přijímá. |
Výstraha
Pro polymorfní UDTF v katalogu Unity je nutné umístit všechny importy do těla metody analyze(). Importy nejvyšší úrovně nejsou dostupné v sandboxovém prostředí katalogu Unity.
Přeposlat stav od analyze do eval
Metoda analyze se spouští jednou v době plánování dotazů, takže ji můžete použít k předběžnému zpracování konstantních argumentů, parsování konfigurací nebo sestavení vyhledávání. Pokud chcete tyto výsledky předat eval, vytvořte podtřídu @dataclass třídy AnalyzeResult s vlastními poli, vraťte ji z analyze a přijměte ji v metodě __init__. Tím se zabrání opakování nákladné práce pro každý řádek.
Následující příklad převede kód jazyka na úplný název jazyka jednou v analyze a poté ho předá dál, aby eval mohl označit každý řádek bez nutnosti opakovaného vyhledávání:
CREATE OR REPLACE FUNCTION tag_language(t TABLE, lang_code STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'TagLanguage'
AS $$
class TagLanguage:
@staticmethod
def analyze(t, lang_code):
from dataclasses import dataclass
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.udtf import AnalyzeResult
@dataclass
class LangResult(AnalyzeResult):
language: str = ""
# Resolve the language code to a full name once during planning
languages = {"en": "English", "es": "Spanish", "fr": "French", "de": "German"}
return LangResult(
schema=StructType([
StructField("text", StringType()),
StructField("language", StringType())
]),
language=languages.get(lang_code.value, "Unknown")
)
def __init__(self, result):
self._language = result.language
def eval(self, row, lang_code: str):
# Tag each row with the pre-resolved language name
yield (row['text'], self._language)
$$;
SELECT * FROM tag_language(
TABLE(VALUES ('Hola mundo'), ('Buenos días') t(text)),
'es'
);
+-------------+----------+
| text | language |
+-------------+----------+
| Hola mundo | Spanish |
| Buenos días | Spanish |
+-------------+----------+
Další vzory a podrobnosti o stavu přesměrování najdete v části Přesměrování stavu na budoucí eval volání.
Specifikovat dělení pomocí metody analyze
Když polymorfní UDTF přijímá argument tabulky, analyze metoda může řídit, jak se vstupní řádky distribuují napříč instancemi UDTF nastavením partitionBy, orderBy, withSinglePartition, a select na AnalyzeResult. Tím se eliminuje potřeba, aby volající museli specifikovat PARTITION BY nebo ORDER BY v SQL.
Úplné rozhraní API pro dělení a příklady najdete v tématu Určení dělení vstupních řádků z analyze metody.
Izolace prostředí
Poznámka:
Sdílená prostředí izolace vyžadují Databricks Runtime 17.2 a vyšší. Ve starších verzích se všechny uživatelem definované funkce Pythonu v Unity Catalog spouštějí v přísném režimu izolace.
Uživatelem definované tabulkové funkce Pythonu v rámci Unity Catalogu se stejným vlastníkem a relací mohou ve výchozím nastavení sdílet prostředí izolace. Tím se zlepší výkon a sníží se využití paměti snížením počtu samostatných prostředí, která je potřeba spustit.
Striktní izolace
Pokud chcete zajistit, aby UDTF vždy běžel ve vlastním, plně izolovaném prostředí, přidejte STRICT ISOLATION charakteristické klauzule.
Většina UDTF nepotřebuje striktní izolaci. Funkce UDF pro zpracování standardních dat využívají výchozí prostředí pro sdílenou izolaci a s nižší spotřebou paměti běží rychleji.
STRICT ISOLATION Přidejte charakteristické klauzuli do UDTF, která:
- Spusťte vstup jako kód pomocí funkce
eval(),exec()nebo podobné funkce. - Zapisujte soubory do místního systému souborů.
- Upravte globální proměnné nebo stav systému.
- Přístup k proměnným prostředí nebo jejich úprava
Následující příklad UDTF nastaví vlastní proměnnou prostředí, přečte proměnnou zpět a vynásobí sadu čísel pomocí proměnné. Vzhledem k tomu, že UDTF mění procesní prostředí, spusťte ho v STRICT ISOLATION. Jinak by mohlo dojít k úniku nebo přepsání proměnných prostředí pro jiné uživatelsky definované funkce nebo tabulkové funkce ve stejném prostředí, což způsobuje nesprávné chování.
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os
class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor
# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))
# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;
SELECT * FROM multiply_numbers("3");
Nastavení DETERMINISTIC , pokud vaše funkce vytváří konzistentní výsledky
Přidejte DETERMINISTIC do definice funkce, pokud vytváří stejné výstupy pro stejné vstupy. Umožňuje to optimalizaci dotazů ke zlepšení výkonu.
Ve výchozím nastavení se předpokládá, že UDTF Pythonu v Batch Unity Catalog jsou nedeterministické, pokud není explicitně deklarováno jinak. Mezi příklady ne deterministických funkcí patří generování náhodných hodnot, přístup k aktuálním časům nebo kalendářním datům nebo volání externího rozhraní API.
Podívejte se na CREATE FUNCTION (SQL a Python).
Praktické příklady
Následující příklady demonstrují praktické případy použití uživatelských definovaných tabulkových funkcí Pythonu v Unity Catalog, postupně od jednoduchých datových transformací až po komplexní externí integrace.
Příklad: Opětovné implementace explode
I když Spark poskytuje integrovanou explode funkci, vytvoření vlastní verze ukazuje základní vzor UDTF, který přebírá jeden vstup a vytváří více výstupních řádků.
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;
Funkci použijte přímo v dotazu SQL:
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+
Nebo použijte u existujících dat tabulky pomocí LATERAL joinu:
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;
Příklad: Geografická poloha IP adresy přes rozhraní REST API
Tento příklad ukazuje, jak mohou UDTF integrovat API přímo do vašeho SQL pracovního postupu. Analytici můžou obohatit data o volání rozhraní API v reálném čase pomocí známé syntaxe SQL, aniž by vyžadovali samostatné procesy ETL.
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
Poznámka:
UDF Python umožňují síťový provoz TCP/UDP přes porty 80, 443 a 53 při použití bezserverového počítání nebo výpočetních prostředků nakonfigurovaných pomocí standardního režimu přístupu.
Pomocí funkce můžete rozšířit data webového protokolu o geografické informace:
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;
Tento přístup umožňuje geografickou analýzu v reálném čase bez nutnosti předem zpracovaných vyhledávacích tabulek nebo samostatných datových kanálů. UDTF zpracovává požadavky HTTP, analýzu JSON a zpracování chyb, což zpřístupňuje externí zdroje dat prostřednictvím standardních dotazů SQL.
Příklad: Porovnání IP adres s CIDR síťovými bloky
Tento příklad ukazuje odpovídající IP adresy vůči síťovým blokům CIDR, což je běžná úloha přípravy dat, která vyžaduje složitou logiku SQL.
Nejprve vytvořte ukázková data s adresami IPv4 i IPv6:
-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);
Dále definujte a zaregistrujte UDTF. Všimněte si struktury tříd Pythonu:
- Parametr
t TABLEpřijímá vstupní tabulku s libovolným schématem. UDTF se automaticky přizpůsobí zpracování libovolných sloupců. Tato flexibilita znamená, že můžete použít stejnou funkci napříč různými tabulkami beze změny podpisu funkce. Je však nutné pečlivě zkontrolovat schéma řádků, abyste zajistili kompatibilitu. - Metoda
__init__se používá pro náročné jednorázové nastavení, jako je načtení velkého seznamu sítí. Tato práce probíhá jednou pro každý oddíl vstupní tabulky. - Metoda
evalzpracovává každý řádek a obsahuje základní odpovídající logiku. Tato metoda se provede přesně jednou pro každý řádek ve vstupním oddílu a každé spuštění je provedeno odpovídající instancí třídy UDTF pro tento oddíl. - Klauzule
HANDLERurčuje název třídy Python, která implementuje logiku UDTF.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
def __init__(self):
import ipaddress
# Heavy initialization - load networks once per partition
self.nets = []
cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
'2001:db8::/32', 'fe80::/10', '::1/128']
for cidr in cidrs:
self.nets.append(ipaddress.ip_network(cidr))
def eval(self, row):
import ipaddress
# Validate that required fields exist
required_fields = ['log_id', 'ip_address']
for field in required_fields:
if field not in row:
raise ValueError(f"Missing required field: {field}")
try:
ip = ipaddress.ip_address(row['ip_address'])
for net in self.nets:
if ip in net:
yield (row['log_id'], row['ip_address'], str(net), ip.version)
return
yield (row['log_id'], row['ip_address'], None, ip.version)
except ValueError:
yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;
Nyní, když je ip_cidr_matcher registrován v katalogu Unity, můžete jej zavolat přímo ze SQL pomocí syntaxe TABLE().
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+
Příklad: Dávkové popisování obrázků pomocí koncových bodů zpracování obrazu Azure Databricks
Tento příklad ukazuje dávkové titulkování obrázků pomocí modelu zpracování obrazu Azure Databricks obsluhující koncový bod. Předvádí použití terminate() pro dávkové zpracování a spouštění na základě oddílů.
Vytvořte tabulku s adresami URL veřejných imagí:
CREATE OR REPLACE TEMPORARY VIEW sample_images AS VALUES ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery') images(image_url, category);Vytvořte Python UDTF pro Unity Catalog k vytváření popisů obrázků:
- Inicializujte UDTF s konfigurací, včetně velikosti dávky, tokenu Azure Databricks API, koncového bodu modelu rozpoznávání obrazu a URL pracovního prostoru.
-
evalV metodě shromážděte adresy URL obrázků do vyrovnávací paměti. Když vyrovnávací paměť dosáhne velikosti dávky, aktivujte dávkové zpracování. Tím se zajistí, že se více obrázků zpracuje společně v jednom volání rozhraní API, nikoli v jednotlivých voláních na obrázek. - V metodě dávkového zpracování stáhněte všechny obrázky uložené v mezipaměti, zakódujte je jako base64 a odešlete je jako jeden požadavek API do modelu Databricks VisionModel. Model zpracovává všechny obrázky současně a vrací titulky pro celou dávku.
- Metoda
terminatese provádí přesně jednou na konci každého segmentu. V metodě ukončení zpracujte všechny zbývající obrázky ve vyrovnávací paměti a vyvolte všechny shromážděné titulky jako výsledky.
Poznámka:
Nahraďte <workspace-url> skutečnou adresou URL pracovního prostoru Azure Databricks (https://your-workspace.cloud.databricks.com).
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
def __init__(self):
self.batch_size = 3
self.vision_endpoint = "databricks-claude-sonnet-4-5"
self.workspace_url = "<workspace-url>"
self.image_buffer = []
self.results = []
def eval(self, row, api_token):
self.image_buffer.append((str(row[0]), api_token))
if len(self.image_buffer) >= self.batch_size:
self._process_batch()
def terminate(self):
if self.image_buffer:
self._process_batch()
for caption in self.results:
yield (caption,)
def _process_batch(self):
batch_data = self.image_buffer.copy()
self.image_buffer.clear()
import base64
import httpx
import requests
# API request timeout in seconds
api_timeout = 60
# Maximum tokens for vision model response
max_response_tokens = 300
# Temperature controls randomness (lower = more deterministic)
model_temperature = 0.3
# create a batch for the images
batch_images = []
api_token = batch_data[0][1] if batch_data else None
for image_url, _ in batch_data:
image_response = httpx.get(image_url, timeout=15)
image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
batch_images.append(image_data)
content_items = [{
"type": "text",
"text": "Provide brief captions for these images, one per line."
}]
for img_data in batch_images:
content_items.append({
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64," + img_data
}
})
payload = {
"messages": [{
"role": "user",
"content": content_items
}],
"max_tokens": max_response_tokens,
"temperature": model_temperature
}
response = requests.post(
self.workspace_url + "/serving-endpoints/" +
self.vision_endpoint + "/invocations",
headers={
'Authorization': 'Bearer ' + api_token,
'Content-Type': 'application/json'
},
json=payload,
timeout=api_timeout
)
result = response.json()
batch_response = result['choices'][0]['message']['content'].strip()
lines = batch_response.split('\n')
captions = [line.strip() for line in lines if line.strip()]
while len(captions) < len(batch_data):
captions.append(batch_response)
self.results.extend(captions[:len(batch_data)])
$$;
Pokud chcete použít hromadné generování popisků obrázků pomocí UDTF, vyvolejte jej pomocí tabulky ukázkových obrázků.
Poznámka:
Nahraďte your_secret_scope a api_token skutečným rozsahem tajného úložiště a názvem klíče pro token rozhraní API Databricks.
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+
Kategorii titulků obrázků můžete také vygenerovat podle kategorie:
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+
Příklad: Křivka ROC a výpočet AUC pro vyhodnocení modelu ML
Tento příklad ukazuje, jak vypočítat křivky ROC (receiver operating characteristic) a skóre oblasti pod křivkou (AUC) pro vyhodnocení binárního klasifikačního modelu pomocí knihovny scikit-learn.
Tento příklad ukazuje několik důležitých vzorů:
- Použití externí knihovny: Integrace scikit-learn pro výpočet křivky ROC
- Stavová agregace: Shromažďuje předpovědi napříč všemi řádky před výpočetními metrikami.
-
terminate()použití metody: Zpracuje úplnou datovou sadu a vrátí výsledky až po vyhodnocení všech řádků. - Zpracování chyb: Ověřuje, jestli ve vstupní tabulce existují požadované sloupce.
UDTF shromáždí všechny předpovědi v paměti a poté pomocí metody eval() vypočítá a vydá úplnou křivku ROC pomocí metody terminate(). Tento model je užitečný pro metriky, které vyžadují úplnou datovou sadu pro výpočet.
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
def __init__(self):
from sklearn import metrics
self._roc_curve = metrics.roc_curve
self._roc_auc_score = metrics.roc_auc_score
self._true_labels = []
self._predicted_scores = []
def eval(self, row):
if 'y_true' not in row or 'y_score' not in row:
raise KeyError("Required columns 'y_true' and 'y_score' not found")
true_label = row['y_true']
predicted_score = row['y_score']
label = float(true_label)
self._true_labels.append(label)
self._predicted_scores.append(float(predicted_score))
def terminate(self):
false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
self._true_labels,
self._predicted_scores,
drop_intermediate=False
)
auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))
for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
yield float(threshold), float(tpr), float(fpr), auc_score
$$;
Vytvoření ukázkových binárních klasifikačních dat pomocí předpovědí:
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);
Výpočet křivky ROC a AUC:
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+
Příklad: Dynamické projekce sloupců z argumentu tabulky
Tento příklad kombinuje polymorfní UDTF s argumenty tabulky. UDTF přijímá tabulku a čárkami oddělený seznam názvů sloupců a pak promítá ze vstupu pouze tyto sloupce. Metoda analyze zkontroluje schéma vstupní tabulky a sestaví výstupní schéma obsahující pouze požadované sloupce.
CREATE OR REPLACE FUNCTION project_columns(t TABLE, columns STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ProjectColumns'
AS $$
class ProjectColumns:
@staticmethod
def analyze(t, columns):
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeResult
requested = [c.strip() for c in columns.value.split(",")]
input_schema = t.dataType
output_fields = []
for field in input_schema.fields:
if field.name in requested:
output_fields.append(field)
if not output_fields:
raise ValueError(
f"None of the requested columns {requested} "
f"exist in the input table"
)
return AnalyzeResult(schema=StructType(output_fields))
def eval(self, row, columns: str):
requested = [c.strip() for c in columns.split(",")]
yield tuple(row[col] for col in requested if col in row)
$$;
Pomocí funkce můžete vybrat konkrétní sloupce z tabulky:
SELECT * FROM project_columns(
TABLE(SELECT * FROM samples.nyctaxi.trips LIMIT 5),
'pickup_zip, dropoff_zip, fare_amount'
);
Omezení
Následující omezení platí pro Python UDTFs v katalogu Unity: