Funkcje tabel zdefiniowane przez użytkownika (UDTF) w języku Python w Unity Catalog

Ważne

Rejestrowanie funkcji UDTF języka Python w Unity Catalog znajduje się w publicznej wersji zapoznawczej.

Funkcja tabeli zdefiniowana przez użytkownika (UDTF) w katalogu Unity rejestruje funkcje zwracające pełne tabele zamiast wartości skalarnych. W przeciwieństwie do funkcji skalarnych, które zwracają pojedynczą wartość wyniku z każdego wywołania, funkcje UDF są wywoływane w klauzuli FROM instrukcji SQL i mogą zwracać wiele wierszy i kolumn.

Funkcje tabel zdefiniowane przez użytkownika (UDTF) są szczególnie przydatne do:

  • Przekształcanie tablic lub złożonych struktur danych w wiele wierszy
  • Integrowanie zewnętrznych interfejsów API lub usług z przepływami pracy SQL
  • Implementowanie niestandardowej logiki generowania lub wzbogacania danych
  • Przetwarzanie danych wymagających operacji stanowych w wierszach

Każde wywołanie UDTF akceptuje zero lub więcej argumentów. Te argumenty mogą być wyrażeniami skalarnych lub argumentami tabeli reprezentującymi całe tabele wejściowe.

Funkcje zdefiniowane przez użytkownika można zarejestrować na dwa sposoby:

Requirements

Funkcje tabel zdefiniowane przez użytkownika w języku Python w katalogu Unity są obsługiwane na następujących typach środowisk obliczeniowych:

  • Notesy i zadania bezserwerowe
  • Klasyczne obliczenia ze standardowym trybem dostępu (Databricks Runtime 17.1 lub nowszy)
  • SQL Warehouse (bezserwerowe lub pro)

Utwórz funkcję UDTF w katalogu Unity

Użyj SQL DDL, aby utworzyć zarządzaną funkcję UDTF w Unity Catalog. Funkcje tabelaryczne zdefiniowane przez użytkownika (UDTF) są wywoływane przy użyciu klauzuli w instrukcji FROM SQL.

CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
    """
    Basic UDTF that computes a sequence of integers
    and includes the square of each number in the range.
    """
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

+-----+---------+
| num | squared |
+-----+---------+
| 1   | 1       |
| 2   | 4       |
| 3   | 9       |
| 4   | 16      |
| 5   | 25      |
+-----+---------+

Usługa Azure Databricks implementuje funkcje UDTF języka Python jako klasy języka Python z obowiązkową eval metodą, która generuje wiersze wyjściowe.

Argumenty tabeli

Uwaga / Notatka

TABLE Argumenty są obsługiwane w środowisku Databricks Runtime 17.2 lub nowszym.

Funkcje tabel zdefiniowane przez użytkownika (UDTF) mogą akceptować całe tabele jako argumenty wejściowe, umożliwiając złożone transformacje zachowujące stan i agregacje.

eval() i terminate() metody cyklu życia

Argumenty tabeli w funkcjach UDF używają następujących funkcji do przetwarzania każdego wiersza:

  • eval(): wywoływany raz dla każdego wiersza w tabeli wejściowej. Jest to główna metoda przetwarzania i jest wymagana.
  • terminate(): wywoływany raz na końcu każdej partycji, po przetworzeniu wszystkich wierszy przez eval()element. Użyj tej metody, aby uzyskać końcowe zagregowane wyniki lub wykonać operacje oczyszczania. Ta metoda jest opcjonalna, ale niezbędna dla operacji stanowych, takich jak agregacje, zliczanie lub przetwarzanie wsadowe.

Aby uzyskać więcej informacji na temat eval() metod i terminate() metod, zobacz dokumentację platformy Apache Spark: Python UDTF.

Wzorce dostępu do wierszy

