Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Ez a cikk a Python felhasználó által definiált függvényeinek (UDF) példáit tartalmazza. Bemutatja, hogyan regisztrálhatja az UDF-eket, hogyan hívhatja meg az UDF-eket, és bemutatja a Spark SQL-ben az alkifejezések kiértékelési sorrendjét.
A Databricks Runtime 14.0-s és újabb verzióiban a Python felhasználó által definiált táblafüggvényeivel (UDTF-ekkel) olyan függvényeket regisztrálhat, amelyek skaláris értékek helyett teljes kapcsolatokat ad vissza. Lásd: Python felhasználó által definiált táblafüggvények (UDTF-ek).
Feljegyzés
A Databricks Runtime 12.2 LTS és újabb verziókban a Python UDF-ek és a Pandas UDF-ek nem támogatottak a Standard hozzáférési módot használó Unity Catalog-számításban. A Skaláris Python UDF-ek és Pandas UDF-ek a Databricks Runtime 13.3 LTS-ben és annál is támogatottak az összes hozzáférési mód esetében.
A Databricks Runtime 13.3 LTS-ben és újabb verziókban skaláris Python UDF-eket regisztrálhat a Unity Catalogban SQL-szintaxissal. Lásd a felhasználó által definiált függvényeket (UDF-eket) a Unity Catalogban.
Függvény regisztrálása felhasználó által definiált függvényként (UDF)
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Az UDF visszatérési típusát igény szerint beállíthatja. Az alapértelmezett visszatérési típus a következő StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Az UDF meghívása a Spark SQL-ben
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
UDF használata DataFrame-ekkel
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Másik lehetőségként deklarálhatja ugyanezt az UDF-et széljegyzetszintaxissal:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Változatok UDF-fel
A variáns PySpark típusa VariantType, és az értékek típusa VariantVal. A változatokról további információt a Lekérdezésvariáns adatok című témakörben talál.
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
Kiértékelési sorrend és nullérték-ellenőrzés
A Spark SQL (beleértve az SQL-t és a DataFrame- és Dataset API-t) nem garantálja az alexpressziók kiértékelésének sorrendjét. Az operátorok vagy függvények bemeneteit nem feltétlenül értékelik ki balról jobbra vagy más rögzített sorrendben. A logikai AND és OR kifejezési kifejezések például nem rendelkeznek balról jobbra "rövidzárolás" szemantikával.
Ezért veszélyes a logikai kifejezések mellékhatásaira vagy kiértékelési sorrendjére, valamint a záradékok sorrendjére WHEREHAVING támaszkodni, mivel az ilyen kifejezések és záradékok átrendezhetők a lekérdezésoptimalizálás és -tervezés során. Pontosabban, ha egy UDF az SQL rövidzárlatú szemantikára támaszkodik a nullellenőrzéshez, nincs garancia arra, hogy a null ellenőrzés az UDF meghívása előtt fog történni. Például,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Ez a WHERE záradék nem garantálja az UDF meghívását a strlen null értékek szűrése után.
A megfelelő nullellenőrzés végrehajtásához az alábbiak valamelyikét javasoljuk:
- Állítsa az UDF-et nullérzékenysé, és végezze el a null ellenőrzést magában az UDF-ben
- Használja a
IFvagy aCASE WHENkifejezéseket a null-ellenőrzéshez és az UDF feltételes ágban való meghívásához.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Szolgáltatás hitelesítő adatai a Scalar Python UDF-jeiben
A skaláris Python UDF-ek a Unity Catalog szolgáltatás hitelesítő adataival biztonságosan hozzáférhetnek a külső felhőszolgáltatásokhoz. Ez olyan műveletek integrálásához hasznos, mint a felhőalapú tokenizálás, titkosítás vagy titkos kulcskezelés közvetlenül az adatátalakításokba.
A skaláris Python UDF-k szolgáltatás hitelesítő adatai csak az SQL Warehouse-on és az általános számításon támogatottak.
Feljegyzés
A Skaláris Python UDF-ekben a szolgáltatás hitelesítő adataihoz a Databricks Runtime 17.1 vagy újabb verziója szükséges.
Szolgáltatás hitelesítő adatainak létrehozásáról a Szolgáltatás hitelesítő adatainak létrehozása című témakörben olvashat.
Feljegyzés
A szolgáltatás hitelesítő adataihoz tartozó UDF-specifikus API:
Az UDF-ekben használja databricks.service_credentials.getServiceCredentialsProvider() a szolgáltatás hitelesítő adatainak eléréséhez.
Ez eltér a dbutils.credentials.getServiceCredentialsProvider() jegyzetfüzetekben használt függvénytől, amely nem érhető el UDF-végrehajtási környezetekben.
A szolgáltatás hitelesítő adatainak eléréséhez használja az databricks.service_credentials.getServiceCredentialsProvider() UDF-logika segédprogramját a felhőbeli SDK-k inicializálásához a megfelelő hitelesítő adatokkal. Az összes kódot bele kell ágyazni az UDF törzsébe.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
Szolgáltatási hitelesítő adatok jogosultságai
A UDF létrehozójának HOZZÁFÉRÉS-engedéllyel kell rendelkeznie a Unity Catalog szolgáltatás hitelesítő adataihoz.
A No-PE hatókörben futó UDF-ek, más néven dedikált fürtök, a szolgáltatás hitelesítő adataira vonatkozó MANAGE engedélyeket igényelnek.
Alapértelmezett hitelesítő adatok
Ha skaláris Python UDF-ekben használják, a Databricks automatikusan a számítási környezeti változó alapértelmezett szolgáltatás-hitelesítő adatait használja. Ez a viselkedés lehetővé teszi, hogy biztonságosan hivatkozzon külső szolgáltatásokra anélkül, hogy explicit módon kezelne hitelesítő aliasokat az UDF-kódban. Lásd: Alapértelmezett szolgáltatás hitelesítő adatainak megadása számítási erőforráshoz
Az alapértelmezett hitelesítő adatok támogatása csak standard és dedikált hozzáférési módú fürtökben érhető el. Ez nem érhető el a DBSQL-ben.
Ahhoz, hogy használni tudd a azure-identity szolgáltatót, telepítened kell a DefaultAzureCredential csomagot. A csomag telepítéséhez tekintse meg a jegyzetfüzet-hatókörű Python-kódtárakat vagy a számítási hatókörű kódtárakat.
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
Szerezze meg a feladat végrehajtási kontextust
Használja a TaskContext PySpark API-t a kontextus információk lekéréséhez, például a felhasználóazonosság, a klasztercímkék, a Spark feladat azonosító és további adatok megszerzéséhez. Lásd Feladat kontextusának megszerzése UDF-ben.
Korlátozások
A PySpark UDF-ekre a következő korlátozások vonatkoznak:
Fájlhozzáférés korlátozásai: A Databricks Runtime 14.2-ben és az alatta lévő PySpark UDF-ek megosztott fürtökön nem férnek hozzá a Git-mappákhoz, munkaterületfájlokhoz vagy Unity-katalóguskötetekhez.
Szórási változók: A PySpark UDF-jei a standard hozzáférési módú fürtökön és a kiszolgáló nélküli számításban nem támogatják a szórási változókat.
Szolgáltatás hitelesítő adatai: A szolgáltatás hitelesítő adatai csak a Batch Unity Catalog Python UDF-jeiben és a skaláris Python UDF-ekben érhetők el. A Standard Unity Catalog Python UDF-jei nem támogatják őket.
Szolgáltatás hitelesítő adatai: A szolgáltatás hitelesítő adatai csak kiszolgáló nélküli számításban érhetők el a kiszolgáló nélküli környezet 3. vagy újabb verziójának használatakor. Lásd kiszolgáló nélküli környezetekverzióit.
- Memóriakorlát kiszolgáló nélküli rendszeren: A kiszolgáló nélküli számítási pyspark-UDF-k memóriakorlátja PySpark UDF-enként 1 GB. A korlát túllépése UDF_PYSPARK_USER_CODE_ERROR típusú hibát eredményez . MEMORY_LIMIT_SERVERLESS.
- Memóriakorlát standard hozzáférési mód esetén: A PySpark UDF-jei normál hozzáférési módban a kiválasztott példánytípus rendelkezésre álló memóriája alapján memóriakorlátot biztosítanak. A rendelkezésre álló memória túllépése UDF_PYSPARK_USER_CODE_ERROR típusú hibát eredményez . MEMORY_LIMIT.