Udostępnij przez


Funkcje skalarne zdefiniowane przez użytkownika — Python

Ten artykuł zawiera przykłady funkcji zdefiniowanej przez użytkownika języka Python (UDF). Przedstawia on sposób rejestrowania funkcji zdefiniowanych przez użytkownika, wywoływania funkcji zdefiniowanych przez użytkownika i zapewnia zastrzeżenia dotyczące kolejności oceny podexpressionów w usłudze Spark SQL.

Requirements

  • W środowisku Databricks Runtime 12.2 LTS i starszym funkcje zdefiniowane przez użytkownika języka Python i funkcje zdefiniowane przez użytkownika biblioteki Pandas nie są obsługiwane w obliczeniach na Unity Catalog, które korzystają z trybu standardowego dostępu.

  • Skalarne funkcje zdefiniowane przez użytkownika w Pythonie (UDF) i funkcje zdefiniowane przez użytkownika w Pandas (UDF) są obsługiwane w Databricks Runtime 13.3 LTS i nowszych wersjach dla wszystkich trybów dostępu.

  • Obsługa instancji ARM dla funkcji UDF języka Python w klastrach z włączonym Unity Catalog wymaga środowiska Databricks Runtime 15.2 lub nowszego.

W środowisku Databricks Runtime 14.0 i wcześniejszym funkcje zdefiniowane przez użytkownika w języku Python i funkcje zdefiniowane przez użytkownika w bibliotece Pandas nie są obsługiwane w klastrach Unity Catalog korzystających ze standardowego trybu dostępu. Skalarne funkcje zdefiniowane przez użytkownika w Pythonie i funkcje Pandas zdefiniowane przez użytkownika są obsługiwane dla wszystkich trybów dostępu w systemie Databricks Runtime 14.1 lub nowszym.

W Databricks Runtime 14.1 lub nowszym można zarejestrować skalarne funkcje UDF Pythona w Unity Catalog przy użyciu składni SQL. Zobacz funkcje definiowane przez użytkownika (UDFs) w katalogu Unity.

Rejestracja funkcji jako UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Opcjonalnie można ustawić typ zwracany funkcji zdefiniowanej przez użytkownika. Domyślnym typem zwracania jest StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Wywołaj UDF w Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Używanie funkcji UDF z ramkami danych

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Alternatywnie można zadeklarować tę samą funkcję zdefiniowaną przez użytkownika przy użyciu składni adnotacji:

from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Warianty z funkcją UDF

Typ PySpark dla wariantu to VariantType , a wartości są typu VariantVal. Aby uzyskać informacje o wariantach, zobacz Zapytanie dotyczące danych wariantów.

from pyspark.sql.types import VariantType

# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
  return VariantVal.parseJson(jsonString)

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
|        {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
  return {"v": VariantVal.parseJson(jsonString)}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
|        {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
  return [VariantVal.parseJson(jsonString)]

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
|           [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
  return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
|         toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+

Kolejność oceny i sprawdzanie wartości null

Spark SQL (w tym SQL, API DataFrame oraz Dataset) nie gwarantuje kolejności oceny podwyrażeń. W szczególności dane wejściowe operatora lub funkcji nie muszą być oceniane od lewej do prawej ani w innej stałej kolejności. Na przykład wyrażenia logiczne AND i OR nie mają semantyki od lewej do prawej „skrótowego przerywania”.

W związku z tym niebezpieczne jest poleganie na skutkach ubocznych lub kolejności obliczania wyrażeń logicznych oraz kolejności klauzul WHERE i HAVING, ponieważ takie wyrażenia i klauzule mogą zostać zmienione podczas optymalizacji zapytań i planowania. W szczególności, jeśli funkcja UDF wykorzystuje semantykę zwarciową w języku SQL w celu sprawdzania wartości null, nie ma gwarancji, że sprawdzanie wartości null nastąpi przed wywołaniem funkcji UDF. Na przykład:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Ta WHERE klauzula nie gwarantuje, że funkcja zdefiniowana przez użytkownika strlen zostanie wywołana po usunięciu wartości null.

Aby wykonać odpowiednie sprawdzanie wartości null, zalecamy wykonanie jednej z następujących czynności:

  • Upewnij się, że funkcja UDF obsługuje wartości null i sprawdza je wewnątrz siebie samej
  • Użyj wyrażeń IF lub CASE WHEN do sprawdzenia braku wartości i wywołania funkcji zdefiniowanej przez użytkownika w gałęzi warunkowej
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Poświadczenia usługi w scalarnych funkcjach UDF zdefiniowanych przez użytkownika w Pythonie

Skalarne UDF-y Pythona mogą wykorzystywać poświadczenia usługi Unity Catalog, aby bezpiecznie uzyskiwać dostęp do zewnętrznych usług w chmurze. Jest to przydatne w przypadku integrowania operacji, takich jak tokenizacja w chmurze, szyfrowanie lub zarządzanie tajemnicami bezpośrednio w przekształceniach danych.

Poświadczenia usługi dla skalarnych funkcji zdefiniowanych przez użytkownika języka Python są obsługiwane tylko w usłudze SQL Warehouse i ogólnych obliczeniach.

Uwaga

Poświadczenia usługi w skalarnych funkcjach zdefiniowanych przez użytkownika (UDF) w języku Python wymagają środowiska Databricks Runtime w wersji 17.1 lub nowszej.

Aby utworzyć poświadczenia usługi, zobacz Tworzenie poświadczeń usługi.

Uwaga

API specyficzne dla UDF dotyczące poświadczeń usługi:
W funkcjach zdefiniowanych przez użytkownika użyj databricks.service_credentials.getServiceCredentialsProvider() do uzyskania dostępu do danych uwierzytelniających usługę.

Różni się to od funkcji dbutils.credentials.getServiceCredentialsProvider() używanej w notesach, która nie jest dostępna w kontekstach wykonywania UDF (funkcji zdefiniowanej przez użytkownika).

Aby uzyskać dostęp do poświadczeń usługi, użyj narzędzia databricks.service_credentials.getServiceCredentialsProvider() w logice UDF, aby zainicjować SDK w chmurze odpowiednimi poświadczeniami. Cały kod musi być enkapsulowany w treści funkcji UDF.

@udf
def use_service_credential():
    from azure.mgmt.web import WebSiteManagementClient

    # Assuming there is a service credential named 'testcred' set up in Unity Catalog
    web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
    # Use web_client to perform operations

Uprawnienia poświadczeń usługi

Twórca funkcji UDF musi mieć uprawnienie ACCESS w poświadczeniu usługi Unity Catalog.

Funkcje zdefiniowane przez użytkownika, które działają w zakresie No-PE, zwanych również dedykowanymi klastrami, wymagają uprawnień do zarządzania poświadczeniami usługi.

Poświadczenia domyślne

W przypadku użycia w skalarnych funkcjach zdefiniowanych przez użytkownika w Pythonie, Databricks automatycznie używa domyślnego poświadczenia serwisu ze zmiennej środowiskowej środowiska obliczeniowego. To zachowanie pozwala bezpiecznie odwoływać się do usług zewnętrznych bez jawnego zarządzania aliasami poświadczeń w kodzie UDF. Zobacz Określanie domyślnego poświadczenia usługi dla zasobu obliczeniowego

Domyślna obsługa poświadczeń jest dostępna tylko w klastrach w trybie dostępu standardowym i dedykowanym. Nie jest ona dostępna w bazie danych DBSQL.

Musisz zainstalować pakiet azure-identity, aby używać dostawcy DefaultAzureCredential. Aby zainstalować pakiet, zobacz Biblioteki Pythona o zakresie notatnika lub Biblioteki o zakresie obliczeniowym.

@udf
def use_service_credential():
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.web import WebSiteManagementClient

    # DefaultAzureCredential is automatically using the default service credential for the compute
    web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

    # Use web_client to perform operations

Pobierz kontekst wykonania zadania

Użyj API PySpark TaskContext, aby uzyskać informacje kontekstowe, takie jak tożsamość użytkownika, tagi klastra, identyfikator zadania Spark i więcej. Zobacz Uzyskiwanie kontekstu zadania w UDF.

Ograniczenia

Następujące ograniczenia dotyczą UDF-ów PySpark:

  • Ograniczenia dostępu do plików: W środowisku Databricks Runtime 14.2 i niżej funkcje zdefiniowane przez użytkownika PySpark w udostępnionych klastrach nie mogą uzyskiwać dostępu do folderów Git, plików obszarów roboczych ani woluminów Unity Catalog.

  • Zmienne rozgłaszane: Funkcje zdefiniowane przez użytkownika PySpark w klastrach w standardowym trybie dostępu i w obliczeniach bezserwerowych nie obsługują zmiennych rozgłaszanych.

  • Poświadczenia usługi: Poświadczenia usługi są dostępne tylko w funkcjach Batch Unity Catalog Python UDF i Scalar Python UDF. Nie są obsługiwane w standardowych Pythonowych UDF w Unity Catalog.

  • Poświadczenia usługi: poświadczenia usługi są dostępne tylko w obliczeniach bezserwerowych w przypadku korzystania ze środowiska bezserwerowego w wersji 3 lub nowszej. Zobacz wersje środowiska bezserwerowego.

  • Limit pamięci bezserwerowej: funkcje zdefiniowane przez użytkownika PySpark w obliczeniach bezserwerowych mają limit pamięci 1 GB na funkcję zdefiniowanej przez użytkownika PySpark. Przekroczenie tego limitu powoduje błąd typu UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
  • Limit pamięci w standardowym trybie dostępu: UDF (funkcje zdefiniowane przez użytkownika) PySpark w standardowym trybie dostępu mają limit pamięci w zależności od dostępnej pamięci dla wybranego typu wystąpienia. Przekroczenie dostępnej pamięci powoduje błąd typu UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.