Share via


Was sind benutzerdefinierte Python-Tabellenfunktionen?

Wichtig

Dieses Feature befindet sich in der Public Preview.

Mit einer benutzerdefinierten Tabellenfunktion (UDTF) können Sie Funktionen registrieren, die Tabellen anstelle von Skalarwerten zurückgeben. UDTFs funktionieren ähnlich wie allgemeine Tabellenausdrücke (CTEs), wenn in SQL-Abfragen darauf verwiesen wird. Sie verweisen in der FROM-Klausel einer SQL-Anweisung auf UDTFs, und Sie können zusätzliche Spark SQL-Operatoren mit den Ergebnissen verketten.

UDTFs werden in der lokalen SparkSession registriert und sind auf Notizbuch- oder Auftragsebene isoliert.

UDTFs werden bei der Berechnung unterstützt, die mit zugewiesenen oder nicht isolierten gemeinsam genutzten Zugriffsmodi konfiguriert ist. Sie können UDTFs nicht im Modus für den freigegebenen Zugriff verwenden.

Sie können UDTFs nicht als Objekte im Unity-Katalog registrieren, und UDTFs können nicht mit SQL-Warehouses verwendet werden.

Was ist der grundlegende Syntax für eine UDTF?

Apache Spark implementiert Python UDTFs als Python-Klassen mit einer obligatorischen eval-Methode.

Sie geben Ergebnisse als Zeilen mit yield aus.

Damit Apache Spark Ihre Klasse als UDTF verwenden kann, müssen Sie die PySpark-Funktion udtf importieren.

Databricks empfiehlt die Verwendung dieser Funktion als Dekorateur und immer die explizite Angabe von Feldnamen und Typen mithilfe der returnType-Option.

Im folgenden Beispiel wird eine einfache Tabelle aus skalaren Eingaben mithilfe einer UDTF erstellt:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Sie können Python-*args-Syntax verwenden und Logik implementieren, um eine nicht angegebene Anzahl von Eingabewerten zu verarbeiten. Im folgenden Beispiel wird dasselbe Ergebnis zurückgegeben, während die Eingabelänge und die Eingabetypen für die Argumente explizit überprüft werden:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Registrieren einer UDTF

Sie können eine UDTF für die aktuelle SparkSession für die Verwendung in SQL-Abfragen mithilfe des folgenden Syntax registrieren:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

Im folgenden Beispiel wird eine Python UDTF in SQL registriert:

spark.udtf.register("simple_udtf", SimpleUDTF)

Nach der Registrierung können Sie die UDTF in SQL entweder mit dem magischen Befehl %sql oder der Funktion spark.sql() verwenden, wie in den folgenden Beispielen:

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

Ergebniserreichung

Python UDTFs werden mit yield implementiert, um Ergebnisse zurückzugeben. Ergebnisse werden immer als Tabelle zurückgegeben, die 0 oder mehr Zeilen nach dem angegebenen Schema enthält.

Beim Übergeben von skalaren Argumenten wird die Logik in der eval-Methode genau einmal mit der übergebenen Gruppe von Skalarargumenten ausgeführt. Bei Tabellenargumenten wird die eval-Methode einmal für jede Zeile in der Eingabetabelle ausgeführt.

Die Logik kanngeschrieben werden, um 0, 1 oder viele Zeilen pro Eingabe zurückzugeben.

Die folgende UDTF veranschaulicht die Rückgabe von 0 oder mehr Zeilen für jede Eingabe, indem Elemente aus einer durch Kommas getrennten Liste in separate Einträge getrennt werden:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

Übergeben eines Tabellenarguments an eine UDTF

Sie können das SQL-Schlüsselwort TABLE() verwenden, um ein Tabellenargument an eine UDTF zu übergeben. Sie können einen Tabellennamen oder eine Abfrage wie in den folgenden Beispielen verwenden:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

Tabellenargumente werden je Zeile verarbeitet. Sie können standardmäßige PySpark-Spaltenfeldanmerkungen verwenden, um mit Spalten in jeder Zeile zu interagieren. Das folgende Beispiel veranschaulicht den expliziten Import des PySpark-Typs Row und anschließendes Filtern der übergebenen Tabelle auf das Feld id:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

Übergeben von skalaren Argumenten an eine UDTF

Sie können skalare Argumente mithilfe einer beliebigen Kombination der folgenden Werte an eine UDTF übergeben:

  • Skalare Konstanten
  • Skalarfunktionen
  • Felder in einer Beziehung

Zum Übergeben von Feldern in einer Beziehung müssen Sie die UDTF registrieren und das SQL-Schlüsselwort LATERAL verwenden.

Hinweis

Sie können ein Inlinetabellenalias verwenden, um mehrdeutige Spalten zu disambiguieren.

Im folgenden Beispiel wird die Verwendung von LATERAL zum Übergeben von Feldern aus einer Tabelle an eine UDTF veranschaulicht:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

Festlegen von Standardwerten für UDTFs

Sie können optional eine __init__-Methode implementieren, um Standardwerte für Klassenvariablen festzulegen, auf die Sie in Ihrer Python-Logik verweisen können.

Die __init__-Methode akzeptiert keine Argumente und hat keinen Zugriff auf Variablen oder Statusinformationen in der SparkSession.

Verwenden von Apache Arrow mit UDTFs

Databricks empfiehlt die Verwendung von Apache Arrow für UDTFs, die eine kleine Datenmenge als Eingabe empfangen, aber eine große Tabelle ausgeben.

Sie können Apache Arrow aktivieren, indem Sie den useArrow-Parameter beim Deklarieren der UDTF angeben, wie im folgenden Beispiel gezeigt:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1