Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo contiene esempi di funzioni definite dall'utente (UDF) python. Illustra come registrare le UDF, come richiamare le UDF e fornisce avvertenze sull'ordine di valutazione delle sottoespressioni in Spark SQL.
In Databricks Runtime 14.0 e versioni successive è possibile usare funzioni di tabella definite dall'utente Python per registrare funzioni che restituiscono intere relazioni anziché valori scalari. Vedere Funzioni di tabella definite dall'utente python (UDF) .
Nota
In Databricks Runtime 12.2 LTS e versioni precedenti, le funzioni definite dall'utente Python e Pandas non sono supportate nelle risorse di calcolo di Unity Catalog che usano la modalità di accesso standard. Le funzioni utente scalari Python e le funzioni utente Pandas sono supportate in Databricks Runtime 13.3 LTS e versioni successive per tutte le modalità di accesso.
In Databricks Runtime 13.3 LTS e successive, è possibile registrare funzioni definite dall'utente Python scalari nel Catalogo Unity usando la sintassi SQL. Si veda Funzioni definite dall'utente (UDF) nel catalogo Unity.
Registrare una funzione come UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
È possibile facoltativamente impostare il tipo di ritorno della funzione definita dall'utente. Il tipo restituito predefinito è StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Richiama l’UDF in Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Usare le UDF con i DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
In alternativa, è possibile dichiarare la stessa UDF usando la sintassi di annotazione.
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Varianti con UDF
Il tipo PySpark per variant è VariantType e i valori sono di tipo VariantVal. Per informazioni sulle varianti, vedere Eseguire query dei dati delle varianti.
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
Ordine di valutazione e controllo del valore nullo
Spark SQL (incluso SQL e l'API DataFrame e Dataset) non garantisce l'ordine di valutazione delle sottoespressioni. In particolare, gli input di un operatore o di una funzione non vengono necessariamente valutati da sinistra a destra o in qualsiasi altro ordine fisso. Ad esempio, le espressioni logiche AND e OR non hanno la semantica di “cortocircuito” da sinistra a destra.
Pertanto, è pericoloso basarsi sugli effetti collaterali o sull'ordine di valutazione delle espressioni booleane e sull'ordine delle clausole WHERE e HAVING, poiché tali espressioni e clausole possono essere riordinate durante l'ottimizzazione e la pianificazione delle query. In particolare, se una UDF si basa sulla semantica di corto circuito in SQL per il controllo dei valori null, non c'è garanzia che il controllo venga eseguito prima di richiamare la UDF. Ad esempio,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Questa clausola WHERE non garantisce che la UDF strlen venga invocata dopo il filtraggio dei valori nulli.
Per eseguire un controllo null appropriato, è consigliabile eseguire una delle operazioni seguenti:
- Rendere la funzione definita dall'utente null-aware ed eseguire il controllo dei valori null all'interno di questa stessa funzione.
- Utilizzare le espressioni
IFoCASE WHENper effettuare il controllo su valori nulli e invocare una funzione definita dall'utente all'interno di un ramo condizionale.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Credenziali del servizio nelle funzioni definite dall'utente di Python scalari
Le UDF scalari Python possono usare le credenziali del servizio Unity Catalog per accedere in modo sicuro ai servizi cloud esterni. Ciò è utile per l'integrazione di operazioni quali la tokenizzazione basata sul cloud, la crittografia o la gestione dei segreti direttamente nelle trasformazioni dei dati.
Le credenziali del servizio per le funzioni definite dall'utente Python scalari sono supportate solo in SQL Warehouse e nel calcolo generale.
Nota
Le credenziali di servizio nelle UDF Python scalari richiedono Databricks Runtime 17.1 o superiore.
Per creare credenziali del servizio, vedere Creare credenziali del servizio.
Nota
API specifica della funzione definita dall'utente per le credenziali del servizio:
Nelle funzioni definite dall'utente, usare databricks.service_credentials.getServiceCredentialsProvider() per accedere alle credenziali del servizio.
Questo comportamento è diverso dalla funzione dbutils.credentials.getServiceCredentialsProvider() usata nei notebook, che non è disponibile nei contesti di esecuzione UDF.
Per accedere alle credenziali del servizio, utilizzare l'utilità nella logica UDF databricks.service_credentials.getServiceCredentialsProvider() per inizializzare gli SDK cloud con le credenziali appropriate. Tutto il codice deve essere incapsulato nel corpo della funzione definita dall'utente.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
Autorizzazioni per le credenziali del servizio
L'autore della funzione definita dall'utente deve disporre del permesso ACCESS alla credenziale del servizio del Catalogo Unity.
Le UDFs eseguite nell'ambito No-PE, conosciute anche come cluster dedicati, richiedono permessi MANAGE sulle credenziali del servizio.
Credenziali predefinite
Quando utilizzato nelle funzioni definite dall'utente di Python scalari (Scalar Python UDFs), Databricks utilizza automaticamente le credenziali del servizio predefinite dalla variabile di ambiente del servizio di calcolo. Questo comportamento consente di fare riferimento in modo sicuro ai servizi esterni senza gestire in modo esplicito gli alias delle credenziali nel codice UDF. Vedere Specificare una credenziale del servizio predefinita per una risorsa di calcolo
Il supporto predefinito delle credenziali è disponibile solo nei cluster in modalità di accesso standard e dedicato. Non è disponibile in DBSQL.
È necessario installare il azure-identity pacchetto per usare il DefaultAzureCredential provider. Per installare il pacchetto, vedere Librerie Python con ambito notebook o librerie con ambito calcolo.
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
Ottenere il contesto di esecuzione delle attività
Usare l'API PySpark TaskContext per ottenere informazioni di contesto, ad esempio l'identità dell'utente, i tag del cluster, l'ID processo Spark e altro ancora. Vedi Ottieni il contesto del compito in una UDF.
Limiti
Le seguenti limitazioni si applicano alle UDF di PySpark.
Restrizioni di accesso ai file: In Databricks Runtime 14.2 e versioni precedenti le funzioni definite dall'utente PySpark nei cluster condivisi non possono accedere a cartelle Git, file dell'area di lavoro o volumi del catalogo Unity.
Variabili di trasmissione: le UDF PySpark nei cluster in modalità di accesso standard e nel calcolo serverless non supportano le variabili di trasmissione.
Credenziali del servizio: Le credenziali del servizio sono disponibili solo nelle UDF Python del Catalogo Unity di Batch e nelle UDF Python scalari. Non sono supportate nelle funzioni definite dall'utente Python del Unity Catalog standard.
Credenziali del servizio: le credenziali del servizio sono disponibili solo nel calcolo serverless quando si usa l'ambiente serverless versione 3 o successiva. Consultare le versioni dell'ambiente serverless .
- Limite di memoria per serverless: le PySpark UDFs nel calcolo serverless hanno un limite di memoria di 1 GB per ciascuna PySpark UDF. Il superamento di questo limite genera un errore di tipo UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
- Limite di memoria in modalità di accesso standard: le UDF di PySpark in modalità di accesso standard hanno un limite di memoria basato sulla memoria disponibile del tipo di istanza scelto. Il superamento della memoria disponibile genera un errore di tipo UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.