Nota
L'accés a aquesta pàgina requereix autorització. Podeu provar d'iniciar la sessió o de canviar els directoris.
L'accés a aquesta pàgina requereix autorització. Podeu provar de canviar els directoris.
Importante
El registro de UDTF de Python en el catálogo de Unity está en versión preliminar pública.
Una función de tabla definida por el usuario (UDTF) de Catálogo de Unity registra funciones que devuelven tablas completas en lugar de valores escalares. A diferencia de las funciones escalares que devuelven un único valor de resultado de cada llamada, las UDF se invocan en la cláusula de FROM una instrucción SQL y pueden devolver varias filas y columnas.
Las UDTF son especialmente útiles para:
- Transformación de matrices o estructuras de datos complejas en varias filas
- Integración de API o servicios externos en flujos de trabajo de SQL
- Implementación de la lógica de generación o enriquecimiento de datos personalizados
- Procesamiento de datos que requieren operaciones con estado entre filas
Cada llamada UDTF acepta cero o más argumentos. Estos argumentos pueden ser expresiones escalares o argumentos de tabla que representan tablas de entrada completas.
Las UDTF se pueden registrar de dos maneras:
- Catálogo de Unity: registre el UDTF como un objeto regulado en el catálogo de Unity.
- Ámbito de sesión: Registre en el entorno local
SparkSession, aislado en el notebook o tarea actual. Consulte Funciones de tabla definidas por el usuario (UDF) de Python.
Requisitos
Las UDF de Python del catálogo de Unity se admiten en los siguientes tipos de proceso:
- Cuadernos y trabajos sin servidor
- Proceso clásico con modo de acceso estándar (Databricks Runtime 17.1 y versiones posteriores)
- SQL Warehouse (sin servidor o profesional)
Creación de un UDTF en el catálogo de Unity
Use SQL DDL para crear un UDTF regulado en el catálogo de Unity. Las UDTF se invocan mediante la cláusula FROM de una instrucción SQL.
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;
SELECT * FROM square_numbers(1, 5);
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+
Azure Databricks implementa UDTF de Python como clases de Python con un método obligatorio eval que genera filas de salida.
Argumentos de tabla
Nota:
TABLE Los argumentos se admiten en Databricks Runtime 17.2 y versiones posteriores.
Las UDF pueden aceptar tablas completas como argumentos de entrada, lo que permite transformaciones y agregaciones con estado complejos.
eval() y terminate() métodos de ciclo de vida
Los argumentos de tabla de las UDF usan las funciones siguientes para procesar cada fila:
-
eval(): se llama una vez para cada fila de la tabla de entrada. Este es el método de procesamiento principal y es necesario. -
terminate(): se llama una vez al final de cada partición, después de que todas las filas se hayan procesado medianteeval(). Use este método para generar resultados agregados finales o realizar operaciones de limpieza. Este método es opcional, pero esencial para las operaciones con estado, como agregaciones, recuento o procesamiento por lotes.
Para más información sobre los métodos
Patrones de acceso a filas
eval() recibe filas de los argumentos TABLE como objetos pyspark.sql.Row. Puede tener acceso a los valores por nombre de columna (row['id'], row['name']) o por índice (row[0], row[1]).
-
Flexibilidad de esquema: declare TABLE argumentos sin definiciones de esquema (por ejemplo,
data TABLE,t TABLE). La función acepta cualquier estructura de tabla, por lo que el código debe validar que existen columnas necesarias.
Consulte Ejemplo: Comparación de direcciones IP con bloques de red CIDR y Ejemplo: creación de subtítulos de imágenes por lotes usando puntos de conexión de Vision de Azure Databricks.
Calcular un esquema de salida dinámico (UDF polimórficos)
Nota:
Las UDTF de UC polimórficas requieren Databricks Runtime 18.1 y versiones posteriores.
Un UDTF polimórfico determina su esquema de salida dinámicamente en el momento de la consulta mediante un método estático analyze() , en lugar de declarar las columnas por adelantado. Para crear una, use RETURNS TABLE sin definiciones de columna y defina un analyze() método en la clase de controlador.
En el ejemplo siguiente se extraen los campos especificados por el autor de la llamada de una cadena JSON y se devuelven columnas diferentes en función del fields argumento :
CREATE OR REPLACE FUNCTION extract_fields(json_str STRING, fields STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ExtractFields'
AS $$
class ExtractFields:
@staticmethod
def analyze(json_str, fields):
# Build the output schema from the requested field names
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.udtf import AnalyzeResult
col_names = [f.strip() for f in fields.value.split(",")]
return AnalyzeResult(
StructType([StructField(name, StringType()) for name in col_names])
)
def eval(self, json_str: str, fields: str):
# Parse the JSON and yield only the requested fields
import json
data = json.loads(json_str)
col_names = [f.strip() for f in fields.split(",")]
yield tuple(data.get(name) for name in col_names)
$$;
-- Extract the name and city
SELECT * FROM extract_fields(
'{"name": "Alice", "age": 30, "city": "Seattle"}',
'name, city'
);
+-------+---------+
| name | city |
+-------+---------+
| Alice | Seattle |
+-------+---------+
Definición del analyze método
La clase de controlador debe incluir un @staticmethod método denominado analyze que acepta los mismos argumentos que el UDTF y devuelve un AnalyzeResult que describe el esquema de salida. Azure Databricks llama a analyze() en tiempo de planificación de consultas para resolver el esquema antes de ejecutar la función.
Cada parámetro de analyze es una instancia de la AnalyzeArgument clase :
| Campo | Descripción |
|---|---|
dataType |
Tipo del argumento de entrada como DataType. En el caso de los argumentos de la tabla de entrada, se trata de un objeto StructType que representa las columnas de la tabla. |
value |
El valor del argumento de entrada como un Optional[Any]. Esto es None para argumentos de tabla o expresiones no constantes. |
isTable |
Si el argumento de entrada es un argumento de tabla como .BooleanType |
isConstantExpression |
Si el argumento de entrada es una expresión plegable constante como .BooleanType |
El analyze método devuelve una instancia de la AnalyzeResult clase :
| Campo | Descripción |
|---|---|
schema |
Esquema de la tabla de resultados como .StructType |
withSinglePartition |
Si True envía todas las filas de entrada a la misma instancia de clase UDTF. |
partitionBy |
Si no está vacío, las particiones escriben filas por las expresiones especificadas para que cada combinación única se procese mediante una instancia UDTF independiente. |
orderBy |
Si no está vacío, especifica una ordenación de filas dentro de cada partición. |
select |
Si no está vacío, especifica qué columnas del argumento de entrada TABLE recibe el UDTF. |
Advertencia
Para las UDTF polimórficas del Unity Catalog, debe colocar todas las importaciones dentro del cuerpo del método analyze(). Las importaciones de nivel superior no están disponibles en el entorno aislado del catálogo de Unity.
El estado se transfiere de analyze a eval
El método analyze se ejecuta una vez en el tiempo de planificación de consultas, por lo que puede usarlo para procesar previamente argumentos constantes, realizar análisis de configuraciones o construir consultas. Para reenviar esos resultados a eval, cree una @dataclass subclase de AnalyzeResult con campos personalizados, devuélvalo de analyze y acéptelo en el método __init__. Esto evita repetir el trabajo costoso para cada fila.
En el ejemplo siguiente se resuelve un código de idioma en un nombre de idioma completo una vez en analyze y lo reenvía, por lo que eval puede etiquetar cada fila sin repetir la búsqueda:
CREATE OR REPLACE FUNCTION tag_language(t TABLE, lang_code STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'TagLanguage'
AS $$
class TagLanguage:
@staticmethod
def analyze(t, lang_code):
from dataclasses import dataclass
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.udtf import AnalyzeResult
@dataclass
class LangResult(AnalyzeResult):
language: str = ""
# Resolve the language code to a full name once during planning
languages = {"en": "English", "es": "Spanish", "fr": "French", "de": "German"}
return LangResult(
schema=StructType([
StructField("text", StringType()),
StructField("language", StringType())
]),
language=languages.get(lang_code.value, "Unknown")
)
def __init__(self, result):
self._language = result.language
def eval(self, row, lang_code: str):
# Tag each row with the pre-resolved language name
yield (row['text'], self._language)
$$;
SELECT * FROM tag_language(
TABLE(VALUES ('Hola mundo'), ('Buenos días') t(text)),
'es'
);
+-------------+----------+
| text | language |
+-------------+----------+
| Hola mundo | Spanish |
| Buenos días | Spanish |
+-------------+----------+
Para obtener más patrones y detalles sobre el estado de reenvío, consulte Reenvío de estado a llamadas futuraseval.
Especificar la creación de particiones desde el analyze método
Cuando un UDTF polimórfico acepta un argumento de tabla, el analyze método puede controlar cómo se distribuyen las filas de entrada entre instancias UDTF estableciendo partitionBy, orderBywithSinglePartition, y select en AnalyzeResult. Esto elimina la necesidad de que los autores de llamadas especifiquen PARTITION BY o ORDER BY en SQL.
Para obtener la API de particionamiento completa y ejemplos, vea Especificar un particionamiento de las filas de entrada desde el método analyze.
Aislamiento del entorno
Nota:
Los entornos de aislamiento compartido requieren Databricks Runtime 17.2 y versiones posteriores. En versiones anteriores, todas las UDF de Python del catálogo de Unity se ejecutan en modo de aislamiento estricto.
Las UDF de Python del catálogo de Unity con el mismo propietario y sesión pueden compartir un entorno de aislamiento de forma predeterminada. Esto mejora el rendimiento y reduce el uso de memoria al reducir el número de entornos independientes que deben iniciarse.
Aislamiento estricto
Para asegurarse de que un UDTF siempre se ejecute en su propio entorno totalmente aislado, agregue la cláusula característica STRICT ISOLATION.
La mayoría de las UDTF no necesitan aislamiento estricto. Las UDF de procesamiento de datos estándar se benefician del entorno de aislamiento compartido predeterminado y se ejecutan más rápido con un menor consumo de memoria.
Agregue la cláusula característica STRICT ISOLATION a las UDTFs que:
- Ejecute la entrada como código mediante
eval(),exec()o funciones similares. - Escribir archivos en el sistema de archivos local.
- Modifique las variables globales o el estado del sistema.
- Acceso o modificación de variables de entorno.
En el ejemplo UDTF siguiente se establece una variable de entorno personalizada, se lee la variable y se multiplica un conjunto de números mediante la variable . Dado que el UDTF muta el entorno de proceso, ejecútelo en STRICT ISOLATION. De lo contrario, podría filtrar o sobrescribir variables de entorno para otras UDF/UDTF en el mismo entorno, lo que podría provocar un comportamiento incorrecto.
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os
class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor
# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))
# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;
SELECT * FROM multiply_numbers("3");
Establecer DETERMINISTIC si la función genera resultados coherentes
Agregue DETERMINISTIC a la definición de función si genera las mismas salidas para las mismas entradas. Esto permite que las optimizaciones de consultas mejoren el rendimiento.
De forma predeterminada, se supone que las UDF de Python del catálogo de Batch Unity no son deterministas a menos que se declaren explícitamente. Entre los ejemplos de funciones no deterministas se incluyen: generar valores aleatorios, acceder a las horas o fechas actuales o realizar llamadas API externas.
Consulte CREATE FUNCTION (SQL y Python).
Ejemplos prácticos
En los ejemplos siguientes se muestran casos de uso reales para UDF de Python del catálogo de Unity, que avanzan desde transformaciones de datos simples a integraciones externas complejas.
Ejemplo: Volver a implementar explode
Aunque Spark proporciona una función integrada explode , la creación de su propia versión muestra el patrón UDTF fundamental de tomar una sola entrada y generar varias filas de salida.
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;
Use la función directamente en una consulta SQL:
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+
O aplíquelos a los datos de tabla existentes con una LATERAL combinación:
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;
Ejemplo: geolocalización de direcciones IP a través de la API REST
En este ejemplo se muestra cómo las UDF pueden integrar API externas directamente en el flujo de trabajo de SQL. Los analistas pueden enriquecer los datos con llamadas API en tiempo real mediante la sintaxis SQL conocida, sin necesidad de procesos ETL independientes.
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
Nota:
Las UDF de Python permiten el tráfico de red TCP/UDP a través de los puertos 80, 443 y 53 cuando se usa el proceso sin servidor o el proceso configurado con el modo de acceso estándar.
Use la función para enriquecer los datos de registro web con información geográfica:
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;
Este enfoque permite el análisis geográfico en tiempo real sin necesidad de tablas de búsqueda preprocesados ni canalizaciones de datos independientes. El UDTF controla las solicitudes HTTP, el análisis de JSON y el control de errores, lo que hace que los orígenes de datos externos sean accesibles a través de consultas SQL estándar.
Ejemplo: Coincidencia de direcciones IP con bloques de red CIDR
En este ejemplo se muestran las direcciones IP coincidentes con los bloques de red CIDR, una tarea de ingeniería de datos común que requiere una lógica SQL compleja.
En primer lugar, cree datos de ejemplo con direcciones IPv4 e IPv6:
-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);
A continuación, defina y registre el UDTF. Observe la estructura de clases de Python:
- El
t TABLEparámetro acepta una tabla de entrada con cualquier esquema. El UDTF se adapta automáticamente para procesar las columnas que se proporcionan. Esta flexibilidad significa que puede usar la misma función en distintas tablas sin modificar la firma de la función. Sin embargo, debe comprobar cuidadosamente el esquema de las filas para garantizar la compatibilidad. - El método
__init__se utiliza para una configuración intensiva de una sola vez, como cargar la gran lista de redes. Este trabajo se realiza una vez por partición de la tabla de entrada. - El
evalmétodo procesa cada fila y contiene la lógica de coincidencia principal. Este método se ejecuta exactamente una vez para cada fila de la partición de entrada y cada ejecución se realiza mediante la instancia correspondiente de laIpMatcherclase UDTF para esa partición. - La
HANDLERcláusula especifica el nombre de la clase de Python que implementa la lógica UDTF.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
def __init__(self):
import ipaddress
# Heavy initialization - load networks once per partition
self.nets = []
cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
'2001:db8::/32', 'fe80::/10', '::1/128']
for cidr in cidrs:
self.nets.append(ipaddress.ip_network(cidr))
def eval(self, row):
import ipaddress
# Validate that required fields exist
required_fields = ['log_id', 'ip_address']
for field in required_fields:
if field not in row:
raise ValueError(f"Missing required field: {field}")
try:
ip = ipaddress.ip_address(row['ip_address'])
for net in self.nets:
if ip in net:
yield (row['log_id'], row['ip_address'], str(net), ip.version)
return
yield (row['log_id'], row['ip_address'], None, ip.version)
except ValueError:
yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;
Ahora que ip_cidr_matcher está registrado en el catálogo de Unity, llámelo directamente desde SQL mediante la TABLE() sintaxis :
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+
Ejemplo: generación de textos descriptivos de imágenes por lotes mediante endpoints de visión de Azure Databricks
En este ejemplo se demuestra la subtitulación de imágenes por lotes utilizando un modelo de visión de Azure Databricks sirviendo un punto de conexión. Muestra el uso de terminate() para el procesamiento por lotes y la ejecución basada en particiones.
Cree una tabla con direcciones URL de imagen pública:
CREATE OR REPLACE TEMPORARY VIEW sample_images AS VALUES ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'), ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery') images(image_url, category);Cree un UDTF de Python en Unity Catalog para generar subtítulos de imagen.
- Inicialice el UDTF con la configuración, incluido el tamaño del lote, el token de API de Azure Databricks, el punto de conexión del modelo de visión y la dirección URL del área de trabajo.
- En el
evalmétodo , recopile las direcciones URL de la imagen en un búfer. Cuando el búfer alcanza el tamaño del lote, desencadene el procesamiento por lotes. Esto garantiza que varias imágenes se procesen juntas en una sola llamada API en lugar de llamadas individuales por imagen. - En el método de procesamiento por lotes, descargue todas las imágenes almacenadas en búfer, codifiquelas como base64 y envíelas a una única solicitud de API a Databricks VisionModel. El modelo procesa todas las imágenes simultáneamente y devuelve subtítulos para todo el lote.
- El
terminatemétodo se ejecuta exactamente una vez al final de cada partición. En el método terminate, procese las imágenes restantes en el búfer y genere todos los títulos recopilados como resultados.
Nota:
Reemplace por <workspace-url> la dirección URL real del área de trabajo de Azure Databricks (https://your-workspace.cloud.databricks.com).
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
def __init__(self):
self.batch_size = 3
self.vision_endpoint = "databricks-claude-sonnet-4-5"
self.workspace_url = "<workspace-url>"
self.image_buffer = []
self.results = []
def eval(self, row, api_token):
self.image_buffer.append((str(row[0]), api_token))
if len(self.image_buffer) >= self.batch_size:
self._process_batch()
def terminate(self):
if self.image_buffer:
self._process_batch()
for caption in self.results:
yield (caption,)
def _process_batch(self):
batch_data = self.image_buffer.copy()
self.image_buffer.clear()
import base64
import httpx
import requests
# API request timeout in seconds
api_timeout = 60
# Maximum tokens for vision model response
max_response_tokens = 300
# Temperature controls randomness (lower = more deterministic)
model_temperature = 0.3
# create a batch for the images
batch_images = []
api_token = batch_data[0][1] if batch_data else None
for image_url, _ in batch_data:
image_response = httpx.get(image_url, timeout=15)
image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
batch_images.append(image_data)
content_items = [{
"type": "text",
"text": "Provide brief captions for these images, one per line."
}]
for img_data in batch_images:
content_items.append({
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64," + img_data
}
})
payload = {
"messages": [{
"role": "user",
"content": content_items
}],
"max_tokens": max_response_tokens,
"temperature": model_temperature
}
response = requests.post(
self.workspace_url + "/serving-endpoints/" +
self.vision_endpoint + "/invocations",
headers={
'Authorization': 'Bearer ' + api_token,
'Content-Type': 'application/json'
},
json=payload,
timeout=api_timeout
)
result = response.json()
batch_response = result['choices'][0]['message']['content'].strip()
lines = batch_response.split('\n')
captions = [line.strip() for line in lines if line.strip()]
while len(captions) < len(batch_data):
captions.append(batch_response)
self.results.extend(captions[:len(batch_data)])
$$;
Para usar el título de imagen por lotes UDTF, llámelo mediante la tabla de imágenes de ejemplo:
Nota:
Reemplaza your_secret_scope y api_token por el ámbito y el nombre de la clave reales del token de API de Databricks.
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+
También puede generar categorías de títulos de imagen por categoría:
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+
Ejemplo: curva ROC y cálculo de AUC para la evaluación del modelo de ML
En este ejemplo se demuestran las curvas de la característica operativa del receptor (ROC) y las puntuaciones de área bajo la curva (AUC) para la evaluación de modelos de clasificación binaria utilizando scikit-learn.
En este ejemplo se muestran varios patrones importantes:
- Uso de la biblioteca externa: integra scikit-learn para el cálculo de curva ROC
- Agregación con estado: Acumula predicciones a lo largo de todas las filas antes de calcular las métricas.
-
terminate()uso del método: procesa el conjunto de datos completo y produce resultados solo después de que se hayan evaluado todas las filas. - Control de errores: valida que existen columnas necesarias en la tabla de entrada.
El UDTF acumula todas las predicciones en la memoria mediante el eval() método y, a continuación, calcula y produce la curva ROC completa en el terminate() método . Este patrón es útil para las métricas que requieren el conjunto de datos completo para el cálculo.
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
def __init__(self):
from sklearn import metrics
self._roc_curve = metrics.roc_curve
self._roc_auc_score = metrics.roc_auc_score
self._true_labels = []
self._predicted_scores = []
def eval(self, row):
if 'y_true' not in row or 'y_score' not in row:
raise KeyError("Required columns 'y_true' and 'y_score' not found")
true_label = row['y_true']
predicted_score = row['y_score']
label = float(true_label)
self._true_labels.append(label)
self._predicted_scores.append(float(predicted_score))
def terminate(self):
false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
self._true_labels,
self._predicted_scores,
drop_intermediate=False
)
auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))
for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
yield float(threshold), float(tpr), float(fpr), auc_score
$$;
Cree datos de clasificación binaria de ejemplo con predicciones:
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);
Calcule la curva ROC y AUC:
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+
Ejemplo: Proyección de columna dinámica a partir de un argumento de tabla
En este ejemplo se combinan UDF polimórficas con argumentos de tabla. El UDTF acepta una tabla y una lista separada por comas de nombres de columna y, a continuación, proyecta solo esas columnas de la entrada. El analyze método inspecciona el esquema de la tabla de entrada y crea un esquema de salida que contiene solo las columnas solicitadas.
CREATE OR REPLACE FUNCTION project_columns(t TABLE, columns STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ProjectColumns'
AS $$
class ProjectColumns:
@staticmethod
def analyze(t, columns):
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeResult
requested = [c.strip() for c in columns.value.split(",")]
input_schema = t.dataType
output_fields = []
for field in input_schema.fields:
if field.name in requested:
output_fields.append(field)
if not output_fields:
raise ValueError(
f"None of the requested columns {requested} "
f"exist in the input table"
)
return AnalyzeResult(schema=StructType(output_fields))
def eval(self, row, columns: str):
requested = [c.strip() for c in columns.split(",")]
yield tuple(row[col] for col in requested if col in row)
$$;
Use la función para seleccionar columnas específicas de una tabla:
SELECT * FROM project_columns(
TABLE(SELECT * FROM samples.nyctaxi.trips LIMIT 5),
'pickup_zip, dropoff_zip, fare_amount'
);
Limitaciones
Las limitaciones siguientes se aplican a las UDF de Python del catálogo de Unity:
- No se admiten las credenciales del servicio catálogo de Unity.
- No se admiten las dependencias personalizadas.