Fungsi tabel yang ditentukan pengguna (UDTF) Python di Unity Catalog

Penting

Mendaftarkan UDTF Python di Unity Catalog sedang dalam Pratinjau Umum.

Fungsi tabel yang ditentukan oleh pengguna dalam Unity Catalog (UDTF) adalah fungsi yang mendaftarkan tabel lengkap sebagai hasil, bukan nilai skalar. Tidak seperti fungsi skalar yang mengembalikan nilai hasil tunggal dari setiap panggilan, UDTF dipanggil dalam klausa pernyataan FROM SQL dan dapat mengembalikan beberapa baris dan kolom.

UDTF sangat berguna untuk:

  • Mengubah array atau struktur data kompleks menjadi beberapa baris
  • Mengintegrasikan API atau layanan eksternal ke dalam alur kerja SQL
  • Menerapkan pembuatan data kustom atau logika pengayaan
  • Memproses data yang memerlukan operasi stateful di seluruh baris

Setiap panggilan UDTF menerima argumen nol atau lebih. Argumen ini dapat berupa ekspresi skalar atau argumen tabel yang mewakili seluruh tabel input.

UDTF dapat didaftarkan dengan dua cara:

Persyaratan

Unity Catalog Python UDTFs didukung pada jenis komputasi berikut:

  • Buku catatan dan pekerjaan tanpa server
  • Komputasi klasik dengan mode akses standar (Databricks Runtime 17.1 ke atas)
  • Gudang SQL (tanpa server atau pro)

Membuat UDTF di Unity Catalog

Gunakan SQL DDL untuk membuat UDTF yang diatur di Unity Catalog. UDTF dipanggil menggunakan klausa pernyataan FROM SQL.

CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
    """
    Basic UDTF that computes a sequence of integers
    and includes the square of each number in the range.
    """
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

+-----+---------+
| num | squared |
+-----+---------+
| 1   | 1       |
| 2   | 4       |
| 3   | 9       |
| 4   | 16      |
| 5   | 25      |
+-----+---------+

Azure Databricks mengimplementasikan UDTF Python sebagai kelas Python dengan metode wajib eval yang menghasilkan baris output.

Argumen tabel

Nota

TABLE argumen didukung dalam Databricks Runtime 17.2 ke atas.

UDTF dapat menerima seluruh tabel sebagai argumen input, sehingga memungkinkan dilakukannya transformasi dan agregasi berbasis status yang kompleks.

eval() dan terminate() metode siklus hidup

Argumen tabel dalam UDTF menggunakan fungsi berikut untuk memproses setiap baris:

  • eval(): Dipanggil sekali untuk setiap baris dalam tabel input. Ini adalah metode pemrosesan utama dan diperlukan.
  • terminate(): Dipanggil sekali di akhir setiap partisi, setelah semua baris diproses oleh eval(). Gunakan metode ini untuk menghasilkan hasil agregat akhir atau melakukan operasi pembersihan. Metode ini bersifat opsional tetapi penting untuk operasi stateful seperti agregasi, penghitungan, atau pemrosesan batch.

Untuk informasi selengkapnya tentang eval() metode dan terminate() , lihat dokumentasi Apache Spark: Python UDTF.

Pola akses baris

eval() menerima baris dari argumen TABLE dalam bentuk objek pyspark.sql.Row. Anda dapat mengakses nilai menurut nama kolom (row['id'], row['name']) atau menurut indeks (row[0], row[1]).

  • Fleksibilitas skema: Menyatakan TABLE argumen tanpa definisi skema (misalnya, data TABLE, t TABLE). Fungsi menerima struktur tabel apa pun, sehingga kode Anda harus memvalidasi bahwa kolom yang diperlukan ada.

Lihat Contoh: Mencocokkan alamat IP terhadap blok jaringan CIDR dan Contoh: Penjelasan gambar secara batch menggunakan Endpoint Visi Azure Databricks.

Menghitung skema output dinamis (UDTF polimorfik)

Nota

UC UDTF Polimorfik memerlukan Databricks Runtime 18.1 atau lebih.

UDTF yang bersifat polimorfik menentukan skema outputnya secara dinamis saat kueri menggunakan metode statis analyze(), daripada mendeklarasikan kolom sebelumnya. Untuk membuatnya, gunakan RETURNS TABLE tanpa definisi kolom dan tentukan analyze() metode pada kelas handler.