eval() odbiera wiersze z TABLE argumentów jako obiekty pyspark.sql.Row . Możesz uzyskać dostęp do wartości według nazwy kolumny (row['id'], row['name']) lub indeksu (row[0], row[1]).

  • Elastyczność schematu: deklarowanie TABLE argumentów bez definicji schematu (na przykład data TABLE, t TABLE). Funkcja akceptuje dowolną strukturę tabeli, więc kod powinien sprawdzić, czy istnieją wymagane kolumny.

Zobacz Przykład: Dopasowywanie adresów IP do bloków sieciowych CIDR i Przykład: grupowe opisywanie obrazów za pomocą punktów końcowych przetwarzania obrazu usługi Azure Databricks.

Obliczanie dynamicznego schematu wyjściowego (wielomorficzne funkcje UDTF)

Uwaga / Notatka

Polymorficzne UC UDTFs wymagają środowiska Databricks Runtime 18.1 lub nowszego.

Wielomorficzny UDTF określa schemat danych wyjściowych dynamicznie w czasie zapytania przy użyciu metody statycznej analyze(), zamiast deklarować kolumny z góry. Aby go utworzyć, użyj RETURNS TABLE bez definicji kolumn i zdefiniuj metodę analyze() w klasie obsługi.

Poniższy przykład wyodrębnia pola określone przez obiekt wywołujący z ciągu JSON, zwracając różne kolumny w zależności od argumentu fields :

