Funções de tabela definidas pelo usuário do Python (UDTFs) no Catálogo do Unity

Importante

O registro de UDTFs do Python no Catálogo do Unity está em versão prévia pública.

Uma UDTF (função de tabela definida pelo usuário) do Catálogo do Unity registra funções que retornam tabelas completas em vez de valores escalares. Ao contrário das funções escalares que retornam um único valor de resultado de cada chamada, UDTFs são invocados na cláusula de FROM uma instrução SQL e podem retornar várias linhas e colunas.

UDTFs são particularmente úteis para:

  • Transformando matrizes ou estruturas de dados complexas em várias linhas
  • Integrando APIs ou serviços externos em fluxos de trabalho do SQL
  • Implementando a lógica de geração ou enriquecimento de dados personalizados
  • Processamento de dados que exigem operações com estado entre linhas

Cada chamada UDTF aceita zero ou mais argumentos. Esses argumentos podem ser expressões escalares ou argumentos de tabela que representam tabelas de entrada inteiras.

UDTFs podem ser registrados de duas maneiras:

Requirements

Os UDTFs do Python do Catálogo do Unity têm suporte nos seguintes tipos de computação:

  • Notebooks e trabalhos sem servidor
  • Computação clássica com modo de acesso padrão (Databricks Runtime 17.1 e superior)
  • SQL Warehouse (sem servidor ou profissional)

Criar um UDTF no Catálogo do Unity

Use a DDL do SQL para criar uma UDTF governada no Unity Catalog. UDTFs são invocados usando a cláusula FROM de uma instrução SQL.

CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
    """
    Basic UDTF that computes a sequence of integers
    and includes the square of each number in the range.
    """
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

+-----+---------+
| num | squared |
+-----+---------+
| 1   | 1       |
| 2   | 4       |
| 3   | 9       |
| 4   | 16      |
| 5   | 25      |
+-----+---------+

Azure Databricks implementa UDTFs do Python como classes Python que incluem um método obrigatório eval que gera linhas de saída.

Argumentos de tabela

Observação

TABLE há suporte para argumentos no Databricks Runtime 17.2 e superior.

Os UDTFs podem aceitar tabelas inteiras como argumentos de entrada, permitindo transformações e agregações complexas com estado.

eval() e terminate() métodos de ciclo de vida

Os argumentos de tabela em UDTFs usam as seguintes funções para processar cada linha:

  • eval(): chamado uma vez para cada linha na tabela de entrada. Esse é o principal método de processamento e é necessário.
  • terminate(): chamado uma vez no final de cada partição, depois que todas as linhas tiverem sido processadas por eval(). Use esse método para produzir resultados agregados finais ou executar operações de limpeza. Esse método é opcional, mas essencial para operações com estado, como agregações, contagem ou processamento em lote.

Para obter mais informações sobre eval() e terminate() métodos, consulte a documentação do Apache Spark: UDTF do Python.

Padrões de acesso a linhas

eval() recebe linhas dos argumentos de TABLE como objetos pyspark.sql.Row. Você pode acessar valores por nome de coluna (row['id'], row['name']) ou por índice (row[0], row[1]).

  • Flexibilidade de esquema: declarar TABLE argumentos sem definições de esquema (por exemplo, data TABLE, ). t TABLE A função aceita qualquer estrutura de tabela, portanto, seu código deve validar se as colunas necessárias existem.

Veja Exemplo: Correspondência de endereços IP com blocos de rede CIDR e Exemplo: Legendagem em lote de imagens usando endpoints de visão do Azure Databricks.

Calcular um esquema de saída dinâmica (UDTFs polimórficos)

Observação

UDTFs de UC polimórficos exigem o Databricks Runtime 18.1 ou superior.

Um UDTF polimórfico determina seu esquema de saída dinamicamente no momento da consulta usando um método estático analyze() , em vez de declarar colunas antecipadamente. Para criar uma, use RETURNS TABLE sem definições de coluna e defina um analyze() método na classe de manipulador.