Contoh berikut mengekstrak bidang yang ditentukan pemanggil dari string JSON, mengembalikan kolom yang berbeda tergantung pada fields argumen:

CREATE OR REPLACE FUNCTION extract_fields(json_str STRING, fields STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ExtractFields'
AS $$
class ExtractFields:
    @staticmethod
    def analyze(json_str, fields):

        # Build the output schema from the requested field names
        from pyspark.sql.types import StructType, StructField, StringType
        from pyspark.sql.udtf import AnalyzeResult
        col_names = [f.strip() for f in fields.value.split(",")]
        return AnalyzeResult(
            StructType([StructField(name, StringType()) for name in col_names])
        )

    def eval(self, json_str: str, fields: str):
        # Parse the JSON and yield only the requested fields
        import json
        data = json.loads(json_str)
        col_names = [f.strip() for f in fields.split(",")]
        yield tuple(data.get(name) for name in col_names)
$$;

-- Extract the name and city
SELECT * FROM extract_fields(
  '{"name": "Alice", "age": 30, "city": "Seattle"}',
  'name, city'
);
+-------+---------+
| name  | city    |
+-------+---------+
| Alice | Seattle |
+-------+---------+

Tentukan analyze metode

Kelas handler harus menyertakan method bernama analyze yang menerima argumen yang sama dengan UDTF dan mengembalikan AnalyzeResult yang menjelaskan skema keluaran. Azure Databricks memanggil analyze() pada waktu perencanaan kueri untuk menyelesaikan skema sebelum menjalankan fungsi.

Setiap parameter analyze adalah instance kelas AnalyzeArgument:

Ladang Description
dataType Jenis argumen input adalah DataType. Untuk argumen tabel input, ini adalah StructType yang mewakili kolom tabel.
value Nilai dari argumen input adalah Optional[Any]. Ini None untuk argumen tabel atau ekspresi non-konstanta.
isTable Apakah argumen input adalah argumen tabel dengan BooleanType.
isConstantExpression Apakah argumen input adalah ekspresi yang dapat dilipat secara konstan sebagai BooleanType.

Metode analyze mengembalikan sebuah instans dari kelas AnalyzeResult.

Ladang Description
schema Skema tabel hasil adalah StructType.
withSinglePartition Jika True, mengirimkan semua baris input ke instans kelas UDTF yang sama.
partitionBy Jika tidak kosong, baris-baris input dipartisi oleh ekspresi yang ditentukan sehingga setiap kombinasi unik diproses oleh instans UDTF yang terpisah.
orderBy Jika tidak kosong, menentukan urutan baris dalam setiap partisi.
select Jika tidak kosong, tentukan kolom mana dari argumen input TABLE yang diterima UDTF.

Peringatan

Untuk Unity Catalog polymorphic UDTFs, Anda harus menempatkan semua pernyataan impor di dalam badan metode analyze(). Impor tingkat tinggi tidak tersedia di lingkungan sandbox Katalog Unity.

Teruskan status dari analyze ke eval

Metode ini analyze berjalan sekali pada waktu perencanaan kueri, sehingga Anda dapat menggunakannya untuk memproses argumen yang bersifat konstan, mengurai konfigurasi, atau membangun tabel pencarian. Untuk meneruskan hasil tersebut ke eval, buat subkelas @dataclass dari AnalyzeResult dengan bidang kustom, kembalikan dari analyze, dan menerimanya dalam metode __init__. Ini menghindari pengulangan pekerjaan yang mahal untuk setiap baris.

Contoh berikut menyelesaikan kode bahasa ke nama bahasa lengkap sekali masuk analyze dan meneruskannya, sehingga eval dapat menandai setiap baris tanpa mengulangi pencarian:

CREATE OR REPLACE FUNCTION tag_language(t TABLE, lang_code STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'TagLanguage'
AS $$
class TagLanguage:
    @staticmethod
    def analyze(t, lang_code):
        from dataclasses import dataclass
        from pyspark.sql.types import StructType, StructField, StringType
        from pyspark.sql.udtf import AnalyzeResult

        @dataclass
        class LangResult(AnalyzeResult):
            language: str = ""

        # Resolve the language code to a full name once during planning
        languages = {"en": "English", "es": "Spanish", "fr": "French", "de": "German"}
        return LangResult(
            schema=StructType([
                StructField("text", StringType()),
                StructField("language", StringType())
            ]),
            language=languages.get(lang_code.value, "Unknown")
        )

    def __init__(self, result):
        self._language = result.language

    def eval(self, row, lang_code: str):
        # Tag each row with the pre-resolved language name
        yield (row['text'], self._language)
$$;

SELECT * FROM tag_language(
  TABLE(VALUES ('Hola mundo'), ('Buenos días') t(text)),
  'es'
);
+-------------+----------+
| text        | language |
+-------------+----------+
| Hola mundo  | Spanish  |
| Buenos días | Spanish  |
+-------------+----------+

Untuk pola dan detail selengkapnya tentang status penerusan, lihat Mengalihkan status ke panggilan mendatangeval.

Tentukan partisi dari metode analyze

Ketika UDTF polimorfik menerima argumen tabel, analyze metode dapat mengontrol bagaimana baris input didistribusikan di seluruh instans UDTF dengan mengatur partitionBy, orderBy, withSinglePartition, dan select pada AnalyzeResult. Ini menghilangkan kebutuhan penelepon untuk menentukan PARTITION BY atau ORDER BY di SQL.

Untuk API dan contoh partisi lengkap, lihat Menentukan partisi baris input dari analyze metode .

Isolasi lingkungan

Nota

Lingkungan isolasi bersama memerlukan Databricks Runtime 17.2 ke atas. Dalam versi sebelumnya, semua Unity Catalog Python UDTF berjalan dalam mode isolasi yang ketat.

Unity Catalog Python UDTFs dengan pemilik dan sesi yang sama dapat berbagi lingkungan isolasi secara default. Ini meningkatkan performa dan mengurangi penggunaan memori dengan mengurangi jumlah lingkungan terpisah yang perlu diluncurkan.

Isolasi ketat

Untuk memastikan UDTF selalu berjalan di lingkungannya sendiri yang sepenuhnya terisolasi, tambahkan STRICT ISOLATION klausa karakteristik.

Sebagian besar UDTF tidak memerlukan isolasi yang ketat. UDTF pemrosesan data standar mendapat manfaat dari lingkungan isolasi bersama default dan berjalan lebih cepat dengan konsumsi memori yang lebih rendah.

STRICT ISOLATION Tambahkan klausa karakteristik ke UDTF yang:

  • Jalankan input sebagai kode menggunakan eval(), exec(), atau fungsi serupa.
  • Tulis file ke sistem file lokal.
  • Ubah variabel global atau status sistem.
  • Mengakses atau mengubah variabel lingkungan.

Contoh UDTF berikut mengatur variabel lingkungan kustom, membaca variabel kembali, dan mengalikan sekumpulan angka menggunakan variabel . Karena UDTF memutasi lingkungan proses, jalankan di STRICT ISOLATION. Jika tidak, itu dapat membocorkan atau mengambil alih variabel lingkungan untuk UDF/UDTF lain di lingkungan yang sama, menyebabkan perilaku yang salah.

CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
    def eval(self, factor: str):
        # Save the factor as an environment variable
        os.environ["FACTOR"] = factor

        # Read it back and convert it to a number
        scale = int(os.getenv("FACTOR", "1"))

        # Multiply 0 through 4 by the factor
        for i in range(5):
            yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

Atur DETERMINISTIC jika fungsi Anda menghasilkan hasil yang konsisten

Tambahkan DETERMINISTIC ke definisi fungsi Anda jika menghasilkan output yang sama untuk input yang sama. Ini memungkinkan pengoptimalan kueri untuk meningkatkan performa.

Secara default, UDTF Batch Unity Catalog Python diasumsikan tidak deterministik kecuali dinyatakan secara eksplisit. Contoh fungsi non-deterministik meliputi: menghasilkan nilai acak, mengakses waktu atau tanggal saat ini, atau melakukan panggilan API eksternal.

Lihat CREATE FUNCTION (SQL dan Python).

Contoh praktis

Contoh berikut menunjukkan kasus penggunaan dunia nyata untuk Unity Catalog Python UDTF, berkembang dari transformasi data sederhana ke integrasi eksternal yang kompleks.

Contoh: Mengimplementasikan ulang explode

Meskipun Spark menyediakan fungsi bawaan explode, membuat versi sendiri menunjukkan pola UDTF dasar dengan mengambil satu input dan menghasilkan beberapa baris output.

CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
    def eval(self, arr):
        if arr is None:
            return
        for element in arr:
            yield (element,)
$$;

Gunakan fungsi secara langsung dalam kueri SQL:

SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
| element |
+---------+
| apple   |
| banana  |
| cherry  |
+---------+

Atau terapkan ke data tabel yang sudah ada dengan LATERAL penggabungan:

SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

Contoh: Geolokasi alamat IP melalui REST API

Contoh ini menunjukkan bagaimana UDTF dapat mengintegrasikan API eksternal langsung ke alur kerja SQL Anda. Analis dapat memperkaya data dengan panggilan API real-time menggunakan sintaks SQL yang familier, tanpa memerlukan proses ETL terpisah.

CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
    def eval(self, ip_address):
        import requests
        api_url = f"https://api.ip-lookup.example.com/{ip_address}"
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            data = response.json()
            yield (data.get('city'), data.get('country'))
        except requests.exceptions.RequestException as e:
            # Return nothing if the API request fails
            return
$$;

Nota

UDTF Python memungkinkan traffic jaringan TCP/UDP melalui port 80, 443, dan 53 saat menggunakan komputasi tanpa server atau komputasi yang dikonfigurasi dengan mode akses standar.

Gunakan fungsi untuk memperkaya data log web dengan informasi geografis:

SELECT
  l.timestamp,
  l.request_path,
  geo.city,
  geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

Pendekatan ini memungkinkan analisis geografis real-time tanpa memerlukan tabel pencarian yang telah diproses sebelumnya atau alur data terpisah. UDTF menangani permintaan HTTP, penguraian JSON, dan penanganan kesalahan, membuat sumber data eksternal dapat diakses melalui kueri SQL standar.

Contoh: Mencocokkan alamat IP terhadap blok jaringan CIDR

Contoh ini menunjukkan alamat IP yang cocok terhadap blok jaringan CIDR, tugas rekayasa data umum yang memerlukan logika SQL kompleks.

Pertama, buat data sampel dengan alamat IPv4 dan IPv6:

-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
  ('log1', '192.168.1.100'),
  ('log2', '10.0.0.5'),
  ('log3', '172.16.0.10'),
  ('log4', '8.8.8.8'),
  ('log5', '2001:db8::1'),
  ('log6', '2001:db8:85a3::8a2e:370:7334'),
  ('log7', 'fe80::1'),
  ('log8', '::1'),
  ('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);

Selanjutnya, tentukan dan daftarkan UDTF. Perhatikan struktur kelas Python:

  • Parameter t TABLE menerima tabel input dengan skema apa pun. UDTF secara otomatis beradaptasi untuk memproses kolom apa pun yang disediakan. Fleksibilitas ini berarti Anda dapat menggunakan fungsi yang sama di berbagai tabel tanpa memodifikasi tanda tangan fungsi. Namun, Anda harus memeriksa skema baris dengan hati-hati untuk memastikan kompatibilitas.
  • Metode __init__ ini digunakan untuk pengaturan awal yang berat dan dilakukan satu kali, seperti memuat daftar jaringan yang besar. Pekerjaan ini berlangsung sekali per partisi tabel input.
  • Metode eval memproses setiap baris dan berisi logika pencocokan inti. Metode ini dijalankan tepat sekali untuk setiap baris dalam partisi input, dan setiap eksekusi dilakukan oleh instans yang sesuai dari kelas UDTF untuk partisi tersebut IpMatcher .
  • Klausa HANDLER menentukan nama kelas Python yang mengimplementasikan logika UDTF.
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
    def __init__(self):
        import ipaddress
        # Heavy initialization - load networks once per partition
        self.nets = []
        cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
                 '2001:db8::/32', 'fe80::/10', '::1/128']
        for cidr in cidrs:
            self.nets.append(ipaddress.ip_network(cidr))

    def eval(self, row):
        import ipaddress
	    # Validate that required fields exist
        required_fields = ['log_id', 'ip_address']
        for field in required_fields:
            if field not in row:
                raise ValueError(f"Missing required field: {field}")
        try:
            ip = ipaddress.ip_address(row['ip_address'])
            for net in self.nets:
                if ip in net:
                    yield (row['log_id'], row['ip_address'], str(net), ip.version)
                    return
            yield (row['log_id'], row['ip_address'], None, ip.version)
        except ValueError:
            yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;

Sekarang ip_cidr_matcher telah terdaftar di Unity Catalog, panggil langsung menggunakan sintaksis TABLE() dari SQL.

-- Process all IP addresses
SELECT
  *
FROM
  ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
  log_id;
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address                    | network         | ip_version  |
+--------+-------------------------------+-----------------+-------------+
| log1   | 192.168.1.100                 | 192.168.0.0/16  | 4           |
| log2   | 10.0.0.5                      | 10.0.0.0/8      | 4           |
| log3   | 172.16.0.10                   | 172.16.0.0/12   | 4           |
| log4   | 8.8.8.8                       | null            | 4           |
| log5   | 2001:db8::1                   | 2001:db8::/32   | 6           |
| log6   | 2001:db8:85a3::8a2e:370:7334  | 2001:db8::/32   | 6           |
| log7   | fe80::1                       | fe80::/10       | 6           |
| log8   | ::1                           | ::1/128         | 6           |
| log9   | 2001:db8:1234:5678::1         | 2001:db8::/32   | 6           |
+--------+-------------------------------+-----------------+-------------+

Contoh: Keterangan gambar secara batch menggunakan endpoint penglihatan Azure Databricks

Contoh ini menunjukkan keterangan gambar batch menggunakan model visi Azure Databricks yang melayani titik akhir. Ini menampilkan penggunaan terminate() untuk pemrosesan batch dan eksekusi berbasis partisi.

  1. Buat tabel dengan URL gambar publik:

    CREATE OR REPLACE TEMPORARY VIEW sample_images AS
    VALUES
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
        ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
    images(image_url, category);
    
  2. Buat Unity Catalog Python UDTF untuk menghasilkan keterangan gambar:

    1. Inisialisasi UDTF dengan konfigurasi, termasuk ukuran batch, token API Azure Databricks, titik akhir model visi, dan URL ruang kerja.
    2. Dalam metode ini eval , kumpulkan URL gambar ke dalam buffer. Ketika buffer mencapai ukuran batch, picu pemrosesan batch. Ini memastikan bahwa beberapa gambar diproses bersama dalam satu panggilan API daripada panggilan individual per gambar.
    3. Dalam metode pemrosesan batch, unduh semua gambar yang di-buffer, kodekan sebagai base64, dan kirimkan ke satu permintaan API ke Databricks VisionModel. Model memproses semua gambar secara bersamaan dan mengembalikan keterangan untuk seluruh batch.
    4. Metode terminate ini dijalankan tepat sekali di akhir setiap partisi. Dalam metode penghentian, proses gambar yang tersisa di buffer dan hasilkan semua keterangan yang dikumpulkan sebagai hasilnya.

Nota

Ganti <workspace-url> dengan URL ruang kerja Azure Databricks Anda yang sebenarnya (https://your-workspace.cloud.databricks.com).

CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
    def __init__(self):
        self.batch_size = 3
        self.vision_endpoint = "databricks-claude-sonnet-4-5"
        self.workspace_url = "<workspace-url>"
        self.image_buffer = []
        self.results = []

    def eval(self, row, api_token):
        self.image_buffer.append((str(row[0]), api_token))
        if len(self.image_buffer) >= self.batch_size:
            self._process_batch()

    def terminate(self):
        if self.image_buffer:
            self._process_batch()
        for caption in self.results:
            yield (caption,)

    def _process_batch(self):
        batch_data = self.image_buffer.copy()
        self.image_buffer.clear()

        import base64
        import httpx
        import requests

        # API request timeout in seconds
        api_timeout = 60
        # Maximum tokens for vision model response
        max_response_tokens = 300
        # Temperature controls randomness (lower = more deterministic)
        model_temperature = 0.3

        # create a batch for the images
        batch_images = []
        api_token = batch_data[0][1] if batch_data else None

        for image_url, _ in batch_data:
            image_response = httpx.get(image_url, timeout=15)
            image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
            batch_images.append(image_data)

        content_items = [{
            "type": "text",
            "text": "Provide brief captions for these images, one per line."
        }]
        for img_data in batch_images:
            content_items.append({
                "type": "image_url",
                "image_url": {
                    "url": "data:image/jpeg;base64," + img_data
                }
            })

        payload = {
            "messages": [{
                "role": "user",
                "content": content_items
            }],
            "max_tokens": max_response_tokens,
            "temperature": model_temperature
        }

        response = requests.post(
            self.workspace_url + "/serving-endpoints/" +
            self.vision_endpoint + "/invocations",
            headers={
                'Authorization': 'Bearer ' + api_token,
                'Content-Type': 'application/json'
            },
            json=payload,
            timeout=api_timeout
        )

        result = response.json()
        batch_response = result['choices'][0]['message']['content'].strip()

        lines = batch_response.split('\n')
        captions = [line.strip() for line in lines if line.strip()]

        while len(captions) < len(batch_data):
            captions.append(batch_response)

        self.results.extend(captions[:len(batch_data)])
$$;

Untuk menggunakan UDTF keterangan gambar batch, panggil menggunakan tabel gambar sampel:

Nota

Ganti your_secret_scope dan api_token dengan cakupan rahasia aktual dan nama kunci untuk token Databricks API.

SELECT
  caption
FROM
  batch_inference_image_caption(
    data => TABLE(sample_images),
    api_token => secret('your_secret_scope', 'api_token')
  )
+---------------------------------------------------------------------------------------------------------------+
| caption                                                                                                       |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies                                     |
| Black ant in detailed macro photography standing on a textured surface                                        |
| Tabby cat lounging comfortably on a white ledge against a white wall                                          |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+

Anda juga dapat menghasilkan kategori keterangan gambar menurut kategori:

SELECT
  *
FROM
  batch_inference_image_caption(
    TABLE(sample_images)
    PARTITION BY category ORDER BY (category),
    secret('your_secret_scope', 'api_token')
  )
+------------------------------------------------------------------------------------------------------+
| caption                                                                                              |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface                               |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space.  |
| Tabby cat lounging comfortably on white ledge against white wall                                     |
| Wooden boardwalk cutting through lush wetland grasses under blue skies                               |
+------------------------------------------------------------------------------------------------------+

Contoh: Kurva ROC dan komputasi AUC untuk evaluasi model ML

Contoh ini menunjukkan cara menghitung kurva karakteristik operasi penerima (ROC) dan skor area di bawah kurva (AUC) untuk evaluasi model klasifikasi biner menggunakan scikit-learn.

Contoh ini menampilkan beberapa pola penting:

  • Penggunaan pustaka eksternal: Mengintegrasikan scikit-learn untuk komputasi kurva ROC
  • Agregasi stateful: Mengumpulkan prediksi di semua baris sebelum menghitung metrik
  • terminate() penggunaan metode: Memproses himpunan data lengkap dan menghasilkan hasil hanya setelah semua baris dievaluasi
  • Penanganan kesalahan: Memvalidasi kolom yang diperlukan ada dalam tabel input

UDTF mengakumulasi semua prediksi dalam memori menggunakan eval() metode , lalu menghitung dan menghasilkan kurva ROC lengkap dalam terminate() metode . Pola ini berguna untuk metrik yang memerlukan himpunan data lengkap untuk perhitungan.

CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
    def __init__(self):
        from sklearn import metrics
        self._roc_curve = metrics.roc_curve
        self._roc_auc_score = metrics.roc_auc_score

        self._true_labels = []
        self._predicted_scores = []

    def eval(self, row):
        if 'y_true' not in row or 'y_score' not in row:
            raise KeyError("Required columns 'y_true' and 'y_score' not found")

        true_label = row['y_true']
        predicted_score = row['y_score']

        label = float(true_label)
        self._true_labels.append(label)
        self._predicted_scores.append(float(predicted_score))

    def terminate(self):
        false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
            self._true_labels,
            self._predicted_scores,
            drop_intermediate=False
        )

        auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))

        for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
            yield float(threshold), float(tpr), float(fpr), auc_score
