Udostępnij za pośrednictwem


Funkcje skalarne zdefiniowane przez użytkownika — Python

Ten artykuł zawiera przykłady funkcji zdefiniowanej przez użytkownika języka Python (UDF). Przedstawia on sposób rejestrowania funkcji zdefiniowanych przez użytkownika, wywoływania funkcji zdefiniowanych przez użytkownika i zapewnia zastrzeżenia dotyczące kolejności oceny podexpressionów w usłudze Spark SQL.

W środowisku Databricks Runtime 14.0 lub nowszym można użyć funkcji tabeli zdefiniowanej przez użytkownika języka Python (UDTFs), aby zarejestrować funkcje zwracające całe relacje zamiast wartości skalarnych. Zobacz funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDTFs).

Uwaga

W środowisku Databricks Runtime 12.2 LTS i starszym funkcje zdefiniowane przez użytkownika języka Python i funkcje zdefiniowane przez użytkownika biblioteki Pandas nie są obsługiwane w obliczeniach na Unity Catalog, które korzystają z trybu standardowego dostępu. Skalarne funkcje zdefiniowane przez użytkownika w Pythonie (UDF) i funkcje zdefiniowane przez użytkownika w Pandas (UDF) są obsługiwane w Databricks Runtime 13.3 LTS i nowszych wersjach dla wszystkich trybów dostępu.

W środowisku Databricks Runtime 13.3 LTS i nowszym można zarejestrować skalarne funkcje Pythona zdefiniowane przez użytkownika (UDF) w Unity Catalog przy użyciu składni SQL. Zobacz funkcje definiowane przez użytkownika (UDFs) w katalogu Unity.

Rejestracja funkcji jako UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Opcjonalnie można ustawić typ zwracany funkcji zdefiniowanej przez użytkownika. Domyślnym typem zwracania jest StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Wywołaj UDF w Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Używanie funkcji UDF z ramkami danych

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Alternatywnie można zadeklarować tę samą funkcję zdefiniowaną przez użytkownika przy użyciu składni adnotacji:

from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Kolejność oceny i sprawdzanie wartości null

Spark SQL (w tym SQL, API DataFrame oraz Dataset) nie gwarantuje kolejności oceny podwyrażeń. W szczególności dane wejściowe operatora lub funkcji nie muszą być oceniane od lewej do prawej ani w innej stałej kolejności. Na przykład wyrażenia logiczne AND i OR nie mają semantyki od lewej do prawej „skrótowego przerywania”.

W związku z tym niebezpieczne jest poleganie na skutkach ubocznych lub kolejności obliczania wyrażeń logicznych oraz kolejności klauzul WHERE i HAVING, ponieważ takie wyrażenia i klauzule mogą zostać zmienione podczas optymalizacji zapytań i planowania. W szczególności, jeśli funkcja UDF wykorzystuje semantykę zwarciową w języku SQL w celu sprawdzania wartości null, nie ma gwarancji, że sprawdzanie wartości null nastąpi przed wywołaniem funkcji UDF. Na przykład:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Ta WHERE klauzula nie gwarantuje, że funkcja zdefiniowana przez użytkownika strlen zostanie wywołana po usunięciu wartości null.

Aby wykonać odpowiednie sprawdzanie wartości null, zalecamy wykonanie jednej z następujących czynności:

  • Upewnij się, że funkcja UDF obsługuje wartości null i sprawdza je wewnątrz siebie samej
  • Użyj wyrażeń IF lub CASE WHEN do sprawdzenia braku wartości i wywołania funkcji zdefiniowanej przez użytkownika w gałęzi warunkowej
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Pobierz kontekst wykonania zadania

Użyj API PySpark TaskContext, aby uzyskać informacje kontekstowe, takie jak tożsamość użytkownika, tagi klastra, identyfikator zadania Spark i więcej. Zobacz Uzyskiwanie kontekstu zadania w UDF.

Ograniczenia

Następujące ograniczenia dotyczą UDF-ów PySpark:

  • Ograniczenia dostępu do plików: W środowisku Databricks Runtime 14.2 i niżej funkcje zdefiniowane przez użytkownika PySpark w udostępnionych klastrach nie mogą uzyskiwać dostępu do folderów Git, plików obszarów roboczych ani woluminów Unity Catalog.

  • Zmienne rozgłaszane: Funkcje zdefiniowane przez użytkownika PySpark w klastrach w standardowym trybie dostępu i w obliczeniach bezserwerowych nie obsługują zmiennych rozgłaszanych.

  • Limit pamięci bezserwerowej: funkcje zdefiniowane przez użytkownika PySpark w obliczeniach bezserwerowych mają limit pamięci 1 GB na funkcję zdefiniowanej przez użytkownika PySpark. Przekroczenie tego limitu powoduje błąd typu UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
  • Limit pamięci w trybie dostępu standardowego: UDF PySpark w standardowym trybie dostępu są ograniczone przez dostępną pamięć wybranego typu instancji. Przekroczenie dostępnej pamięci powoduje błąd typu UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.