Bagikan melalui


Fungsi yang Ditentukan Pengguna (UDF) Batch Python di Unity Catalog

Penting

Fitur ini ada di Pratinjau Publik.

UDF Python pada Batch Unity Catalog memperluas kemampuan UDF Unity Catalog dengan memungkinkan Anda menulis kode Python untuk memproses batch data, secara signifikan meningkatkan efisiensi dengan mengurangi overhead yang terkait dengan UDF yang memproses data baris per baris. Pengoptimalan ini membuat UDF Python batch Unity Catalog ideal untuk pemrosesan data skala besar.

Persyaratan

Batch Unity Catalog Python UDFs memerlukan Databricks Runtime versi 16.3 ke atas.

Buat Katalog Unity Batch Python UDF

Membuat Batch Unity Catalog Python UDF mirip dengan membuat Unity Catalog UDF biasa, dengan tambahan berikut:

  • PARAMETER STYLE PANDAS: Ini menentukan bahwa UDF memproses data dalam batch menggunakan iterator Pandas.
  • HANDLER 'handler_function': Ini menentukan fungsi handler yang dipanggil untuk memproses batch.

Contoh berikut menunjukkan kepada Anda cara membuat Katalog Unity Batch Python UDF:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

Setelah mendaftarkan UDF, Anda dapat memanggilnya menggunakan SQL atau Python:

SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
  SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
  SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);

Fungsi penangan kumpulan UDF

Batch Unity Catalog Python UDF memerlukan fungsi handler yang memproses batch dan menghasilkan hasil. Anda harus menentukan nama fungsi handler saat membuat UDF dengan menggunakan HANDLER kunci .

Fungsi handler melakukan hal berikut:

  1. Menerima argumen iterator yang berulang di atas satu atau beberapa pandas.Series. Masing-masing pandas.Series berisi parameter input untuk fungsi UDF.
  2. Melakukan iterasi pada generator dan memproses data.
  3. Mengembalikan iterasi generator.

Python UDF Katalog Unity Batch harus mengembalikan jumlah baris yang sama seperti input. Fungsi handler memastikan ini dengan menghasilkan pandas.Series dengan panjang yang sama dengan seri input untuk setiap batch.

Menginstal dependensi kustom

Anda dapat memperluas fungsionalitas Batch Unity Catalog Python UDF di luar lingkungan Databricks Runtime dengan menentukan dependensi kustom untuk pustaka eksternal.

Lihat Memperluas UDF menggunakan dependensi kustom.

Batch UDF dapat menerima parameter tunggal atau beberapa

Parameter tunggal: Ketika fungsi handler menggunakan parameter input tunggal, fungsi ini menerima iterator yang mengiterasi terhadap pandas.Series untuk setiap batch.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for value_batch in batch_iter:
    d = {"min": value_batch.min(), "max": value_batch.max()}
    yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;

Beberapa parameter: Untuk beberapa parameter input, fungsi handler menerima iterator yang melakukan iterasi melalui beberapa pandas.Series. Nilai dalam seri berada dalam urutan yang sama dengan parameter input.

%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for p1, p2 in batch_iter: # same order as arguments above
    yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);

Optimalkan performa dengan memisahkan operasi yang mahal

Anda dapat mengoptimalkan operasi yang mahal secara komputasi dengan memisahkan operasi ini dari fungsi handler. Ini memastikan bahwa mereka dijalankan hanya sekali daripada selama setiap iterasi atas batch data.

Contoh berikut menunjukkan cara memastikan bahwa komputasi mahal dilakukan hanya sekali:

%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
  # expensive computation...
  return 1

expensive_value = compute_value()
def handler_func(batch_iter):
  for batch in batch_iter:
    yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL

Isolasi lingkungan

Nota

Lingkungan isolasi bersama memerlukan Databricks Runtime 17.1 ke atas. Dalam versi sebelumnya, semua Batch Unity Catalog Python UDF berjalan dalam mode isolasi yang ketat.

Batch Unity Catalog Python UDFs dengan pemilik dan sesi yang sama dapat berbagi lingkungan isolasi secara default. Ini dapat meningkatkan performa dan mengurangi penggunaan memori dengan mengurangi jumlah lingkungan terpisah yang perlu diluncurkan.

Isolasi ketat

Untuk memastikan UDF selalu berjalan di lingkungannya sendiri yang sepenuhnya terisolasi, tambahkan STRICT ISOLATION klausa karakteristik.

Sebagian besar UDF tidak memerlukan isolasi yang ketat. UDF pemrosesan data standar mendapat manfaat dari lingkungan isolasi bersama default dan berjalan lebih cepat dengan konsumsi memori yang lebih rendah.

Tambahkan klausa karakteristik STRICT ISOLATION ke UDF yang:

  • Jalankan input sebagai kode menggunakan eval(), exec(), atau fungsi serupa
  • Menulis file ke sistem file lokal
  • Mengubah variabel global atau status sistem
  • Mengubah variabel lingkungan

Contoh berikut menunjukkan UDF yang menjalankan input sebagai kode dan memerlukan isolasi ketat:

CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator

def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  for code_series in batch_iter:
    def eval_func(code):
      try:
        return str(eval(code))
      except Exception as e:
        return f"Error: {e}"
    yield code_series.apply(eval_func)
$$;

Kredensial layanan di Batch Unity Catalog Python UDFs

Fungsi UDF Python pada Batch Unity Catalog dapat menggunakan Kredensial Layanan Unity Catalog untuk mengakses layanan cloud eksternal. Ini sangat berguna untuk mengintegrasikan fungsi cloud seperti tokenizer keamanan ke dalam alur kerja pemrosesan data.

Nota

API khusus UDF untuk kredensial layanan:
Di UDF, gunakan databricks.service_credentials.getServiceCredentialsProvider() untuk mengakses kredensial layanan.