$$;

Buat contoh data klasifikasi biner dengan prediksi:

CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
  ( 1, 1.0, 0.95, 'high_confidence_positive'),
  ( 2, 1.0, 0.87, 'high_confidence_positive'),
  ( 3, 1.0, 0.82, 'medium_confidence_positive'),
  ( 4, 0.0, 0.78, 'false_positive'),
  ( 5, 1.0, 0.71, 'medium_confidence_positive'),
  ( 6, 0.0, 0.65, 'false_positive'),
  ( 7, 0.0, 0.58, 'true_negative'),
  ( 8, 1.0, 0.52, 'low_confidence_positive'),
  ( 9, 0.0, 0.45, 'true_negative'),
  (10, 0.0, 0.38, 'true_negative'),
  (11, 1.0, 0.31, 'low_confidence_positive'),
  (12, 0.0, 0.15, 'true_negative'),
  (13, 0.0, 0.08, 'high_confidence_negative'),
  (14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);

Komputasi kurva ROC dan AUC:

SELECT
    threshold,
    true_positive_rate,
    false_positive_rate,
    auc
FROM compute_roc_curve(
  TABLE(
    SELECT y_true, y_score
    FROM binary_classification_data
    WHERE y_true IS NOT NULL AND y_score IS NOT NULL
    ORDER BY sample_id
  )
)
ORDER BY threshold DESC;
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate  | false_positive_rate  | auc   |
+-----------+---------------------+----------------------+-------+
| 1.95      | 0.0                 | 0.0                  | 0.786 |
| 0.95      | 0.167               | 0.0                  | 0.786 |
| 0.87      | 0.333               | 0.0                  | 0.786 |
| 0.82      | 0.5                 | 0.0                  | 0.786 |
| 0.78      | 0.5                 | 0.125                | 0.786 |
| 0.71      | 0.667               | 0.125                | 0.786 |
| 0.65      | 0.667               | 0.25                 | 0.786 |
| 0.58      | 0.667               | 0.375                | 0.786 |
| 0.52      | 0.833               | 0.375                | 0.786 |
| 0.45      | 0.833               | 0.5                  | 0.786 |
| 0.38      | 0.833               | 0.625                | 0.786 |
| 0.31      | 1.0                 | 0.625                | 0.786 |
| 0.15      | 1.0                 | 0.75                 | 0.786 |
| 0.08      | 1.0                 | 0.875                | 0.786 |
| 0.03      | 1.0                 | 1.0                  | 0.786 |
+-----------+---------------------+----------------------+-------+

