Vad är användardefinierade tabellfunktioner i Python?

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Med en användardefinierad tabellfunktion (UDTF) kan du registrera funktioner som returnerar tabeller i stället för skalära värden. UDF:er fungerar på samma sätt som vanliga tabelluttryck (CTE) när de refereras i SQL-frågor. Du refererar till UDF:er i satsen i FROM en SQL-instruktion och du kan länka ytterligare Spark SQL-operatorer till resultatet.

UDF:er är registrerade på den lokala SparkSession och är isolerade på notebook- eller jobbnivå.

UDF:er stöds för beräkning som konfigurerats med tilldelade eller icke-isoleringsbaserade delade åtkomstlägen. Du kan inte använda UDF:er i läget för delad åtkomst.

Du kan inte registrera UDF:er som objekt i Unity Catalog, och UDF:er kan inte användas med SQL-lager.

Vad är den grundläggande syntaxen för en UDTF?

Apache Spark implementerar Python UDTFs som Python-klasser med en obligatorisk eval metod.

Du genererar resultat som rader med hjälp av yield.

För att Apache Spark ska kunna använda din klass som UDTF måste du importera funktionen PySpark udtf .

Databricks rekommenderar att du använder den här funktionen som dekoratör och alltid uttryckligen anger fältnamn och typer med hjälp av returnType alternativet .

I följande exempel skapas en enkel tabell från skalära indata med hjälp av en UDTF:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Du kan använda Python-syntax *args och implementera logik för att hantera ett ospecificerat antal indatavärden. I följande exempel returneras samma resultat när du uttryckligen kontrollerar indatalängden och typerna för argumenten:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Registrera en UDTF

Du kan registrera en UDTF till den aktuella SparkSession för användning i SQL-frågor med hjälp av följande syntax:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

I följande exempel registreras en Python UDTF till SQL:

spark.udtf.register("simple_udtf", SimpleUDTF)

När du har registrerat dig kan du använda UDTF i SQL med antingen det %sql magiska kommandot eller spark.sql() funktionen, som i följande exempel:

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

Ger resultat

Python-UDF:er implementeras med yield för att returnera resultat. Resultatet returneras alltid som en tabell som innehåller 0 eller fler rader med det angivna schemat.

När skalära argument skickas körs logiken eval i metoden exakt en gång med uppsättningen skalära argument som skickas. För tabellargument eval körs metoden en gång för varje rad i indatatabellen.

Logik kan skrivas för att returnera 0, 1 eller många rader per indata.

Följande UDTF visar hur du returnerar 0 eller fler rader för varje indata genom att separera objekt från en kommaavgränsad lista i separata poster:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

Skicka ett tabellargument till en UDTF

Du kan använda SQL-nyckelordet TABLE() för att skicka ett tabellargument till en UDTF. Du kan använda ett tabellnamn eller en fråga, som i följande exempel:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

Tabellargument bearbetas en rad i taget. Du kan använda standardkommentarer för PySpark-kolumnfält för att interagera med kolumner i varje rad. I följande exempel visas explicit import av PySpark-typen Row och sedan filtrering av den skickade tabellen i id fältet:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

Skicka skalära argument till en UDTF

Du kan skicka skalära argument till en UDTF med valfri kombination av följande värden:

  • Skalära konstanter
  • Skalärfunktioner
  • Fält i en relation

Om du vill skicka fält i en relation måste du registrera UDTF och använda SQL-nyckelordet LATERAL .

Kommentar

Du kan använda infogade tabellalias för att skilja kolumner åt.

I följande exempel visas hur du använder LATERAL för att skicka fält från en tabell till en UDTF:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

Ange standardvärden för UDF:er

Du kan också implementera en __init__ metod för att ange standardvärden för klassvariabler som du kan referera till i Python-logiken.

Metoden __init__ accepterar inga argument och har ingen åtkomst till variabler eller tillståndsinformation i SparkSession.

Använda Apache Arrow med UDF:er

Databricks rekommenderar att du använder Apache Arrow för UDF:er som tar emot en liten mängd data som indata men matar ut en stor tabell.

Du kan aktivera Pil genom att ange parametern useArrow när du deklarerar UDTF, som i följande exempel:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1