Ini berbeda dari fungsi dbutils.credentials.getServiceCredentialsProvider() yang digunakan dalam notebook, yang tidak tersedia dalam konteks eksekusi UDF.

Untuk membuat kredensial layanan, lihat Membuat kredensial layanan.

Tentukan kredensial layanan yang ingin Anda gunakan dalam CREDENTIALS klausa dalam definisi UDF:

CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
  `credential-name` DEFAULT,
  `complicated-credential-name` AS short_name,
  `simple-cred`,
  cred_no_quotes
)
AS $$
# Python code here
$$;

Izin akses layanan

Pembuat UDF harus memiliki ACCESS izin pada identitas layanan Katalog Unity. Namun, untuk penelepon UDF, cukup memberikan mereka EXECUTE izin pada UDF. Secara khusus, penelepon UDF tidak memerlukan akses ke kredensial layanan yang mendasar, karena UDF dijalankan menggunakan izin kredensial pembuat UDF.

Untuk fungsi sementara, pembuat selalu menjadi pemanggil. UDF yang berjalan dalam cakupan No-PE, juga dikenal sebagai kluster khusus, menggunakan izin pemanggil sebagai gantinya.

Kredensial dan alias default

Anda dapat menyertakan beberapa kredensial dalam CREDENTIALS klausa, tetapi hanya satu yang dapat ditandai sebagai DEFAULT. Anda dapat memberikan alias untuk kredensial non-default dengan menggunakan kata kunci AS. Setiap kredensial harus memiliki alias yang unik.

SDK cloud yang di-patch secara otomatis mengambil kredensial default. Kredensial default didahulukan daripada default yang ditentukan dalam konfigurasi Spark komputasi, dan disimpan dalam definisi UDF Unity Catalog.

Anda harus menginstal paket azure-identity untuk menggunakan penyedia DefaultAzureCredential. Gunakan klausa ENVIRONMENT untuk menginstal pustaka eksternal. Untuk mempelajari selengkapnya tentang menginstal pustaka eksternal, lihat Memperluas UDF menggunakan dependensi kustom.

Contoh kredensial layanan - Azure Blob Storage

Contoh berikut menggunakan kredensial layanan untuk mengakses Azure Blob Storage dari Batch Unity Catalog Python UDF:

%sql
CREATE OR REPLACE FUNCTION main.test.read_azure_blob(blob_name STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
  dependencies = '["azure-identity", "azure-storage-blob"]', environment_version = 'None'
)
AS $$
import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient


def batchhandler(it):
  # DefaultAzureCredential automatically uses the DEFAULT service credential
  credential = DefaultAzureCredential()
  blob_service = BlobServiceClient(
    account_url="https://your-storage-account.blob.core.windows.net",
    credential=credential
  )
  container = blob_service.get_container_client("your-container")

  for blob_names in it:
    results = []
    for name in blob_names:
      blob_client = container.get_blob_client(name)
      try:
        content = blob_client.download_blob().readall().decode("utf-8")
        results.append(content)
      except Exception as e:
        results.append(f"Error: {e}")
    yield pd.Series(results)
$$;

Hubungi UDF setelah terdaftar:

SELECT main.test.read_azure_blob(blob_name)
FROM VALUES
('config/settings.json'),
('data/input.txt')
AS t(blob_name)

Dapatkan konteks eksekusi tugas

Gunakan API TaskContext PySpark untuk mendapatkan informasi konteks seperti identitas pengguna, tag klaster, ID pekerjaan spark, dan lainnya. Lihat Dapatkan konteks tugas di UDF (Fungsi yang Didefinisikan Pengguna).

Atur DETERMINISTIC jika fungsi Anda menghasilkan hasil yang konsisten

Tambahkan DETERMINISTIC ke definisi fungsi Anda jika menghasilkan output yang sama untuk input yang sama. Ini memungkinkan pengoptimalan kueri untuk meningkatkan performa.

Secara default, Batch Unity Catalog Python UDTF diasumsikan sebagai non-deterministioc kecuali dinyatakan secara eksplisit. Contoh fungsi non-deterministik meliputi: menghasilkan nilai acak, mengakses waktu atau tanggal saat ini, atau melakukan panggilan API eksternal.

Lihat CREATE FUNCTION (SQL dan Python)

Keterbatasan

  • Fungsi Python harus menangani NULL nilai secara independen, dan semua pemetaan jenis harus mengikuti pemetaan bahasa Azure Databricks SQL.
  • Batch Unity Catalog Python UDF berjalan di lingkungan yang aman dan terisolasi dan tidak memiliki akses ke sistem file bersama atau layanan internal.
  • Beberapa pemanggilan UDF dalam tahap dijalankan secara berurutan dan hasil sementara dimaterialisasi dan dapat dituliskan ke disk.
  • Kredensial layanan hanya tersedia di Batch Unity Catalog Python UDFs dan Scalar Python UDFs. Mereka tidak didukung dalam Unity Catalog Python UDF standar.
  • Pada kluster khusus dan untuk fungsi sementara, pemanggil fungsi harus memiliki ACCESS izin pada kredensial layanan. Lihat Memberikan izin untuk menggunakan kredensial layanan untuk mengakses layanan cloud eksternal.
  • Aktifkan fitur Pratinjau Umum Aktifkan jaringan untuk UDF di Gudang SQL Tanpa Server di halaman Pratinjau ruang kerja Anda untuk melakukan panggilan UDF Python Katalog Unity Batch ke layanan eksternal pada komputasi gudang SQL tanpa server.
  • Untuk melakukan panggilan UDF Python Katalog Unity Batch pada notebook tanpa server atau komputasi tugas, Anda harus mengonfigurasi kontrol keluar tanpa server.