Contoh: Proyeksi kolom dinamis dari argumen tabel

Contoh ini menggabungkan UDTF polimorfik dengan argumen tabel. UDTF menerima tabel dan daftar nama kolom yang dipisahkan koma, lalu hanya memproyeksikan kolom tersebut dari input. Metode ini analyze memeriksa skema tabel input dan membangun skema output yang hanya berisi kolom yang diminta.

CREATE OR REPLACE FUNCTION project_columns(t TABLE, columns STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ProjectColumns'
AS $$
class ProjectColumns:
    @staticmethod
    def analyze(t, columns):
        from pyspark.sql.types import StructType
        from pyspark.sql.udtf import AnalyzeResult

        requested = [c.strip() for c in columns.value.split(",")]
        input_schema = t.dataType
        output_fields = []
        for field in input_schema.fields:
            if field.name in requested:
                output_fields.append(field)
        if not output_fields:
            raise ValueError(
                f"None of the requested columns {requested} "
                f"exist in the input table"
            )
        return AnalyzeResult(schema=StructType(output_fields))

    def eval(self, row, columns: str):
        requested = [c.strip() for c in columns.split(",")]
        yield tuple(row[col] for col in requested if col in row)
$$;

Gunakan fungsi untuk memilih kolom tertentu dari tabel:

SELECT * FROM project_columns(
  TABLE(SELECT * FROM samples.nyctaxi.trips LIMIT 5),
  'pickup_zip, dropoff_zip, fare_amount'
);

Keterbatasan

Batasan berikut berlaku untuk Unity Catalog Python UDTFs:

Langkah selanjutnya