Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
Questa funzionalità è in Anteprima Pubblica.
Le funzioni definite dall'utente Python (UDF) del Batch Unity Catalog estendono le funzionalità delle UDF del Unity Catalog consentendo di scrivere codice Python per operare su insiemi di dati, migliorando notevolmente l'efficienza riducendo il sovraccarico associato alle UDF riga per riga. Queste ottimizzazioni rendono le UDF Python in batch di Unity Catalog ideali per l'elaborazione dei dati su larga scala.
Requisiti
Le funzioni definite dall'utente Python del Catalogo Unity richiedono il Databricks Runtime delle versioni 16.3 e successive.
Creare una funzione UDF Python per il catalogo Batch Unity
La creazione di una funzione UDF Python del catalogo Batch Unity è simile alla creazione di una normale funzione UDF del catalogo Unity, con le aggiunte seguenti:
-
PARAMETER STYLE PANDAS
: Specifica che la UDF (funzione definita dall'utente) elabora i dati in batch usando iteratori pandas. -
HANDLER 'handler_function'
: specifica la funzione del gestore chiamata per elaborare i batch.
L'esempio seguente illustra come creare una funzione Python definita dall'utente per Unity Catalog Batch.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
Dopo aver registrato l'UDF (funzione definita dall'utente), è possibile chiamarla usando i linguaggi SQL o Python.
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
Funzione del gestore UDF batch
Le funzioni definite dall'utente Python del catalogo Unity Batch richiedono una funzione gestore che elabora i batch e produce i risultati. È necessario specificare il nome della funzione del gestore quando si crea la funzione definita dall'utente usando la chiave HANDLER
.
La funzione del gestore esegue le operazioni seguenti:
- Accetta un argomento iteratore che esegue l'iterazione su uno o più
pandas.Series
. Ognunopandas.Series
contiene i parametri di input della UDF (funzione definita dall'utente). - Scorre il generatore ed elabora i dati.
- Restituisce un iteratore generatore.
Le UDF Python del Batch Unity Catalog devono restituire lo stesso numero di righe dell'input. La funzione del gestore garantisce questo risultato producendo un oggetto pandas.Series
con la stessa lunghezza della serie di input per ogni batch.
Installare dipendenze personalizzate
È possibile estendere le funzionalità delle funzioni Python definite dall'utente del catalogo Batch Unity oltre l'ambiente di Runtime di Databricks definendo dipendenze personalizzate per le librerie esterne.
Consultare Come estendere le funzioni definite dall'utente utilizzando dipendenze personalizzate.
Le UDF batch possono accettare parametri singoli o multipli
Singolo parametro: Quando la funzione del gestore usa un singolo parametro di input, riceve un iteratore che esegue l'iterazione su un pandas.Series
per ogni batch.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for value_batch in batch_iter:
d = {"min": value_batch.min(), "max": value_batch.max()}
yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;
Più parametri: Per più parametri di input, la funzione di gestione riceve un iteratore che itera su più pandas.Series
. I valori della serie sono nello stesso ordine dei parametri di input.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for p1, p2 in batch_iter: # same order as arguments above
yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);
Ottimizzare le prestazioni separando le operazioni costose
È possibile ottimizzare le operazioni di calcolo costose separando queste operazioni dalla funzione del gestore. In questo modo si garantisce che vengano eseguiti una sola volta anziché durante ogni iterazione su batch di dati.
L'esempio seguente illustra come garantire che un calcolo costoso venga eseguito una sola volta:
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
# expensive computation...
return 1
expensive_value = compute_value()
def handler_func(batch_iter):
for batch in batch_iter:
yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL
Credenziali del servizio nelle funzioni definite dall'utente Python del Catalogo Unity di Batch
Le UDF Python di Batch Unity Catalog possono usare le Credenziali del Servizio Catalogo Unity per accedere ai servizi cloud esterni. Ciò è particolarmente utile per l'integrazione di funzioni cloud come tokenizzatori di sicurezza nei flussi di lavoro di elaborazione dati.
Per creare credenziali del servizio, vedere Creare credenziali del servizio.
Specificare le credenziali del servizio da utilizzare nella clausola CREDENTIALS
nella definizione della funzione UDF definita dall'utente.
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
`credential-name` DEFAULT,
`complicated-credential-name` AS short_name,
`simple-cred`,
cred_no_quotes
)
AS $$
# Python code here
$$;
Autorizzazioni per le credenziali del servizio
Il creatore della UDF deve disporre ACCESS
dell'autorizzazione sul credito di servizio del Catalogo Unity. Tuttavia, per i chiamanti UDF è sufficiente concedere loro EXECUTE
l'autorizzazione sulla UDF. In particolare, i chiamanti delle funzioni definite dall'utente (UDF) non devono accedere alle credenziali del servizio sottostanti, perché la UDF viene eseguita usando le autorizzazioni dell'autore della funzione.
Per le funzioni temporanee, l'autore è sempre l'invocatore. Le funzioni definite dall'utente (UDF) che operano nell'ambito No-PE, noto anche come cluster dedicati, utilizzano invece le autorizzazioni del chiamante.
Credenziali e alias predefiniti
È possibile includere più credenziali nella CREDENTIALS
clausola , ma solo una può essere contrassegnata come DEFAULT
. È possibile eseguire l'alias delle credenziali non predefinite usando la AS
parola chiave . Ogni credenziale deve avere un alias univoco.
Gli SDK cloud con patch prelevano automaticamente le credenziali predefinite. La credenziale predefinita ha la precedenza su qualsiasi impostazione predefinita specificata nella configurazione Spark di calcolo e viene preservata nella definizione della funzione definita dall'utente del catalogo Unity.
È necessario installare il azure-identity
pacchetto per usare il DefaultAzureCredential
provider. Usare la ENVIRONMENT
clausola per installare librerie esterne. Per ulteriori informazioni sull'installazione di librerie esterne, vedere Estendere i UDF usando dipendenze personalizzate.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION call_lambda_func(data STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
dependencies = '["azure-identity", "azure-mgmt-web"]', environment_version = 'None'
)
AS $$
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
def batchhandler(it):
# DefaultAzureCredential is automatically using 'batch-udf-service-creds-example-cred'
web_client = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
for vals in data:
yield vals
$$
Esempio di credenziali del servizio - Funzione lambda AWS
L'esempio seguente usa una credenziale del servizio per chiamare una funzione AWS Lambda da una funzione UDF Python del catalogo Batch Unity.
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Chiamare la UDF dopo la registrazione:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
Ottenere il contesto di esecuzione delle attività
Usare l'API PySpark TaskContext per ottenere informazioni di contesto, ad esempio l'identità dell'utente, i tag del cluster, l'ID processo Spark e altro ancora. Vedi Ottieni il contesto del compito in una UDF.
Limitazioni
- Le funzioni Python devono gestire
NULL
i valori in modo indipendente e tutti i mapping dei tipi devono seguire i mapping del linguaggio SQL di Azure Databricks. - Le funzioni UDF di Python del Batch Unity Catalog vengono eseguite in un ambiente sicuro e isolato e non hanno accesso a un file system condiviso o a servizi interni.
- Più invocazioni di UDF all'interno di una fase vengono serializzate e i risultati intermedi vengono materializzati e possono essere scritti su disco.
- Le credenziali del servizio sono disponibili solo nelle funzioni definite dall'utente Python nel Batch Unity Catalog. Non sono supportati nelle funzioni definite dall'utente Python del catalogo Unity standard né nelle funzioni UDF PySpark.
- Nei cluster dedicati e per le funzioni temporanee, il chiamante di funzione deve disporre di
ACCESS
permessi per le credenziali del servizio. Vedere Concedere le autorizzazioni per usare le credenziali di un servizio per accedere a un servizio cloud esterno. - Abilitare la funzionalità di anteprima pubblica Abilitare la rete per le UDF nei SQL Warehouse senza server per effettuare chiamate UDF Python del Unity Catalog Batch a servizi esterni nel calcolo su SQL Warehouse senza server.
- Per effettuare chiamate UDF Python del Batch Unity Catalog su un notebook o in un ambiente di calcolo serverless, è necessario configurare il controllo sull'uscita serverless.