Condividi tramite


Che cosa sono le funzioni di tabella definite dall'utente python?

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Una funzione di tabella definita dall'utente (UDTF) consente di registrare funzioni che restituiscono tabelle anziché valori scalari. Le funzioni definite dall'utente funzionano in modo analogo alle espressioni di tabella comuni (CTE) quando si fa riferimento alle query SQL. Si fa riferimento alle funzioni definite dall'utente nella FROM clausola di un'istruzione SQL ed è possibile concatenare altri operatori SPARK SQL ai risultati.

Le funzioni definite dall'utente vengono registrate in SparkSession locale e sono isolate a livello di notebook o processo.

Le funzioni definite dall'utente sono supportate nel calcolo configurato con modalità di accesso condiviso assegnate o senza isolamento. Non è possibile usare funzioni definite dall'utente in modalità di accesso condiviso.

Non è possibile registrare funzioni definite dall'utente come oggetti nel catalogo unity e non è possibile usare funzioni definite dall'utente con sql warehouse.

Qual è la sintassi di base per un tipo definito dall'utente?

Apache Spark implementa le funzioni definite dall'utente Python come classi Python con un metodo obbligatorio eval .

I risultati vengono generati come righe usando yield.

Affinché Apache Spark usi la classe come tipo definito dall'utente, è necessario importare la funzione PySpark udtf .

Databricks consiglia di usare questa funzione come decorator e specificare sempre in modo esplicito nomi di campo e tipi usando l'opzione returnType .

L'esempio seguente crea una tabella semplice da input scalari usando un tipo definito dall'utente:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

È possibile usare la sintassi Python *args e implementare la logica per gestire un numero non specificato di valori di input. Nell'esempio seguente viene restituito lo stesso risultato controllando in modo esplicito la lunghezza e i tipi di input per gli argomenti:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Registrare un tipo definito dall'utente

È possibile registrare un tipo definito dall'utente nell'oggetto SparkSession corrente da usare nelle query SQL usando la sintassi seguente:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

L'esempio seguente registra un tipo definito dall'utente python in SQL:

spark.udtf.register("simple_udtf", SimpleUDTF)

Dopo la registrazione, è possibile usare UDTF in SQL usando il comando o spark.sql() la %sql funzione magic, come negli esempi seguenti:

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

Risultati risultanti

Le funzioni definite dall'utente Python vengono implementate con yield per restituire i risultati. I risultati vengono sempre restituiti come tabella contenente 0 o più righe con lo schema specificato.

Quando si passano argomenti scalari, la eval logica nel metodo viene eseguita esattamente una volta con il set di argomenti scalari passati. Per gli argomenti di tabella, il eval metodo viene eseguito una volta per ogni riga nella tabella di input.

La logica può essere scritta per restituire 0, 1 o molte righe per input.

Il tipo definito dall'utente seguente illustra la restituzione di 0 o più righe per ogni input separando gli elementi da un elenco delimitato da virgole in voci separate:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

Passare un argomento di tabella a un tipo definito dall'utente

È possibile usare la parola chiave TABLE() SQL per passare un argomento di tabella a un tipo definito dall'utente. È possibile usare un nome di tabella o una query, come negli esempi seguenti:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

Gli argomenti di tabella vengono elaborati una riga alla volta. È possibile usare annotazioni standard dei campi di colonna PySpark per interagire con le colonne in ogni riga. L'esempio seguente illustra in modo esplicito l'importazione del tipo PySpark Row e quindi il filtro della tabella passata nel id campo:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

Passare argomenti scalari a un tipo definito dall'utente

È possibile passare argomenti scalari a un tipo definito dall'utente usando qualsiasi combinazione dei valori seguenti:

  • Costanti scalari
  • Funzioni scalari
  • Campi in una relazione

Per passare campi in una relazione, è necessario registrare il tipo definito dall'utente e usare la parola chiave SQL LATERAL .

Nota

È possibile usare alias di tabella inline per evitare ambiguità tra le colonne.

L'esempio seguente illustra l'uso LATERAL di per passare campi da una tabella a un tipo definito dall'utente:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

Impostare i valori predefiniti per le funzioni definite dall'utente

Facoltativamente, è possibile implementare un __init__ metodo per impostare i valori predefiniti per le variabili di classe a cui è possibile fare riferimento nella logica Python.

Il __init__ metodo non accetta argomenti e non ha accesso a variabili o informazioni sullo stato in SparkSession.

Usare Apache Arrow con funzioni definite dall'utente

Databricks consiglia di usare Apache Arrow per le funzioni definite dall'utente che ricevono una piccola quantità di dati come input, ma generano una tabella di grandi dimensioni.

È possibile abilitare Arrow specificando il useArrow parametro quando si dichiara UDTF, come nell'esempio seguente:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1