O exemplo a seguir extrai campos especificados pelo chamador de uma cadeia de caracteres JSON, retornando colunas diferentes dependendo do fields argumento:

CREATE OR REPLACE FUNCTION extract_fields(json_str STRING, fields STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ExtractFields'
AS $$
class ExtractFields:
    @staticmethod
    def analyze(json_str, fields):

        # Build the output schema from the requested field names
        from pyspark.sql.types import StructType, StructField, StringType
        from pyspark.sql.udtf import AnalyzeResult
        col_names = [f.strip() for f in fields.value.split(",")]
        return AnalyzeResult(
            StructType([StructField(name, StringType()) for name in col_names])
        )

    def eval(self, json_str: str, fields: str):
        # Parse the JSON and yield only the requested fields
        import json
        data = json.loads(json_str)
        col_names = [f.strip() for f in fields.split(",")]
        yield tuple(data.get(name) for name in col_names)
$$;

-- Extract the name and city
SELECT * FROM extract_fields(
  '{"name": "Alice", "age": 30, "city": "Seattle"}',
  'name, city'
);
+-------+---------+
| name  | city    |
+-------+---------+
| Alice | Seattle |
+-------+---------+

Definir o analyze método

A classe de manipulador deve incluir um @staticmethod método chamado analyze que aceita os mesmos argumentos que o UDTF e retorna um AnalyzeResult esquema de descrição de saída. Azure Databricks chama analyze() em tempo de planejamento de consulta para resolver o esquema antes de executar a função.

Cada parâmetro de analyze é uma instância da classe AnalyzeArgument

Campo Descrição
dataType O tipo do argumento de entrada como um DataType. Para argumentos de tabela de entrada, isso é um StructType representando as colunas da tabela.
value O valor do argumento de entrada como um Optional[Any]. Isso é None para argumentos de tabela ou expressões não constantes.
isTable Se o argumento de entrada é um argumento de tabela como um BooleanType.
isConstantExpression Se o argumento de entrada é uma expressão dobrável constante como um BooleanType.

O analyze método retorna uma instância da AnalyzeResult classe:

Campo Descrição
schema O esquema da tabela de resultados representado por um StructType.
withSinglePartition Se True, envia todas as linhas de entrada para a mesma instância de classe UDTF.
partitionBy Se não estiver vazio, particiona linhas de entrada pelas expressões especificadas para que cada combinação exclusiva seja processada por uma instância UDTF separada.
orderBy Se não estiver vazio, especifica uma ordenação de linhas em cada partição.
select Se não estiver vazio, especifica quais colunas do argumento de entrada TABLE o UDTF recebe.

Aviso

Para UDTFs polimórficas do Catálogo do Unity, você deve colocar todas as importações dentro do corpo do método analyze(). As importações de nível superior não estão disponíveis no ambiente sandbox do Catálogo do Unity.

Encaminhar o estado de analyze para eval

O método analyze é executado uma vez no momento de planejamento da consulta, permitindo que você o utilize para pré-processar argumentos constantes, analisar configurações ou construir pesquisas. Para encaminhar esses resultados para eval, crie uma subclasse de @dataclass de AnalyzeResult com campos personalizados, retorne-a de analyze e aceite-a no método __init__. Isso evita repetir uma tarefa custosa para cada linha.

O exemplo a seguir converte um código de idioma em um nome de idioma completo uma vez em analyze e encaminha esse nome, para que eval possa marcar cada linha sem repetir a pesquisa.

CREATE OR REPLACE FUNCTION tag_language(t TABLE, lang_code STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'TagLanguage'
AS $$
class TagLanguage:
    @staticmethod
    def analyze(t, lang_code):
        from dataclasses import dataclass
        from pyspark.sql.types import StructType, StructField, StringType
        from pyspark.sql.udtf import AnalyzeResult

        @dataclass
        class LangResult(AnalyzeResult):
            language: str = ""

        # Resolve the language code to a full name once during planning
        languages = {"en": "English", "es": "Spanish", "fr": "French", "de": "German"}
        return LangResult(
            schema=StructType([
                StructField("text", StringType()),
                StructField("language", StringType())
            ]),
            language=languages.get(lang_code.value, "Unknown")
        )

    def __init__(self, result):
        self._language = result.language

    def eval(self, row, lang_code: str):
        # Tag each row with the pre-resolved language name
        yield (row['text'], self._language)
$$;

SELECT * FROM tag_language(
  TABLE(VALUES ('Hola mundo'), ('Buenos días') t(text)),
  'es'
);
+-------------+----------+
| text        | language |
+-------------+----------+
| Hola mundo  | Spanish  |
| Buenos días | Spanish  |
+-------------+----------+

Para obter mais padrões e detalhes sobre o estado de transmissão, consulte Encaminhar o estado para chamadas futuraseval.

Especifique o particionamento a partir do método analyze

Quando um UDTF polimórfico aceita um argumento de tabela, o método analyze pode controlar como as linhas de entrada são distribuídas entre as instâncias do UDTF, definindo partitionBy, orderBy, withSinglePartition e select na AnalyzeResult. Isso elimina a necessidade de os chamadores especificarem PARTITION BY ou ORDER BY no SQL.

Para obter a API de particionamento completo e exemplos, consulte Especificar um particionamento das linhas de entrada do analyze método.

Isolamento de ambiente

Observação

Ambientes de isolamento compartilhado exigem o Databricks Runtime 17.2 e superior. Em versões anteriores, todos os UDTFs do Python no Unity Catalog eram executados em modo de isolamento estrito.

UDTFs Python do Unity Catalog com o mesmo proprietário e sessão podem compartilhar um ambiente de isolamento por padrão. Isso melhora o desempenho e reduz o uso de memória reduzindo o número de ambientes separados que precisam ser iniciados.

Isolamento estrito

Para garantir que um UDTF sempre seja executado em seu próprio ambiente totalmente isolado, adicione a cláusula de característica STRICT ISOLATION.

A maioria dos UDTFs não precisa de isolamento estrito. Os UDTFs de processamento de dados padrão se beneficiam do ambiente de isolamento compartilhado padrão e são executados mais rapidamente com menor consumo de memória.

Adicione a cláusula STRICT ISOLATION característica às UDTFs que:

  • Execute a entrada como código usando eval(), exec()ou funções semelhantes.
  • Gravar arquivos no sistema de arquivos local.
  • Modificar variáveis globais ou estado do sistema.
  • Acesse ou modifique variáveis de ambiente.

O exemplo de UDTF a seguir define uma variável de ambiente personalizada, lê a variável novamente e multiplica um conjunto de números usando a variável. Como o UDTF modifica o ambiente do processo, execute-o em STRICT ISOLATION. Caso contrário, ele poderá vazar ou substituir variáveis de ambiente para outros UDFs/UDTFs no mesmo ambiente, causando um comportamento incorreto.

CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
    def eval(self, factor: str):
        # Save the factor as an environment variable
        os.environ["FACTOR"] = factor

        # Read it back and convert it to a number
        scale = int(os.getenv("FACTOR", "1"))

        # Multiply 0 through 4 by the factor
        for i in range(5):
            yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

Definir DETERMINISTIC se sua função produz resultados consistentes

Adicione DETERMINISTIC à sua definição de função se ela produzir as mesmas saídas para as mesmas entradas. Isso permite otimizações de consulta para melhorar o desempenho.

Por padrão, as UDTFs do Python do Unity Catalog em lote são consideradas não determinísticas, a menos que explicitamente declaradas como tais. Exemplos de funções não determinísticas incluem: gerar valores aleatórios, acessar datas ou horários atuais ou fazer chamadas à API externa.

Consulte CREATE FUNCTION (SQL e Python).

Exemplos práticos

Os exemplos a seguir demonstram casos de uso do mundo real para UDTFs do Unity Catalog em Python, indo de transformações simples de dados a integrações complexas com sistemas externos.

Exemplo: Reimplementação explode

Embora o Spark forneça uma função interna explode , criar sua própria versão demonstra o padrão UDTF fundamental de usar uma única entrada e produzir várias linhas de saída.

CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
    def eval(self, arr):
        if arr is None:
            return
        for element in arr:
            yield (element,)
$$;

Use a função diretamente em uma consulta SQL:

SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple   |
| banana  |
| cherry  |
+---------+

Ou aplique-o aos dados de tabela existentes com um joinLATERAL:

SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

Exemplo: localização geográfica de endereço IP via API REST

Este exemplo demonstra como os UDTFs podem integrar APIs externas diretamente ao fluxo de trabalho do SQL. Os analistas podem enriquecer dados com chamadas à API em tempo real usando sintaxe SQL familiar, sem a necessidade de processos ETL separados.

CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
    def eval(self, ip_address):
        import requests
        api_url = f"https://api.ip-lookup.example.com/{ip_address}"
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            data = response.json()
            yield (data.get('city'), data.get('country'))
        except requests.exceptions.RequestException as e:
            # Return nothing if the API request fails
            return
$$;

Observação

Os UDTFs do Python permitem o tráfego de rede TCP/UDP nas portas 80, 443 e 53 ao usar computação sem servidor ou computação configurada com o modo de acesso padrão.

Use a função para enriquecer dados de log da Web com informações geográficas:

SELECT
  l.timestamp,
  l.request_path,
  geo.city,
  geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

Essa abordagem permite a análise geográfica em tempo real sem exigir tabelas de pesquisa pré-processadas ou pipelines de dados separados. O UDTF lida com solicitações HTTP, análise JSON e tratamento de erros, tornando as fontes de dados externas acessíveis por meio de consultas SQL padrão.

Exemplo: corresponder endereços IP com blocos de rede CIDR

Este exemplo demonstra a correspondência de endereços IP em blocos de rede CIDR, uma tarefa de engenharia de dados comum que requer lógica SQL complexa.

Primeiro, crie dados de exemplo com endereços IPv4 e IPv6:

-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
  ('log1', '192.168.1.100'),
  ('log2', '10.0.0.5'),
  ('log3', '172.16.0.10'),
  ('log4', '8.8.8.8'),
  ('log5', '2001:db8::1'),
  ('log6', '2001:db8:85a3::8a2e:370:7334'),
  ('log7', 'fe80::1'),
  ('log8', '::1'),
  ('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);

Em seguida, defina e registre o UDTF. Observe a estrutura da classe Python:

  • O t TABLE parâmetro aceita uma tabela de entrada com qualquer esquema. O UDTF se adapta automaticamente para processar as colunas fornecidas. Essa flexibilidade significa que você pode usar a mesma função em tabelas diferentes sem modificar a assinatura da função. No entanto, você deve verificar cuidadosamente o esquema das linhas para garantir a compatibilidade.
  • O método __init__ é usado para uma configuração pesada e de único uso, como carregar a grande lista de redes. Esse trabalho ocorre uma vez por partição da tabela de entrada.
  • O eval método processa cada linha e contém a lógica de correspondência principal. Esse método é executado exatamente uma vez para cada linha na partição de entrada e cada execução é executada pela instância correspondente da IpMatcher classe UDTF para essa partição.
  • A HANDLER cláusula especifica o nome da classe Python que implementa a lógica UDTF.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
    def __init__(self):
        import ipaddress
        # Heavy initialization - load networks once per partition
        self.nets = []
        cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
                 '2001:db8::/32', 'fe80::/10', '::1/128']
        for cidr in cidrs:
            self.nets.append(ipaddress.ip_network(cidr))

    def eval(self, row):
        import ipaddress
	    # Validate that required fields exist
        required_fields = ['log_id', 'ip_address']
        for field in required_fields:
            if field not in row:
                raise ValueError(f"Missing required field: {field}")
        try:
            ip = ipaddress.ip_address(row['ip_address'])
            for net in self.nets:
                if ip in net:
                    yield (row['log_id'], row['ip_address'], str(net), ip.version)
                    return
            yield (row['log_id'], row['ip_address'], None, ip.version)
        except ValueError:
            yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;

Agora que ip_cidr_matcher está registrado no Catálogo do Unity, chame-o diretamente do SQL usando a TABLE() sintaxe:

-- Process all IP addresses
SELECT
  *
FROM
  ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
  log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address                    | network         | ip_version  |
+--------+-------------------------------+-----------------+-------------+
| log1   | 192.168.1.100                 | 192.168.0.0/16  | 4           |
| log2   | 10.0.0.5                      | 10.0.0.0/8      | 4           |
| log3   | 172.16.0.10                   | 172.16.0.0/12   | 4           |
| log4   | 8.8.8.8                       | null            | 4           |
| log5   | 2001:db8::1                   | 2001:db8::/32   | 6           |
| log6   | 2001:db8:85a3::8a2e:370:7334  | 2001:db8::/32   | 6           |
| log7   | fe80::1                       | fe80::/10       | 6           |
| log8   | ::1                           | ::1/128         | 6           |
| log9   | 2001:db8:1234:5678::1         | 2001:db8::/32   | 6           |
+--------+-------------------------------+-----------------+-------------+

Exemplo: Legenda de imagem em lote usando pontos de extremidade de visão do Azure Databricks

Este exemplo demonstra a geração de legendas para imagens em lote usando um ponto de extremidade de serviço de modelos de visão computacional do Azure Databricks. Ele mostra o uso terminate() para processamento em lote e execução baseada em partição.

  1. Crie uma tabela com URLs de imagem pública:

    CREATE OR REPLACE TEMPORARY VIEW sample_images AS
    VALUES
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
    images(image_url, category);
    
  2. Crie um UDTF do Catálogo do Unity em Python para gerar legendas de imagem.

    1. Inicialize o UDTF com as configurações, incluindo o tamanho do lote, o token de API do Azure Databricks, o ponto de extremidade do modelo de visão computacional e a URL do workspace.
    2. No método eval, colete as URLs das imagens em um buffer. Quando o buffer atingir o tamanho do lote, dispare o processamento em lote. Isso garante que várias imagens sejam processadas juntas em uma única chamada à API em vez de chamadas individuais por imagem.
    3. No método de processamento em lote, baixe todas as imagens em buffer, codifique-as como base64 e envie-as para uma única solicitação de API para o Databricks VisionModel. O modelo processa todas as imagens simultaneamente e retorna legendas para todo o lote.
    4. O terminate método é executado exatamente uma vez no final de cada partição. No método de término, processe as imagens restantes no buffer e produza todas as legendas coletadas como resultados.

Observação

Substitua <workspace-url> pela URL real do workspace do Azure Databricks (https://your-workspace.cloud.databricks.com).

CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
    def __init__(self):
        self.batch_size = 3
        self.vision_endpoint = "databricks-claude-sonnet-4-5"
        self.workspace_url = "<workspace-url>"
        self.image_buffer = []
        self.results = []

    def eval(self, row, api_token):
        self.image_buffer.append((str(row[0]), api_token))
        if len(self.image_buffer) >= self.batch_size:
            self._process_batch()

    def terminate(self):
        if self.image_buffer:
            self._process_batch()
        for caption in self.results:
            yield (caption,)

    def _process_batch(self):
        batch_data = self.image_buffer.copy()
        self.image_buffer.clear()

        import base64
        import httpx
        import requests

        # API request timeout in seconds
        api_timeout = 60
        # Maximum tokens for vision model response
        max_response_tokens = 300
        # Temperature controls randomness (lower = more deterministic)
        model_temperature = 0.3

        # create a batch for the images
        batch_images = []
        api_token = batch_data[0][1] if batch_data else None

        for image_url, _ in batch_data:
            image_response = httpx.get(image_url, timeout=15)
            image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
            batch_images.append(image_data)

        content_items = [{
            "type": "text",
            "text": "Provide brief captions for these images, one per line."
        }]
        for img_data in batch_images:
            content_items.append({
                "type": "image_url",
                "image_url": {
                    "url": "data:image/jpeg;base64," + img_data
                }
            })

        payload = {
            "messages": [{
                "role": "user",
                "content": content_items
            }],
            "max_tokens": max_response_tokens,
            "temperature": model_temperature
        }

        response = requests.post(
            self.workspace_url + "/serving-endpoints/" +
            self.vision_endpoint + "/invocations",
            headers={
                'Authorization': 'Bearer ' + api_token,
                'Content-Type': 'application/json'
            },
            json=payload,
            timeout=api_timeout
        )

        result = response.json()
        batch_response = result['choices'][0]['message']['content'].strip()

        lines = batch_response.split('\n')
        captions = [line.strip() for line in lines if line.strip()]

        while len(captions) < len(batch_data):
            captions.append(batch_response)

        self.results.extend(captions[:len(batch_data)])
$$;

Para usar a legenda de imagem do lote UDTF, chame-a usando a tabela de imagens de exemplo:

Observação

Substitua your_secret_scope e api_token pelo escopo do segredo e pelo nome da chave reais para o token de API do Databricks.

SELECT
  caption
FROM
  batch_inference_image_caption(
    data => TABLE(sample_images),
    api_token => secret('your_secret_scope', 'api_token')
  )
+---------------------------------------------------------------------------------------------------------------+
| caption                                                                                                       |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies                                     |
| Black ant in detailed macro photography standing on a textured surface                                        |
| Tabby cat lounging comfortably on a white ledge against a white wall                                          |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+

Você também pode gerar legendas de imagem categoria por categoria:

SELECT
  *
FROM
  batch_inference_image_caption(
    TABLE(sample_images)
    PARTITION BY category ORDER BY (category),
    secret('your_secret_scope', 'api_token')
  )
+------------------------------------------------------------------------------------------------------+
| caption                                                                                              |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface                               |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space.  |
| Tabby cat lounging comfortably on white ledge against white wall                                     |
| Wooden boardwalk cutting through lush wetland grasses under blue skies                               |
+------------------------------------------------------------------------------------------------------+

Exemplo: curva ROC e computação AUC para avaliação de modelo de ML

Este exemplo demonstra o cálculo de curvas ROC (receiver operating characteristic) e as pontuações da área sob a curva (AUC) para a avaliação de modelos de classificação binária usando scikit-learn.

Este exemplo mostra vários padrões importantes:

  • Uso da biblioteca externa: integra o scikit-learn para computação de curva ROC
  • Agregação stateful: acumula previsões em todas as linhas antes de computar as métricas
  • terminate() uso do método: processa o conjunto de dados completo e produz resultados somente após todas as linhas terem sido avaliadas
  • Tratamento de erros: valida as colunas necessárias na tabela de entrada

O UDTF acumula todas as previsões na memória usando o eval() método, em seguida, calcula e produz a curva ROC completa no terminate() método. Esse padrão é útil para métricas que exigem o conjunto de dados completo para cálculo.

CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
    def __init__(self):
        from sklearn import metrics
        self._roc_curve = metrics.roc_curve
        self._roc_auc_score = metrics.roc_auc_score

        self._true_labels = []
        self._predicted_scores = []

    def eval(self, row):
        if 'y_true' not in row or 'y_score' not in row:
            raise KeyError("Required columns 'y_true' and 'y_score' not found")

        true_label = row['y_true']
        predicted_score = row['y_score']

        label = float(true_label)
        self._true_labels.append(label)
        self._predicted_scores.append(float(predicted_score))

    def terminate(self):
        false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
            self._true_labels,
            self._predicted_scores,
            drop_intermediate=False
        )

        auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))

        for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
            yield float(threshold), float(tpr), float(fpr), auc_score
