Sdílet prostřednictvím


Skalární funkce definované uživatelem – Python

Tento článek obsahuje příklady uživatelem definované funkce Pythonu (UDF). Ukazuje, jak zaregistrovat a vyvolat UDF (uživatelsky definované funkce), a uvádí upozornění ohledně pořadí vyhodnocování dílčích výrazů ve Spark SQL.

V Databricks Runtime 14.0 a novějších můžete pomocí uživatelem definovaných tabulkových funkcí Pythonu (UDTFs) zaregistrovat funkce, které místo skalárních hodnot vracejí celé relace. Viz uživatelem definované funkce tabulek v Pythonu (UDTFs).

Poznámka:

Ve službě Databricks Runtime 12.2 LTS a starších verzích nejsou na výpočetních prostředcích katalogu Unity podporovány Python UDF ani Pandas UDF při použití standardního režimu přístupu. Uživatelem definované skalární funkce v Pythonu a uživatelem definované funkce v Pandas jsou podporovány v prostředí Databricks Runtime 13.3 LTS a vyšším ve všech režimech přístupu.

Ve službě Databricks Runtime 13.3 LTS a novějších můžete pomocí syntaxe SQL registrovat skalární Python UDFs (uživatelské funkce definované uživatelem) do katalogu Unity. Viz uživatelem definované funkce (UDF) v katalogu Unity.

Registrujte funkci jako UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Můžete volitelně nastavit návratový typ vaší uživatelsky definované funkce. Výchozí návratový typ je StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Zavolejte UDF v Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Použití UDF s DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Alternativně můžete deklarovat stejnou UDF pomocí poznámkové syntaxe:

from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Varianty s UDF

Typ PySpark pro variantu je VariantType a hodnoty jsou typu VariantVal. Informace o variantách najdete v tématu Dotazování na data variant.

from pyspark.sql.types import VariantType

# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
  return VariantVal.parseJson(jsonString)

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
|        {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
  return {"v": VariantVal.parseJson(jsonString)}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
|        {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
  return [VariantVal.parseJson(jsonString)]

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
|           [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
  return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
|         toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+

Kontrola pořadí vyhodnocování a ověřování nulových hodnot

Spark SQL (včetně SQL a rozhraní DATAFrame and Dataset API) nezaručuje pořadí vyhodnocení dílčích výrazů. Zejména vstupy operátoru nebo funkce nejsou nutně vyhodnoceny zleva doprava nebo v jiném pevném pořadí. Například logické výrazy AND a OR nemají sémantiku "krácení výpočtu" zleva doprava.

Proto je nebezpečné spoléhat se na vedlejší účinky nebo pořadí vyhodnocení logických výrazů, a také na pořadí klauzulí WHERE a HAVING, protože během optimalizace a plánování dotazů může dojít k jejich přeuspořádání. Konkrétně platí, že pokud UDF spoléhá na krátké spojení v SQL pro kontrolu hodnoty null, neexistuje žádná záruka, že se kontrola hodnoty null provede před vyvoláním UDF. Příklad:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Tato WHERE klauzule nezaručuje, že se UDF (uživatelsky definovaná funkce) strlen bude volat poté, co budou vyfiltrovány hodnoty null.

Pokud chcete provést správnou kontrolu hodnoty null, doporučujeme provést některou z následujících akcí:

  • Zajistěte, aby funkce definovaná uživatelem byla schopná pracovat s hodnotami null, a proveďte kontrolu hodnot null uvnitř samotné funkce definované uživatelem.
  • Použijte výrazy IF nebo CASE WHEN k ověření hodnoty null a vyvolání funkce definované uživatelem v podmíněné větvi
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Přihlašovací údaje služby ve skalárních uživatelem definovaných uživatelem Pythonu

Skalární uživatelem definované uživatelem Pythonu můžou pomocí přihlašovacích údajů služby Katalogu Unity bezpečně přistupovat k externím cloudovým službám. To je užitečné pro integraci operací, jako je například cloudová tokenizace, šifrování nebo správa tajných kódů přímo do transformací dat.

Přihlašovací údaje služby pro skalární uživatelem definované uživatelem Pythonu se podporují jenom u SQL Warehouse a obecného výpočetního prostředí.

Poznámka:

Servisní přihlašovací údaje v uživatelsky definovaných skalárních funkcích Pythonu vyžadují Databricks Runtime 17.1 a vyšší.

Pokud chcete vytvořit přihlašovací údaje služby, přečtěte si téma Vytvoření přihlašovacích údajů služby.

Poznámka:

API specifické pro uživatelsky definované funkce pro přihlašovací údaje služby:
V UDF použijte databricks.service_credentials.getServiceCredentialsProvider() pro přístup k přihlašovacím údajům služby.

To se liší od dbutils.credentials.getServiceCredentialsProvider() funkce používané v poznámkových blocích, která není k dispozici v kontextech spouštění UDF.

Pokud chcete získat přístup k přihlašovacím údajům služby, použijte databricks.service_credentials.getServiceCredentialsProvider() nástroj v logice UDF k inicializaci cloudových sad SDK s příslušnými přihlašovacími údaji. Veškerý kód musí být zapouzdřen v těle uživatele.

@udf
def use_service_credential():
    from azure.mgmt.web import WebSiteManagementClient

    # Assuming there is a service credential named 'testcred' set up in Unity Catalog
    web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
    # Use web_client to perform operations

Oprávnění k přihlašovacím údajům služby

Tvůrce uživatelem definovaného uživatelem musí mít oprávnění ACCESS k přihlašovacím údajům služby Katalogu Unity.

Funkce definované uživatelem, které běží v oboru No-PE označované také jako vyhrazené clustery, vyžadují oprávnění MANAGE k přihlašovacím údajům služby.

Výchozí přihlašovací údaje

Při použití ve skalárních uživatelem definovaných uživatelem Pythonu databricks automaticky použije výchozí přihlašovací údaje služby z proměnné výpočetního prostředí. Toto chování umožňuje bezpečně odkazovat na externí služby bez explicitní správy aliasů přihlašovacích údajů v kódu UDF. Viz Zadání výchozích přihlašovacích údajů služby pro výpočetní prostředek

Výchozí podpora přihlašovacích údajů je dostupná jenom v clusterech standardního a vyhrazeného režimu přístupu. Není k dispozici ve službě DBSQL.

Chcete-li použít poskytovatele azure-identity, musíte nainstalovat balíček DefaultAzureCredential. Pokud chcete balíček nainstalovat, přečtěte si téma Knihovny Pythonu s oborem poznámkového bloku nebo knihovny s vymezeným výpočetním oborem.

@udf
def use_service_credential():
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.web import WebSiteManagementClient

    # DefaultAzureCredential is automatically using the default service credential for the compute
    web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

    # Use web_client to perform operations

Získejte kontext provádění úkolu

Použijte PySpark API TaskContext k získání informací o kontextu, jako je identita uživatele, značky clusteru, ID úlohy Spark a další. Viz Získání kontextu úkolu v UDF.

Omezení

Následující omezení platí pro PySpark UDFs:

  • Omezení přístupu k souborům: Ve službě Databricks Runtime 14.2 a nižších nemohou PySpark UDFs na sdílených clusterech získat přístup ke složkám Gitu, souborům pracovního prostoru ani ke svazkům katalogu Unity.

  • proměnné všesměrového vysílání: UDF PySpark v clusterech standardního režimu přístupu a bezserverové výpočetní prostředky nepodporují proměnné všesměrového vysílání.

  • Přihlašovací údaje služby: Přihlašovací údaje služby jsou k dispozici pouze v uživatelem definovaných uživatelem Pythonu v katalogu Batch Unity a uživatelem definovaných uživatelem jazyka Python. Nejsou podporované ve standardních uživatelem definovaných funkcích Pythonu v katalogu Unity.

  • Přihlašovací údaje služby: Přihlašovací údaje služby jsou k dispozici pouze v bezserverových výpočetních prostředcích, pokud používáte bezserverové prostředí verze 3 nebo vyšší. Viz verze bezserverového prostředí .

  • Omezení paměti na bezserverových platformách: Funkce definované uživatelem PySpark pro bezserverové platformy mají limit paměti 1 GB na PySpark UDF. Překročení tohoto limitu způsobí chybu typu UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
  • Limit paměti ve standardním režimu přístupu: Funkce definované uživatelem PySpark ve standardním režimu přístupu mají omezení paměti na základě dostupné paměti zvoleného typu instance. Překročení dostupné paměti způsobí chybu typu UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.