CREATE OR REPLACE FUNCTION extract_fields(json_str STRING, fields STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ExtractFields'
AS $$
class ExtractFields:
    @staticmethod
    def analyze(json_str, fields):

        # Build the output schema from the requested field names
        from pyspark.sql.types import StructType, StructField, StringType
        from pyspark.sql.udtf import AnalyzeResult
        col_names = [f.strip() for f in fields.value.split(",")]
        return AnalyzeResult(
            StructType([StructField(name, StringType()) for name in col_names])
        )

    def eval(self, json_str: str, fields: str):
        # Parse the JSON and yield only the requested fields
        import json
        data = json.loads(json_str)
        col_names = [f.strip() for f in fields.split(",")]
        yield tuple(data.get(name) for name in col_names)
$$;

-- Extract the name and city
SELECT * FROM extract_fields(
  '{"name": "Alice", "age": 30, "city": "Seattle"}',
  'name, city'
);
+-------+---------+
| name  | city    |
+-------+---------+
| Alice | Seattle |
+-------+---------+

Definiowanie analyze metody

Klasa obsługi musi zawierać metodę @staticmethod o nazwie analyze , która akceptuje te same argumenty co udTF i zwraca AnalyzeResult opisujący schemat wyjściowy. Azure Databricks wywołuje analyze() w czasie planowania zapytania w celu rozwiązania schematu przed wykonaniem funkcji.

Każdy parametr analyze klasy to wystąpienie AnalyzeArgument klasy:

Pole Description
dataType Typ argumentu wejściowego jako DataType. W przypadku argumentów tabeli wejściowej, StructType stanowi reprezentację kolumn tabeli.
value Wartość argumentu wejściowego jako Optional[Any]. None Dotyczy to argumentów tabeli lub wyrażeń niestałych.
isTable Czy argument wejściowy jest argumentem tabeli jako .BooleanType
isConstantExpression Czy argument wejściowy jest wyrażeniem składanym stałym jako BooleanType.

Metoda analyze zwraca wystąpienie AnalyzeResult klasy:

Pole Description
schema Schemat tabeli wyników jako StructType.
withSinglePartition Jeśli True jest użyty, wszystkie wiersze wejściowe są wysyłane do tego samego wystąpienia klasy UDTF.
partitionBy Jeśli nie jest puste, partycjonuje wiersze wejściowe według określonych wyrażeń, aby każda unikatowa kombinacja została przetworzona przez oddzielne wystąpienie UDTF.
orderBy Jeśli nie jest puste, określa kolejność wierszy w ramach każdej partycji.
select Jeśli wartość nie jest pusta, określa, które kolumny z argumentu wejściowego TABLE odbiera UDTF.

Ostrzeżenie

W przypadku polimorficznych wielokrotnych funkcji użytkownika (UDTF) katalogu Unity należy umieścić wszystkie importy wewnątrz treści analyze() metody. Importy najwyższego poziomu nie są dostępne w środowisku piaskownicy Unity Catalog.

Przesuń stan z analyze do eval

Metoda analyze jest uruchamiana raz w czasie planowania zapytań, dzięki czemu można jej użyć do wstępnego przetwarzania argumentów stałych, parsowania konfiguracji lub tworzenia wyszukiwań. Aby przekazać te wyniki do eval, utwórz podklasę @dataclassAnalyzeResult z polami niestandardowymi, zwróć ją z analyze, i zaakceptuj ją w metodzie __init__. To unika powtarzania kosztownych operacji dla każdego wiersza.

Poniższy przykład zamienia kod języka na pełną nazwę języka w analyze i przekazuje ją dalej, by eval mógł oznaczać każdy wiersz bez powtarzania odnośnika:

CREATE OR REPLACE FUNCTION tag_language(t TABLE, lang_code STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'TagLanguage'
AS $$
class TagLanguage:
    @staticmethod
    def analyze(t, lang_code):
        from dataclasses import dataclass
        from pyspark.sql.types import StructType, StructField, StringType
        from pyspark.sql.udtf import AnalyzeResult

        @dataclass
        class LangResult(AnalyzeResult):
            language: str = ""

        # Resolve the language code to a full name once during planning
        languages = {"en": "English", "es": "Spanish", "fr": "French", "de": "German"}
        return LangResult(
            schema=StructType([
                StructField("text", StringType()),
                StructField("language", StringType())
            ]),
            language=languages.get(lang_code.value, "Unknown")
        )

    def __init__(self, result):
        self._language = result.language

    def eval(self, row, lang_code: str):
        # Tag each row with the pre-resolved language name
        yield (row['text'], self._language)
$$;

SELECT * FROM tag_language(
  TABLE(VALUES ('Hola mundo'), ('Buenos días') t(text)),
  'es'
);
+-------------+----------+
| text        | language |
+-------------+----------+
| Hola mundo  | Spanish  |
| Buenos días | Spanish  |
+-------------+----------+

Aby uzyskać więcej wzorców i szczegółów dotyczących stanu przekazywania, zobacz Przekazywanie stanu do przyszłych eval wywołań.

Określanie partycjonowania z analyze metody

Gdy wielomorficzny UDTF akceptuje argument tabeli, metoda analyze może kontrolować sposób dystrybucji wierszy wejściowych w wystąpieniach UDTF przez ustawienie partitionBy, orderBy, withSinglePartition, i select na AnalyzeResult. Eliminuje to konieczność określania PARTITION BY lub ORDER BY w języku SQL przez osoby wywołujące.

Aby uzyskać pełny interfejs API partycjonowania i przykłady, zobacz jak określić partycjonowanie wierszy wejściowych przy użyciu metody analyze.

Izolacja środowiskowa

Uwaga / Notatka

Współdzielone środowiska izolacyjne wymagają Databricks Runtime 17.2 lub nowszego. We wcześniejszych wersjach wszystkie funkcje UDTF języka Python w Unity Catalog działają w trybie ścisłej izolacji.

Funkcje zdefiniowane przez użytkownika w języku Python w Unity Catalog, które mają tego samego właściciela i sesję, mogą domyślnie współdzielić środowisko izolacji. Zwiększa to wydajność i zmniejsza użycie pamięci przez zmniejszenie liczby oddzielnych środowisk, które należy uruchomić.

Ścisła izolacja

Aby zapewnić, że funkcja UDTF zawsze działa we własnym, w pełni izolowanym środowisku, dodaj klauzulę charakterystyczną STRICT ISOLATION .

Większość UDTF nie wymaga ścisłej izolacji. Standardowe funkcje tabelaryczne definiowane przez użytkownika (UDTFs) korzystają z domyślnego współdzielonego środowiska izolacyjnego i działają szybciej przy niższym zużyciu pamięci.

Dodaj klauzulę charakterystyczną STRICT ISOLATION do UDTF-ów, które:

  • Uruchom dane wejściowe jako kod przy użyciu funkcji eval(), exec() lub podobnych.
  • Zapisywanie plików w lokalnym systemie plików.
  • Modyfikowanie zmiennych globalnych lub stanu systemu.
  • Uzyskaj dostęp do zmiennych środowiskowych lub zmodyfikuj je.

Poniższy przykład UDTF ustawia niestandardową zmienną środowiskową, odczytuje tę zmienną i mnoży zestaw liczb przy jej użyciu. Ponieważ funkcja UDTF modyfikuje środowisko procesowe, uruchom ją w STRICT ISOLATION. W przeciwnym razie może to spowodować wyciek lub zastąpienie zmiennych środowiskowych dla innych funkcji zdefiniowanych przez użytkownika (UDF) lub tabel funkcji zdefiniowanych przez użytkownika (UDTF) w tym samym środowisku, co powoduje nieprawidłowe zachowanie.

CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
    def eval(self, factor: str):
        # Save the factor as an environment variable
        os.environ["FACTOR"] = factor

        # Read it back and convert it to a number
        scale = int(os.getenv("FACTOR", "1"))

        # Multiply 0 through 4 by the factor
        for i in range(5):
            yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

Ustaw DETERMINISTIC , czy funkcja generuje spójne wyniki

Dodaj DETERMINISTIC do definicji funkcji, jeśli generuje te same dane wyjściowe dla tych samych danych wejściowych. Dzięki temu optymalizacje zapytań mogą zwiększyć wydajność.

Domyślnie przyjmuje się, że funkcje UDTF języka Python w Unity Catalog usługi Batch są niedeterministyczne, chyba że zostanie to jawnie zadeklarowane. Przykłady funkcji niedeterministycznych to: generowanie wartości losowych, uzyskiwanie dostępu do bieżących godzin lub dat lub wykonywanie zewnętrznych wywołań interfejsu API.

Zobacz CREATE FUNCTION (SQL i Python).

Przykłady praktyczne

W poniższych przykładach przedstawiono rzeczywiste przypadki użycia dla funkcji UDTF języka Python w Unity Catalog, które rozwijają się od prostych przekształceń danych do złożonych integracji zewnętrznych.

Przykład: Ponowne implementowanie explode

Chociaż platforma Spark udostępnia wbudowaną explode funkcję, stworzenie własnej wersji demonstruje podstawowy wzorzec UDTF, polegający na przyjmowaniu pojedynczego wejścia i generowaniu wielu wierszy wyjściowych.

CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
    def eval(self, arr):
        if arr is None:
            return
        for element in arr:
            yield (element,)
$$;

Użyj funkcji bezpośrednio w zapytaniu SQL:

SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple   |
| banana  |
| cherry  |
+---------+

Możesz też zastosować je do istniejących danych tabeli za pomocą sprzężeniaLATERAL:

SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

Przykład: geolokalizacja adresu IP za pośrednictwem interfejsu API REST

W tym przykładzie pokazano, jak UDTF mogą integrować zewnętrzne interfejsy API bezpośrednio z przepływem pracy SQL. Analitycy mogą wzbogacać dane przy użyciu wywołań interfejsu API w czasie rzeczywistym przy użyciu znanej składni SQL bez konieczności oddzielnych procesów ETL.

CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
    def eval(self, ip_address):
        import requests
        api_url = f"https://api.ip-lookup.example.com/{ip_address}"
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            data = response.json()
            yield (data.get('city'), data.get('country'))
        except requests.exceptions.RequestException as e:
            # Return nothing if the API request fails
            return
$$;

Uwaga / Notatka

Funkcje zdefiniowane przez użytkownika w Pythonie (UDTF) pozwalają na ruch sieciowy TCP/UDP przez porty 80, 443 i 53 podczas korzystania z obliczeń bezserwerowych lub zasobów obliczeniowych skonfigurowanych w standardowym trybie dostępu.

Użyj funkcji , aby wzbogacić dane dziennika internetowego o informacje geograficzne:

SELECT
  l.timestamp,
  l.request_path,
  geo.city,
  geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

Takie podejście umożliwia analizę geograficzną w czasie rzeczywistym bez konieczności użycia wstępnie przetworzonych tablic przeglądowych lub oddzielnych przepływów danych. Funkcja UDTF obsługuje żądania HTTP, analizowanie JSON i obsługę błędów, dzięki czemu zewnętrzne źródła danych są dostępne za pośrednictwem standardowych zapytań SQL.

Przykład: porównanie adresów IP z blokami sieci CIDR

W tym przykładzie pokazano dopasowywanie adresów IP do bloków sieciowych CIDR, co jest typowym zadaniem inżynierii danych, wymagającym złożonej logiki SQL.

Najpierw utwórz przykładowe dane z adresami IPv4 i IPv6:

-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
  ('log1', '192.168.1.100'),
  ('log2', '10.0.0.5'),
  ('log3', '172.16.0.10'),
  ('log4', '8.8.8.8'),
  ('log5', '2001:db8::1'),
  ('log6', '2001:db8:85a3::8a2e:370:7334'),
  ('log7', 'fe80::1'),
  ('log8', '::1'),
  ('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);

Następnie zdefiniuj i zarejestruj UDTF. Zwróć uwagę na strukturę klas języka Python:

  • Parametr t TABLE akceptuje tabelę wejściową z dowolnym schematem. Funkcja UDTF automatycznie dostosowuje się do przetwarzania wszystkich podanych kolumn. Ta elastyczność oznacza, że można użyć tej samej funkcji w różnych tabelach bez modyfikowania podpisu funkcji. Należy jednak dokładnie sprawdzić schemat wierszy, aby zapewnić zgodność.
  • Metoda __init__ jest używana do rozległej jednorazowej konfiguracji, takiej jak ładowanie dużej listy sieci. Praca ta odbywa się raz na każdą partycję tabeli wejściowej.
  • Metoda eval przetwarza każdy wiersz i zawiera podstawową logikę dopasowania. Ta metoda wykonywana jest dokładnie raz dla każdego wiersza w partycji wejściowej i jest realizowana przez stosowne wystąpienie klasy IpMatcher UDTF dla tej partycji.
  • Klauzula HANDLER określa nazwę klasy języka Python, która implementuje logikę UDTF.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
    def __init__(self):
        import ipaddress
        # Heavy initialization - load networks once per partition
        self.nets = []
        cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
                 '2001:db8::/32', 'fe80::/10', '::1/128']
        for cidr in cidrs:
            self.nets.append(ipaddress.ip_network(cidr))

    def eval(self, row):
        import ipaddress
	    # Validate that required fields exist
        required_fields = ['log_id', 'ip_address']
        for field in required_fields:
            if field not in row:
                raise ValueError(f"Missing required field: {field}")
        try:
            ip = ipaddress.ip_address(row['ip_address'])
            for net in self.nets:
                if ip in net:
                    yield (row['log_id'], row['ip_address'], str(net), ip.version)
                    return
            yield (row['log_id'], row['ip_address'], None, ip.version)
        except ValueError:
            yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;

Teraz, gdy ip_cidr_matcher jest zarejestrowany w katalogu Unity, wywołaj go bezpośrednio z SQL przy użyciu składni TABLE().

-- Process all IP addresses
SELECT
  *
FROM
  ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
  log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address                    | network         | ip_version  |
+--------+-------------------------------+-----------------+-------------+
| log1   | 192.168.1.100                 | 192.168.0.0/16  | 4           |
| log2   | 10.0.0.5                      | 10.0.0.0/8      | 4           |
| log3   | 172.16.0.10                   | 172.16.0.0/12   | 4           |
| log4   | 8.8.8.8                       | null            | 4           |
| log5   | 2001:db8::1                   | 2001:db8::/32   | 6           |
| log6   | 2001:db8:85a3::8a2e:370:7334  | 2001:db8::/32   | 6           |
| log7   | fe80::1                       | fe80::/10       | 6           |
| log8   | ::1                           | ::1/128         | 6           |
| log9   | 2001:db8:1234:5678::1         | 2001:db8::/32   | 6           |
+--------+-------------------------------+-----------------+-------------+

Przykład: transkrybowanie obrazów w usłudze Batch przy użyciu punktów końcowych przetwarzania obrazów usługi Azure Databricks

W tym przykładzie pokazano generowanie podpisów dla obrazów wsadowych przy użyciu modelu wizji usługi Azure Databricks obsługiwanego przez punkt końcowy. Prezentuje użycie terminate() do przetwarzania wsadowego i wykonywania opartego na partycjach.

  1. Utwórz tabelę z publicznymi adresami URL obrazów:

    CREATE OR REPLACE TEMPORARY VIEW sample_images AS
    VALUES
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
    images(image_url, category);
    
  2. Utwórz Unity Catalog w Pythonie, używając UDTF, aby wygenerować opisy obrazów:

    1. Zainicjuj UDTF przy użyciu konfiguracji, w tym rozmiaru partii, tokenu API Azure Databricks, punktu końcowego modelu wizyjnego i adresu URL obszaru roboczego.
    2. W metodzie eval zbierz adresy URL obrazów do buforu. Gdy bufor osiągnie wielkość partii, uruchom przetwarzanie wsadowe. Dzięki temu wiele obrazów jest przetwarzanych razem w jednym wywołaniu interfejsu API, a nie w poszczególnych wywołaniach obrazu.
    3. W metodzie przetwarzania wsadowego pobierz wszystkie buforowane obrazy, zakoduj je jako base64 i wyślij je do pojedynczego żądania interfejsu API do usługi Databricks VisionModel. Model przetwarza wszystkie obrazy jednocześnie i zwraca podpisy dla całej partii.
    4. Metoda terminate jest wykonywana dokładnie raz na końcu każdej partycji. W metodzie terminate przetwarzaj wszystkie pozostałe obrazy w buforze i zwracaj wszystkie zebrane opisy jako wyniki.

Uwaga / Notatka

Zamień <workspace-url> na rzeczywisty adres URL swojego obszaru roboczego usługi Azure Databricks (https://your-workspace.cloud.databricks.com).

CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
    def __init__(self):
        self.batch_size = 3
        self.vision_endpoint = "databricks-claude-sonnet-4-5"
        self.workspace_url = "<workspace-url>"
        self.image_buffer = []
        self.results = []

    def eval(self, row, api_token):
        self.image_buffer.append((str(row[0]), api_token))
        if len(self.image_buffer) >= self.batch_size:
            self._process_batch()

    def terminate(self):
        if self.image_buffer:
            self._process_batch()
        for caption in self.results:
            yield (caption,)

    def _process_batch(self):
        batch_data = self.image_buffer.copy()
        self.image_buffer.clear()

        import base64
        import httpx
        import requests

        # API request timeout in seconds
        api_timeout = 60
        # Maximum tokens for vision model response
        max_response_tokens = 300
        # Temperature controls randomness (lower = more deterministic)
        model_temperature = 0.3

        # create a batch for the images
        batch_images = []
        api_token = batch_data[0][1] if batch_data else None

        for image_url, _ in batch_data:
            image_response = httpx.get(image_url, timeout=15)
            image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
            batch_images.append(image_data)

        content_items = [{
            "type": "text",
            "text": "Provide brief captions for these images, one per line."
        }]
        for img_data in batch_images:
            content_items.append({
                "type": "image_url",
                "image_url": {
                    "url": "data:image/jpeg;base64," + img_data
                }
            })

        payload = {
            "messages": [{
                "role": "user",
                "content": content_items
            }],
            "max_tokens": max_response_tokens,
            "temperature": model_temperature
        }

        response = requests.post(
            self.workspace_url + "/serving-endpoints/" +
            self.vision_endpoint + "/invocations",
            headers={
                'Authorization': 'Bearer ' + api_token,
                'Content-Type': 'application/json'
            },
            json=payload,
            timeout=api_timeout
        )

        result = response.json()
        batch_response = result['choices'][0]['message']['content'].strip()

        lines = batch_response.split('\n')
        captions = [line.strip() for line in lines if line.strip()]

        while len(captions) < len(batch_data):
            captions.append(batch_response)

        self.results.extend(captions[:len(batch_data)])
$$;

Aby użyć wsadowego podpisywania obrazów UDTF, wywołaj go przy użyciu przykładowej tabeli obrazów.

Uwaga / Notatka

Zastąp your_secret_scope i api_token rzeczywistym zakresem tajnym i nazwą klucza dla tokenu interfejsu API Databricks.

SELECT
  caption
FROM
  batch_inference_image_caption(
    data => TABLE(sample_images),
    api_token => secret('your_secret_scope', 'api_token')
  )
+---------------------------------------------------------------------------------------------------------------+
| caption                                                                                                       |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies                                     |
| Black ant in detailed macro photography standing on a textured surface                                        |
| Tabby cat lounging comfortably on a white ledge against a white wall                                          |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+

Możesz również wygenerować kategorię podpisów obrazów według kategorii:

SELECT
  *
FROM
  batch_inference_image_caption(
    TABLE(sample_images)
    PARTITION BY category ORDER BY (category),
    secret('your_secret_scope', 'api_token')
  )
+------------------------------------------------------------------------------------------------------+
| caption                                                                                              |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface                               |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space.  |
| Tabby cat lounging comfortably on white ledge against white wall                                     |
| Wooden boardwalk cutting through lush wetland grasses under blue skies                               |
+------------------------------------------------------------------------------------------------------+

Przykład: krzywa ROC i obliczenia AUC na potrzeby oceny modelu uczenia maszynowego

W tym przykładzie pokazano, jak obliczać krzywe ROC i pole pod krzywą (AUC) dla oceny modeli klasyfikacji binarnej przy użyciu biblioteki scikit-learn.

W tym przykładzie przedstawiono kilka ważnych wzorców:

  • Użycie biblioteki zewnętrznej: integruje bibliotekę scikit-learn z obliczeniami krzywej ROC
  • Agregacja stanowa: gromadzi przewidywania we wszystkich wierszach przed obliczeniami metryk
  • terminate() użycie metody: przetwarza kompletny zestaw danych i daje wyniki dopiero po ocenie wszystkich wierszy
  • Obsługa błędów: Sprawdza, czy wymagane kolumny istnieją w tabeli wejściowej

Funkcja UDTF gromadzi wszystkie przewidywania w pamięci za pomocą metody eval(), a następnie oblicza i zwraca pełną krzywą ROC przy użyciu metody terminate(). Ten wzorzec jest przydatny w przypadku metryk, które wymagają kompletnego zestawu danych do obliczenia.

CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
    def __init__(self):
        from sklearn import metrics
        self._roc_curve = metrics.roc_curve
        self._roc_auc_score = metrics.roc_auc_score

        self._true_labels = []
        self._predicted_scores = []

    def eval(self, row):
        if 'y_true' not in row or 'y_score' not in row:
            raise KeyError("Required columns 'y_true' and 'y_score' not found")

        true_label = row['y_true']
        predicted_score = row['y_score']

        label = float(true_label)
        self._true_labels.append(label)
        self._predicted_scores.append(float(predicted_score))

    def terminate(self):
        false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
            self._true_labels,
            self._predicted_scores,
            drop_intermediate=False
        )

        auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))

        for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
            yield float(threshold), float(tpr), float(fpr), auc_score