$$;

Crie dados de classificação binária de exemplo com previsões:

CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
  ( 1, 1.0, 0.95, 'high_confidence_positive'),
  ( 2, 1.0, 0.87, 'high_confidence_positive'),
  ( 3, 1.0, 0.82, 'medium_confidence_positive'),
  ( 4, 0.0, 0.78, 'false_positive'),
  ( 5, 1.0, 0.71, 'medium_confidence_positive'),
  ( 6, 0.0, 0.65, 'false_positive'),
  ( 7, 0.0, 0.58, 'true_negative'),
  ( 8, 1.0, 0.52, 'low_confidence_positive'),
  ( 9, 0.0, 0.45, 'true_negative'),
  (10, 0.0, 0.38, 'true_negative'),
  (11, 1.0, 0.31, 'low_confidence_positive'),
  (12, 0.0, 0.15, 'true_negative'),
  (13, 0.0, 0.08, 'high_confidence_negative'),
  (14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);

Compute a curva ROC e a AUC:

SELECT
    threshold,
    true_positive_rate,
    false_positive_rate,
    auc
FROM compute_roc_curve(
  TABLE(
    SELECT y_true, y_score
    FROM binary_classification_data
    WHERE y_true IS NOT NULL AND y_score IS NOT NULL
    ORDER BY sample_id
  )
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate  | false_positive_rate  | auc   |
+-----------+---------------------+----------------------+-------+
| 1.95      | 0.0                 | 0.0                  | 0.786 |
| 0.95      | 0.167               | 0.0                  | 0.786 |
| 0.87      | 0.333               | 0.0                  | 0.786 |
| 0.82      | 0.5                 | 0.0                  | 0.786 |
| 0.78      | 0.5                 | 0.125                | 0.786 |
| 0.71      | 0.667               | 0.125                | 0.786 |
| 0.65      | 0.667               | 0.25                 | 0.786 |
| 0.58      | 0.667               | 0.375                | 0.786 |
| 0.52      | 0.833               | 0.375                | 0.786 |
| 0.45      | 0.833               | 0.5                  | 0.786 |
| 0.38      | 0.833               | 0.625                | 0.786 |
| 0.31      | 1.0                 | 0.625                | 0.786 |
| 0.15      | 1.0                 | 0.75                 | 0.786 |
| 0.08      | 1.0                 | 0.875                | 0.786 |
| 0.03      | 1.0                 | 1.0                  | 0.786 |
+-----------+---------------------+----------------------+-------+

Exemplo: projeção de coluna dinâmica de um argumento de tabela

Este exemplo combina UDTFs polimórficos com argumentos de tabela. O UDTF aceita uma tabela e uma lista separada por vírgulas de nomes de coluna e projeta apenas essas colunas da entrada. O analyze método inspeciona o esquema da tabela de entrada e cria um esquema de saída que contém apenas as colunas solicitadas.

CREATE OR REPLACE FUNCTION project_columns(t TABLE, columns STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ProjectColumns'
AS $$
class ProjectColumns:
    @staticmethod
    def analyze(t, columns):
        from pyspark.sql.types import StructType
        from pyspark.sql.udtf import AnalyzeResult

        requested = [c.strip() for c in columns.value.split(",")]
        input_schema = t.dataType
        output_fields = []
        for field in input_schema.fields:
            if field.name in requested:
                output_fields.append(field)
        if not output_fields:
            raise ValueError(
                f"None of the requested columns {requested} "
                f"exist in the input table"
            )
        return AnalyzeResult(schema=StructType(output_fields))

    def eval(self, row, columns: str):
        requested = [c.strip() for c in columns.split(",")]
        yield tuple(row[col] for col in requested if col in row)
$$;

Use a função para selecionar colunas específicas de uma tabela:

SELECT * FROM project_columns(
  TABLE(SELECT * FROM samples.nyctaxi.trips LIMIT 5),
  'pickup_zip, dropoff_zip, fare_amount'
);

Limitações

As seguintes limitações técnicas se aplicam às UDTFs de Python no Unity Catalog.

Próximas etapas