Funzioni di tabella definite dall'utente Python (UDTF) nel Unity Catalog

Importante

La registrazione delle UDTFs (funzioni definite dall'utente Python) in Unity Catalog è disponibile in anteprima pubblica.

Una funzione di tabella definita dall'utente (UDTF) di Unity Catalog registra funzioni che restituiscono tabelle complete anziché valori scalari. A differenza delle funzioni scalari che restituiscono un singolo valore di risultato da ogni chiamata, le funzioni definite dall'utente per tabelle (UDTF) vengono richiamate nella clausola FROM di un'istruzione SQL e possono restituire più righe e più colonne.

Le funzioni definite dall'utente sono particolarmente utili per:

  • Trasformazione di matrici o strutture di dati complesse in più righe
  • Integrazione di API o servizi esterni nei flussi di lavoro SQL
  • Implementazione della logica di generazione o arricchimento di dati personalizzati
  • Elaborazione di dati che richiedono operazioni con stato tra righe

Ogni chiamata UDTF accetta zero o più argomenti. Questi argomenti possono essere espressioni scalari o argomenti di tabella che rappresentano intere tabelle di input.

Le UDTF possono essere registrate in due modi:

  • Catalogo unity: registrare il tipo definito dall'utente come oggetto regolamentato nel catalogo unity.
  • Ambito della sessione: Registrarsi alla memoria locale SparkSession, isolata nel notebook o nel processo corrente. Vedere Funzioni di tabella definite dall'utente python (UDF) .

Requisiti

Le UDTF di Python del Unity Catalog sono supportate nei seguenti tipi di calcolo:

  • Serverless notebook e attività
  • Calcolo classico con modalità di accesso standard (Databricks Runtime 17.1 e versioni successive)
  • magazzino SQL (senza server o pro)

Creare una UDTF nel Unity Catalog

Usare SQL DDL per creare un UDTF gestito in Unity Catalog. Le UDTF vengono richiamate usando la clausola di FROM un'istruzione SQL.

CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
    """
    Basic UDTF that computes a sequence of integers
    and includes the square of each number in the range.
    """
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

+-----+---------+
| num | squared |
+-----+---------+
| 1   | 1       |
| 2   | 4       |
| 3   | 9       |
| 4   | 16      |
| 5   | 25      |
+-----+---------+

Azure Databricks implementa le funzioni definite dall'utente Python come classi Python con un metodo obbligatorio eval che restituisce righe di output.

Argomenti della tabella

Annotazioni

TABLE Gli argomenti sono supportati in Databricks Runtime 17.2 e versioni successive.

Le funzioni definite dall'utente per tabelle (UDTF) possono accettare intere tabelle come argomenti di input, consentendo trasformazioni e aggregazioni stateful complesse.

eval() e terminate() metodi relativi al ciclo di vita

Gli argomenti di tabella nelle funzioni definite dall'utente usano le funzioni seguenti per elaborare ogni riga:

  • eval(): chiamata una volta per ogni riga nella tabella di input. Questo è il metodo di elaborazione principale ed è obbligatorio.
  • terminate(): chiamato una volta alla fine di ogni partizione, dopo che tutte le righe sono state elaborate da eval(). Utilizzare questo metodo per produrre risultati aggregati finali o eseguire operazioni di pulizia. Questo metodo è facoltativo, ma essenziale per operazioni con stato come aggregazioni, conteggio o elaborazione batch.

Per altre informazioni sui eval() metodi e terminate() , vedere la documentazione di Apache Spark: Python UDTF.

Modelli di accesso alle righe

eval() riceve righe dai TABLE argomenti come oggetti pyspark.sql.Row . È possibile accedere ai valori in base al nome della colonna (row['id'], row['name']) o all'indice (row[0], row[1]).

  • Flessibilità dello schema: dichiara TABLE argomenti senza definizioni di schema (ad esempio, data TABLE, t TABLE). La funzione accetta qualsiasi struttura di tabella, quindi il codice deve convalidare l'esistenza delle colonne necessarie.

Vedere Esempio: Associare gli indirizzi IP ai blocchi di rete CIDR e Esempio: didascalia batch delle immagini utilizzando gli endpoint di visione di Azure Databricks.

Calcolare uno schema di output dinamico (UDF polimorfiche)

Annotazioni

Le UDTF polimorfiche UC richiedono Databricks Runtime 18.1 e versioni successive.

Un UDTF polimorfico determina lo schema di output in modo dinamico al momento della query usando un metodo statico analyze(), anziché dichiarare le colonne in anticipo. Per crearne uno, usare RETURNS TABLE senza definizioni di colonna e definire un analyze() metodo nella classe del gestore.

L'esempio seguente estrae i campi specificati dal chiamante da una stringa JSON, restituendo colonne diverse a seconda dell'argomento fields :

CREATE OR REPLACE FUNCTION extract_fields(json_str STRING, fields STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ExtractFields'
AS $$
class ExtractFields:
    @staticmethod
    def analyze(json_str, fields):

        # Build the output schema from the requested field names
        from pyspark.sql.types import StructType, StructField, StringType
        from pyspark.sql.udtf import AnalyzeResult
        col_names = [f.strip() for f in fields.value.split(",")]
        return AnalyzeResult(
            StructType([StructField(name, StringType()) for name in col_names])
        )

    def eval(self, json_str: str, fields: str):
        # Parse the JSON and yield only the requested fields
        import json
        data = json.loads(json_str)
        col_names = [f.strip() for f in fields.split(",")]
        yield tuple(data.get(name) for name in col_names)
$$;

-- Extract the name and city
SELECT * FROM extract_fields(
  '{"name": "Alice", "age": 30, "city": "Seattle"}',
  'name, city'
);
+-------+---------+
| name  | city    |
+-------+---------+
| Alice | Seattle |
+-------+---------+

Definire il analyze metodo

La classe del gestore deve includere un @staticmethod metodo denominato analyze che accetta gli stessi argomenti di UDTF e restituisce un oggetto AnalyzeResult che descrive lo schema di output. Azure Databricks chiama analyze() in fase di pianificazione delle query per risolvere lo schema prima di eseguire la funzione.

Ogni parametro di analyze è un'istanza della AnalyzeArgument classe :

Campo Descrizione
dataType Tipo del parametro di ingresso come DataType. Per gli argomenti della tabella di input, si tratta di un oggetto StructType che rappresenta le colonne della tabella.
value Il valore dell'argomento di input rappresentato come Optional[Any]. Questo vale None per argomenti di tabella o espressioni non costanti.
isTable Indica se l'argomento di input è un argomento di tabella come BooleanType.
isConstantExpression Indica se l'argomento di input è un'espressione costante pieghevole come BooleanType.

Il analyze metodo restituisce un'istanza della AnalyzeResult classe :

Campo Descrizione
schema Schema della tabella dei risultati come StructType.
withSinglePartition Se True, invia tutte le righe di input alla stessa istanza della classe UDTF.
partitionBy Se non è vuoto, partiziona le righe di input dalle espressioni specificate in modo che ogni combinazione univoca venga elaborata da un'istanza UDTF separata.
orderBy Se non è vuoto, specifica un ordinamento di righe all'interno di ogni partizione.
select Se non è vuoto, specifica quali colonne dell'argomento di input TABLE sono ricevute dalla UDTF (Funzione Definita dall'Utente per Tabelle).

Avvertimento

Per le funzioni definite dall'utente polimorfiche di Unity Catalog, è necessario inserire tutte le importazioni all'interno del corpo del metodo analyze(). Le importazioni di primo livello non sono disponibili nell'ambiente sandbox del catalogo Unity.

Inoltrare lo stato da analyze a eval

Il metodo analyze viene eseguito una sola volta durante la pianificazione delle query, quindi è possibile utilizzarlo per pre-elaborare gli argomenti costanti, analizzare le configurazioni o creare tabelle di ricerca. Per inoltrare tali risultati a eval, creare una @dataclass sottoclasse di AnalyzeResult con campi personalizzati, restituirla da analyzee accettarla nel __init__ metodo . In questo modo si evita di ripetere operazioni costose per ogni riga.

L'esempio seguente risolve un codice linguistico in un nome di lingua completo una volta in analyze e lo inoltra, in modo da eval contrassegnare ogni riga senza ripetere la ricerca:

CREATE OR REPLACE FUNCTION tag_language(t TABLE, lang_code STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'TagLanguage'
AS $$
class TagLanguage:
    @staticmethod
    def analyze(t, lang_code):
        from dataclasses import dataclass
        from pyspark.sql.types import StructType, StructField, StringType
        from pyspark.sql.udtf import AnalyzeResult

        @dataclass
        class LangResult(AnalyzeResult):
            language: str = ""

        # Resolve the language code to a full name once during planning
        languages = {"en": "English", "es": "Spanish", "fr": "French", "de": "German"}
        return LangResult(
            schema=StructType([
                StructField("text", StringType()),
                StructField("language", StringType())
            ]),
            language=languages.get(lang_code.value, "Unknown")
        )

    def __init__(self, result):
        self._language = result.language

    def eval(self, row, lang_code: str):
        # Tag each row with the pre-resolved language name
        yield (row['text'], self._language)
$$;

SELECT * FROM tag_language(
  TABLE(VALUES ('Hola mundo'), ('Buenos días') t(text)),
  'es'
);
+-------------+----------+
| text        | language |
+-------------+----------+
| Hola mundo  | Spanish  |
| Buenos días | Spanish  |
+-------------+----------+

Per altri modelli e dettagli sullo stato di inoltro, vedere Inoltrare lo stato alle chiamate futureeval.

Specificare il partizionamento dal analyze metodo

Quando un tipo definito dall'utente polimorfico accetta un argomento di tabella, il analyze metodo può controllare il modo in cui le righe di input vengono distribuite tra le istanze UDTF impostando partitionBy, orderBy, withSinglePartition e select su AnalyzeResult. Ciò elimina la necessità per i chiamanti di specificare PARTITION BY o ORDER BY in SQL.

Per l'API di partizionamento completa ed esempi, vedere Specificare un partizionamento delle righe di input dal analyze metodo .

Isolamento dell'ambiente

Annotazioni

Gli ambienti di isolamento condiviso richiedono Databricks Runtime 17.2 e versioni successive. Nelle versioni precedenti, tutte le funzioni definite dall'utente Python del catalogo Unity vengono eseguite in modalità di isolamento rigorosa.

Le UDTF Python del catalogo Unity con lo stesso proprietario e la stessa sessione possono condividere un ambiente di isolamento per impostazione predefinita. Ciò migliora le prestazioni e riduce l'utilizzo della memoria riducendo il numero di ambienti separati che devono essere avviati.

Isolamento rigoroso

Per garantire che una UDTF venga sempre eseguita nel proprio ambiente completamente isolato, aggiungere la clausola caratteristica STRICT ISOLATION.

La maggior parte delle funzioni definite dall'utente non richiede un isolamento rigoroso. Le UDTF standard per l'elaborazione dei dati traggono vantaggio dall'ambiente di isolamento condiviso predefinito e si eseguono più velocemente con un consumo di memoria inferiore.

Aggiungere la clausola specifica STRICT ISOLATION alle funzioni definite dall'utente che:

  • Eseguire l'input come codice usando eval(), exec()o funzioni simili.
  • Scrivere file nel file system locale.
  • Modificare le variabili globali o lo stato del sistema.
  • Accedere o modificare le variabili di ambiente.

L'esempio UDTF seguente imposta una variabile di ambiente personalizzata, legge nuovamente la variabile e moltiplica un set di numeri usando la variabile . Poiché la UDTF modifica l'ambiente del processo, eseguila in STRICT ISOLATION. In caso contrario, potrebbe causare perdite o sovrascrittura delle variabili di ambiente per altre UDF/UDTF nello stesso ambiente, causando un comportamento non corretto.

CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
    def eval(self, factor: str):
        # Save the factor as an environment variable
        os.environ["FACTOR"] = factor

        # Read it back and convert it to a number
        scale = int(os.getenv("FACTOR", "1"))

        # Multiply 0 through 4 by the factor
        for i in range(5):
            yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

Impostare DETERMINISTIC se la funzione produce risultati coerenti

Aggiungere DETERMINISTIC alla definizione della funzione se produce gli stessi output per gli stessi input. Ciò consente alle ottimizzazioni delle query di migliorare le prestazioni.

Per impostazione predefinita, si presuppone che le UDTF Python definite dall'utente nel catalogo Batch Unity siano non deterministiche, a meno che non siano dichiarate in modo esplicito. Esempi di funzioni non deterministiche includono la generazione di valori casuali, l'accesso a date o ore correnti o l'esecuzione di chiamate API esterne.

Vedere CREATE FUNCTION (SQL e Python).

Esempi pratici

Gli esempi seguenti dimostrano casi d'uso reali per le funzioni tabella definite dall'utente (UDTF) di Unity Catalog in Python, passando da semplici trasformazioni dei dati a integrazioni esterne complesse.

Esempio: reimplementazione explode

Anche se Spark offre una funzione predefinita explode , la creazione di una versione personalizzata dimostra il modello UDTF fondamentale di prendere un singolo input e produrre più righe di output.

CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
    def eval(self, arr):
        if arr is None:
            return
        for element in arr:
            yield (element,)
$$;

Usare la funzione direttamente in una query SQL:

SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple   |
| banana  |
| cherry  |
+---------+

In alternativa, applicarlo ai dati della tabella esistenti con un LATERAL join:

SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

Esempio: georilevazione dell'indirizzo IP tramite l'API REST

Questo esempio illustra come le UDTF (funzioni tabella definite dall'utente) possano integrare le API esterne direttamente nel flusso di lavoro SQL. Gli analisti possono arricchire i dati con chiamate API in tempo reale usando una sintassi SQL familiare, senza richiedere processi ETL separati.

CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
    def eval(self, ip_address):
        import requests
        api_url = f"https://api.ip-lookup.example.com/{ip_address}"
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            data = response.json()
            yield (data.get('city'), data.get('country'))
        except requests.exceptions.RequestException as e:
            # Return nothing if the API request fails
            return
$$;

Annotazioni

Le funzioni tabella definite dall'utente Python (UDTF) consentono il traffico di rete TCP/UDP sulle porte 80, 443 e 53 quando si utilizzano risorse di calcolo serverless o risorse di calcolo configurate con modalità di accesso standard.

Usare la funzione per arricchire i dati dei log Web con informazioni geografiche:

SELECT
  l.timestamp,
  l.request_path,
  geo.city,
  geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

Questo approccio consente l'analisi geografica in tempo reale senza richiedere tabelle di ricerca pre-elaborate o pipeline di dati separate. UDTF gestisce le richieste HTTP, l'analisi JSON e la gestione degli errori, rendendo accessibili origini dati esterne tramite query SQL standard.

Esempio: Trovare una corrispondenza tra indirizzi IP e blocchi di rete CIDR

In questo esempio vengono illustrati gli indirizzi IP corrispondenti rispetto ai blocchi di rete CIDR, un'attività di progettazione dei dati comune che richiede una logica SQL complessa.

Prima di tutto, creare dati di esempio con indirizzi IPv4 e IPv6:

-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
  ('log1', '192.168.1.100'),
  ('log2', '10.0.0.5'),
  ('log3', '172.16.0.10'),
  ('log4', '8.8.8.8'),
  ('log5', '2001:db8::1'),
  ('log6', '2001:db8:85a3::8a2e:370:7334'),
  ('log7', 'fe80::1'),
  ('log8', '::1'),
  ('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);

Successivamente, definire e registrare l'UDTF. Si noti la struttura della classe Python:

  • Il t TABLE parametro accetta una tabella di input con qualsiasi schema. La funzione tabella definita dall'utente (UDTF) si adatta automaticamente all'elaborazione di qualsiasi colonna venga fornita. Questa flessibilità significa che è possibile usare la stessa funzione in tabelle diverse senza modificare la firma della funzione. Tuttavia, è necessario controllare attentamente lo schema delle righe per garantire la compatibilità.
  • Il metodo __init__ viene utilizzato per la configurazione una tantum pesante, come il caricamento dell'elenco di reti di grandi dimensioni. Questa operazione viene eseguita una volta per ogni partizione della tabella di input.
  • Il eval metodo elabora ogni riga e contiene la logica di corrispondenza principale. Questo metodo viene eseguito esattamente una volta per ogni riga nella partizione di input e ogni esecuzione viene eseguita dall'istanza corrispondente della IpMatcher classe UDTF per tale partizione.
  • La HANDLER clausola specifica il nome della classe Python che implementa la logica UDTF.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
    def __init__(self):
        import ipaddress
        # Heavy initialization - load networks once per partition
        self.nets = []
        cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
                 '2001:db8::/32', 'fe80::/10', '::1/128']
        for cidr in cidrs:
            self.nets.append(ipaddress.ip_network(cidr))

    def eval(self, row):
        import ipaddress
	    # Validate that required fields exist
        required_fields = ['log_id', 'ip_address']
        for field in required_fields:
            if field not in row:
                raise ValueError(f"Missing required field: {field}")
        try:
            ip = ipaddress.ip_address(row['ip_address'])
            for net in self.nets:
                if ip in net:
                    yield (row['log_id'], row['ip_address'], str(net), ip.version)
                    return
            yield (row['log_id'], row['ip_address'], None, ip.version)
        except ValueError:
            yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;

Ora che ip_cidr_matcher è registrato in Unity Catalog, chiamarlo direttamente da SQL usando la TABLE() sintassi :

-- Process all IP addresses
SELECT
  *
FROM
  ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
  log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address                    | network         | ip_version  |
+--------+-------------------------------+-----------------+-------------+
| log1   | 192.168.1.100                 | 192.168.0.0/16  | 4           |
| log2   | 10.0.0.5                      | 10.0.0.0/8      | 4           |
| log3   | 172.16.0.10                   | 172.16.0.0/12   | 4           |
| log4   | 8.8.8.8                       | null            | 4           |
| log5   | 2001:db8::1                   | 2001:db8::/32   | 6           |
| log6   | 2001:db8:85a3::8a2e:370:7334  | 2001:db8::/32   | 6           |
| log7   | fe80::1                       | fe80::/10       | 6           |
| log8   | ::1                           | ::1/128         | 6           |
| log9   | 2001:db8:1234:5678::1         | 2001:db8::/32   | 6           |
+--------+-------------------------------+-----------------+-------------+

Esempio: didascalie delle immagini batch utilizzando gli endpoint di visione di Azure Databricks

Questo esempio dimostra la creazione di didascalie per immagini in batch utilizzando un endpoint di servizio del modello di visione di Azure Databricks. Illustra l'uso di terminate() per l'elaborazione batch e l'esecuzione basata su partizioni.

  1. Creare una tabella con URL di immagine pubblica:

    CREATE OR REPLACE TEMPORARY VIEW sample_images AS
    VALUES
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
    images(image_url, category);
    
  2. Creare un file UUDTF python del catalogo Unity per generare didascalie di immagini:

    1. Inizializzare UDTF con la configurazione, inclusa la dimensione del batch, il token dell'API Azure Databricks, l'endpoint del modello di visione e l'URL dell'area di lavoro.
    2. Nel metodo eval, raccogliere gli URL delle immagini in un buffer. Quando il buffer raggiunge la dimensione del batch, viene attivata l'elaborazione batch. In questo modo si garantisce che più immagini vengano elaborate insieme in una singola chiamata API anziché singole chiamate per immagine.
    3. Nel metodo di elaborazione batch scaricare tutte le immagini memorizzate nel buffer, codificarle come base64 e inviarle a una singola richiesta API a Databricks VisionModel. Il modello elabora tutte le immagini contemporaneamente e restituisce le didascalie per l'intero batch.
    4. Il terminate metodo viene eseguito esattamente una volta alla fine di ogni partizione. Nel metodo terminate, processare tutte le immagini rimanenti nel buffer e restituire tutte le didascalie raccolte come risultati.

Annotazioni

Sostituire <workspace-url> con l'URL effettivo dell'area di lavoro di Azure Databricks (https://your-workspace.cloud.databricks.com).

CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
    def __init__(self):
        self.batch_size = 3
        self.vision_endpoint = "databricks-claude-sonnet-4-5"
        self.workspace_url = "<workspace-url>"
        self.image_buffer = []
        self.results = []

    def eval(self, row, api_token):
        self.image_buffer.append((str(row[0]), api_token))
        if len(self.image_buffer) >= self.batch_size:
            self._process_batch()

    def terminate(self):
        if self.image_buffer:
            self._process_batch()
        for caption in self.results:
            yield (caption,)

    def _process_batch(self):
        batch_data = self.image_buffer.copy()
        self.image_buffer.clear()

        import base64
        import httpx
        import requests

        # API request timeout in seconds
        api_timeout = 60
        # Maximum tokens for vision model response
        max_response_tokens = 300
        # Temperature controls randomness (lower = more deterministic)
        model_temperature = 0.3

        # create a batch for the images
        batch_images = []
        api_token = batch_data[0][1] if batch_data else None

        for image_url, _ in batch_data:
            image_response = httpx.get(image_url, timeout=15)
            image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
            batch_images.append(image_data)

        content_items = [{
            "type": "text",
            "text": "Provide brief captions for these images, one per line."
        }]
        for img_data in batch_images:
            content_items.append({
                "type": "image_url",
                "image_url": {
                    "url": "data:image/jpeg;base64," + img_data
                }
            })

        payload = {
            "messages": [{
                "role": "user",
                "content": content_items
            }],
            "max_tokens": max_response_tokens,
            "temperature": model_temperature
        }

        response = requests.post(
            self.workspace_url + "/serving-endpoints/" +
            self.vision_endpoint + "/invocations",
            headers={
                'Authorization': 'Bearer ' + api_token,
                'Content-Type': 'application/json'
            },
            json=payload,
            timeout=api_timeout
        )

        result = response.json()
        batch_response = result['choices'][0]['message']['content'].strip()

        lines = batch_response.split('\n')
        captions = [line.strip() for line in lines if line.strip()]

        while len(captions) < len(batch_data):
            captions.append(batch_response)

        self.results.extend(captions[:len(batch_data)])
$$;

Per usare la didascalia dell'immagine batch UDTF, chiamarla usando la tabella di immagini di esempio:

Annotazioni

Sostituire your_secret_scope e api_token con l'ambito effettivo del segreto e il nome effettivo della chiave per il token dell'API Databricks.

SELECT
  caption
FROM
  batch_inference_image_caption(
    data => TABLE(sample_images),
    api_token => secret('your_secret_scope', 'api_token')
  )
+---------------------------------------------------------------------------------------------------------------+
| caption                                                                                                       |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies                                     |
| Black ant in detailed macro photography standing on a textured surface                                        |
| Tabby cat lounging comfortably on a white ledge against a white wall                                          |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+

È anche possibile generare categorie di sottotitoli immagine per categoria:

SELECT
  *
FROM
  batch_inference_image_caption(
    TABLE(sample_images)
    PARTITION BY category ORDER BY (category),
    secret('your_secret_scope', 'api_token')
  )
+------------------------------------------------------------------------------------------------------+
| caption                                                                                              |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface                               |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space.  |
| Tabby cat lounging comfortably on white ledge against white wall                                     |
| Wooden boardwalk cutting through lush wetland grasses under blue skies                               |
+------------------------------------------------------------------------------------------------------+

Esempio: curva ROC e calcolo AUC per la valutazione del modello di Machine Learning

Questo esempio dimostra come calcolare le curve ROC (Receiver Operating Characteristic) e i punteggi di area sotto la curva (AUC) per la valutazione di modelli di classificazione binaria mediante scikit-learn.

In questo esempio vengono illustrati diversi modelli importanti:

  • Utilizzo della libreria esterna: integra scikit-learn per il calcolo della curva ROC
  • Aggregazione con stato: Accumula predizioni in tutte le righe prima di calcolare le metriche
  • terminate() utilizzo del metodo: elabora il set di dati completo e restituisce risultati solo dopo che tutte le righe sono state valutate
  • Gestione degli errori: convalida le colonne obbligatorie presenti nella tabella di input

UDTF accumula tutte le stime in memoria usando il eval() metodo , quindi calcola e restituisce la curva ROC completa nel terminate() metodo . Questo modello è utile per le metriche che richiedono il set di dati completo per il calcolo.

CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
    def __init__(self):
        from sklearn import metrics
        self._roc_curve = metrics.roc_curve
        self._roc_auc_score = metrics.roc_auc_score

        self._true_labels = []
        self._predicted_scores = []

    def eval(self, row):
        if 'y_true' not in row or 'y_score' not in row:
            raise KeyError("Required columns 'y_true' and 'y_score' not found")

        true_label = row['y_true']
        predicted_score = row['y_score']

        label = float(true_label)
        self._true_labels.append(label)
        self._predicted_scores.append(float(predicted_score))

    def terminate(self):
        false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
            self._true_labels,
            self._predicted_scores,
            drop_intermediate=False
        )

        auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))

        for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
            yield float(threshold), float(tpr), float(fpr), auc_score
$$;

Creare dati di classificazione binaria di esempio con stime:

CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
  ( 1, 1.0, 0.95, 'high_confidence_positive'),
  ( 2, 1.0, 0.87, 'high_confidence_positive'),
  ( 3, 1.0, 0.82, 'medium_confidence_positive'),
  ( 4, 0.0, 0.78, 'false_positive'),
  ( 5, 1.0, 0.71, 'medium_confidence_positive'),
  ( 6, 0.0, 0.65, 'false_positive'),
  ( 7, 0.0, 0.58, 'true_negative'),
  ( 8, 1.0, 0.52, 'low_confidence_positive'),
  ( 9, 0.0, 0.45, 'true_negative'),
  (10, 0.0, 0.38, 'true_negative'),
  (11, 1.0, 0.31, 'low_confidence_positive'),
  (12, 0.0, 0.15, 'true_negative'),
  (13, 0.0, 0.08, 'high_confidence_negative'),
  (14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);

Calcolare la curva ROC e l'AUC:

SELECT
    threshold,
    true_positive_rate,
    false_positive_rate,
    auc
FROM compute_roc_curve(
  TABLE(
    SELECT y_true, y_score
    FROM binary_classification_data
    WHERE y_true IS NOT NULL AND y_score IS NOT NULL
    ORDER BY sample_id
  )
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate  | false_positive_rate  | auc   |
+-----------+---------------------+----------------------+-------+
| 1.95      | 0.0                 | 0.0                  | 0.786 |
| 0.95      | 0.167               | 0.0                  | 0.786 |
| 0.87      | 0.333               | 0.0                  | 0.786 |
| 0.82      | 0.5                 | 0.0                  | 0.786 |
| 0.78      | 0.5                 | 0.125                | 0.786 |
| 0.71      | 0.667               | 0.125                | 0.786 |
| 0.65      | 0.667               | 0.25                 | 0.786 |
| 0.58      | 0.667               | 0.375                | 0.786 |
| 0.52      | 0.833               | 0.375                | 0.786 |
| 0.45      | 0.833               | 0.5                  | 0.786 |
| 0.38      | 0.833               | 0.625                | 0.786 |
| 0.31      | 1.0                 | 0.625                | 0.786 |
| 0.15      | 1.0                 | 0.75                 | 0.786 |
| 0.08      | 1.0                 | 0.875                | 0.786 |
| 0.03      | 1.0                 | 1.0                  | 0.786 |
+-----------+---------------------+----------------------+-------+

Esempio: Proiezione dinamica delle colonne da un argomento di tabella

Questo esempio combina funzioni definite dall'utente polimorfiche con argomenti di tabella. UDTF accetta una tabella e un elenco delimitato da virgole di nomi di colonna, quindi proietta solo tali colonne dall'input. Il analyze metodo esamina lo schema della tabella di input e compila uno schema di output contenente solo le colonne richieste.

CREATE OR REPLACE FUNCTION project_columns(t TABLE, columns STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ProjectColumns'
AS $$
class ProjectColumns:
    @staticmethod
    def analyze(t, columns):
        from pyspark.sql.types import StructType
        from pyspark.sql.udtf import AnalyzeResult

        requested = [c.strip() for c in columns.value.split(",")]
        input_schema = t.dataType
        output_fields = []
        for field in input_schema.fields:
            if field.name in requested:
                output_fields.append(field)
        if not output_fields:
            raise ValueError(
                f"None of the requested columns {requested} "
                f"exist in the input table"
            )
        return AnalyzeResult(schema=StructType(output_fields))

    def eval(self, row, columns: str):
        requested = [c.strip() for c in columns.split(",")]
        yield tuple(row[col] for col in requested if col in row)
$$;

Usare la funzione per selezionare colonne specifiche da una tabella:

SELECT * FROM project_columns(
  TABLE(SELECT * FROM samples.nyctaxi.trips LIMIT 5),
  'pickup_zip, dropoff_zip, fare_amount'
);

Limitazioni

Le seguenti limitazioni si applicano alle Funzioni Definite dall'Utente Python (UDTF) nel Catalogo Unity.

Passaggi successivi