$$;

Utwórz przykładowe dane klasyfikacji binarnej z przewidywaniami:

CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
  ( 1, 1.0, 0.95, 'high_confidence_positive'),
  ( 2, 1.0, 0.87, 'high_confidence_positive'),
  ( 3, 1.0, 0.82, 'medium_confidence_positive'),
  ( 4, 0.0, 0.78, 'false_positive'),
  ( 5, 1.0, 0.71, 'medium_confidence_positive'),
  ( 6, 0.0, 0.65, 'false_positive'),
  ( 7, 0.0, 0.58, 'true_negative'),
  ( 8, 1.0, 0.52, 'low_confidence_positive'),
  ( 9, 0.0, 0.45, 'true_negative'),
  (10, 0.0, 0.38, 'true_negative'),
  (11, 1.0, 0.31, 'low_confidence_positive'),
  (12, 0.0, 0.15, 'true_negative'),
  (13, 0.0, 0.08, 'high_confidence_negative'),
  (14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);

Oblicz krzywą ROC i AUC:

SELECT
    threshold,
    true_positive_rate,
    false_positive_rate,
    auc
FROM compute_roc_curve(
  TABLE(
    SELECT y_true, y_score
    FROM binary_classification_data
    WHERE y_true IS NOT NULL AND y_score IS NOT NULL
    ORDER BY sample_id
  )
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate  | false_positive_rate  | auc   |
+-----------+---------------------+----------------------+-------+
| 1.95      | 0.0                 | 0.0                  | 0.786 |
| 0.95      | 0.167               | 0.0                  | 0.786 |
| 0.87      | 0.333               | 0.0                  | 0.786 |
| 0.82      | 0.5                 | 0.0                  | 0.786 |
| 0.78      | 0.5                 | 0.125                | 0.786 |
| 0.71      | 0.667               | 0.125                | 0.786 |
| 0.65      | 0.667               | 0.25                 | 0.786 |
| 0.58      | 0.667               | 0.375                | 0.786 |
| 0.52      | 0.833               | 0.375                | 0.786 |
| 0.45      | 0.833               | 0.5                  | 0.786 |
| 0.38      | 0.833               | 0.625                | 0.786 |
| 0.31      | 1.0                 | 0.625                | 0.786 |
| 0.15      | 1.0                 | 0.75                 | 0.786 |
| 0.08      | 1.0                 | 0.875                | 0.786 |
| 0.03      | 1.0                 | 1.0                  | 0.786 |
+-----------+---------------------+----------------------+-------+

Przykład: dynamiczne projekcje kolumn z argumentu tabeli

Ten przykład łączy wielomorficzne funkcje UDF z argumentami tabeli. Funkcja UDTF akceptuje tabelę i rozdzielaną przecinkami listę nazw kolumn, a następnie projektuje tylko te kolumny z danych wejściowych. Metoda analyze sprawdza schemat tabeli wejściowej i tworzy schemat wyjściowy zawierający tylko żądane kolumny.

CREATE OR REPLACE FUNCTION project_columns(t TABLE, columns STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ProjectColumns'
AS $$
class ProjectColumns:
    @staticmethod
    def analyze(t, columns):
        from pyspark.sql.types import StructType
        from pyspark.sql.udtf import AnalyzeResult

        requested = [c.strip() for c in columns.value.split(",")]
        input_schema = t.dataType
        output_fields = []
        for field in input_schema.fields:
            if field.name in requested:
                output_fields.append(field)
        if not output_fields:
            raise ValueError(
                f"None of the requested columns {requested} "
                f"exist in the input table"
            )
        return AnalyzeResult(schema=StructType(output_fields))

    def eval(self, row, columns: str):
        requested = [c.strip() for c in columns.split(",")]
        yield tuple(row[col] for col in requested if col in row)
$$;

Użyj funkcji , aby wybrać określone kolumny z tabeli:

SELECT * FROM project_columns(
  TABLE(SELECT * FROM samples.nyctaxi.trips LIMIT 5),
  'pickup_zip, dropoff_zip, fare_amount'
);

Ograniczenia

Następujące ograniczenia dotyczą funkcji UDTF Python w Unity Catalog:

Dalsze kroki