Sdílet prostřednictvím


Uživatelem definované funkce tabulek v Pythonu (UDTFs)

Důležité

Tato funkce je ve verzi Public Preview v Databricks Runtime 14.3 LTS a vyšší.

Uživatelem definovaná funkce tabulky (UDTF) umožňuje registrovat funkce, které vracejí tabulky místo skalárních hodnot. Na rozdíl od skalárních funkcí, které vrací jednu výslednou hodnotu z každého volání, je každý UDTF vyvolán v klauzuli příkazu FROM SQL a vrací celou tabulku jako výstup.

Každé volání UDTF může přijímat nula nebo více argumentů. Tyto argumenty mohou být skalární výrazy nebo argumenty tabulky představující celé vstupní tabulky.

Základní syntaxe UDTF

Apache Spark implementuje definované uživatelem Pythonu jako třídy Pythonu s povinnou eval metodou, která používá yield k generování výstupních řádků.

Pokud chcete třídu použít jako UDTF, musíte importovat funkci PySpark udtf . Databricks doporučuje použít tuto funkci jako dekorátor a explicitně zadat názvy a typy polí pomocí returnType této možnosti (pokud třída nedefinuje metodu popsanou analyze v další části).

Následující funkce UDTF vytvoří tabulku s pevným seznamem dvou celých argumentů:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Registrace UDTF

Funkce definované uživatelem jsou zaregistrované v místním SparkSession prostředí a jsou izolované na úrovni poznámkového bloku nebo úlohy.

UDTFs nelze zaregistrovat jako objekty v katalogu Unity a UDTFs nelze použít se sklady SQL.

UDTF můžete zaregistrovat do aktuálního SparkSession stavu pro použití v dotazech SQL s funkcí spark.udtf.register(). Zadejte název funkce SQL a třídy UDTF Pythonu.

spark.udtf.register("get_sum_diff", GetSumDiff)

Volání registrovaného UDTF

Po registraci můžete použít UDTF v SQL pomocí %sql příkazu magic nebo spark.sql() funkce:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Použití Apache Arrow

Pokud váš UDTF přijímá jako vstup malé množství dat, ale vypíše velkou tabulku, databricks doporučuje použít Apache Arrow. Můžete ho povolit zadáním parametru useArrow při deklarování UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Seznamy argumentů proměnných – *args a **kwargs

K zpracování nezadaného počtu vstupních hodnot můžete použít Python *args nebo **kwargs syntaxi a implementovat logiku.

Následující příklad vrátí stejný výsledek při explicitní kontrole vstupní délky a typů argumentů:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Tady je stejný příklad, ale použití argumentů klíčových slov:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definování statického schématu v době registrace

UDTF vrací řádky s výstupním schématem, které tvoří seřazenou sekvenci názvů a typů sloupců. Pokud by schéma UDTF vždy mělo zůstat stejné pro všechny dotazy, můžete za dekorátorem zadat statické a pevné schéma @udtf . Musí to být StructTypebuď:

StructType().add("c1", StringType())

Nebo řetězec DDL představující typ struktury:

c1: string

Výpočet dynamického schématu v době volání funkce

Funkce definované uživatelem můžou také programově vypočítat výstupní schéma pro každé volání v závislosti na hodnotách vstupních argumentů. Chcete-li to provést, definujte statickou metodu, analyze která přijímá nula nebo více parametrů, které odpovídají argumentům zadaným pro konkrétní volání UDTF.

Každý argument analyze metody je instance AnalyzeArgument třídy, která obsahuje následující pole:

AnalyzeArgument Pole třídy Popis
dataType Typ vstupního argumentu jako DataType. U argumentů vstupní tabulky se jedná o StructType reprezentaci sloupců tabulky.
value Hodnota vstupního argumentu jako Optional[Any]. Toto je None pro argumenty tabulky nebo skalární argumenty literálu, které nejsou konstantní.
isTable Zda vstupní argument je tabulka jako BooleanType.
isConstantExpression Zda vstupní argument je konstantní skládací výraz jako BooleanType.

Metoda analyze vrátí instanci AnalyzeResult třídy, která zahrnuje schéma výsledné tabulky jako StructType plus některá volitelná pole. Pokud UDTF přijímá argument vstupní tabulky, AnalyzeResult může také zahrnovat požadovaný způsob rozdělení a pořadí řádků vstupní tabulky napříč několika voláními UDTF, jak je popsáno později.

AnalyzeResult Pole třídy Popis
schema Schéma výsledné tabulky jako StructType.
withSinglePartition Zda odeslat všechny vstupní řádky do stejné instance třídy UDTF jako BooleanType.
partitionBy Pokud je nastavena na neprázdné, všechny řádky s každou jedinečnou kombinací hodnot výrazů dělení jsou spotřebovány samostatnou instancí UDTF třídy.
orderBy Pokud je nastavená hodnota neprázdná, určuje pořadí řádků v rámci každého oddílu.
select Pokud je nastavená na neprázdnou, jedná se o sekvenci výrazů, které UDTF určuje, aby nástroj Catalyst vyhodnocoval sloupce ve vstupním argumentu TABLE. UDTF obdrží jeden vstupní atribut pro každý název v seznamu v pořadí, v jakém jsou uvedeny.

Tento analyze příklad vrátí jeden výstupní sloupec pro každé slovo ve vstupním řetězcovém argumentu.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Přesměrování stavu na budoucí eval volání

Tato analyze metoda může sloužit jako vhodné místo k provedení inicializace a následné předání výsledků do budoucích eval volání metody pro stejné volání UDTF.

Uděláte to tak, že vytvoříte podtřídu AnalyzeResult a vrátíte instanci podtřídy analyze z metody. Pak do metody přidejte další argument __init__ , který tuto instanci přijme.

Tento analyze příklad vrátí konstantní výstupní schéma, ale přidá vlastní informace do metadat výsledku, která se budou využívat budoucí __init__ volání metody:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Výnos výstupních řádků

Metoda eval se spustí jednou pro každý řádek argumentu vstupní tabulky (nebo jen jednou, pokud není zadán žádný argument tabulky), za ním následuje jedno vyvolání terminate metody na konci. Metoda buď vypíše nula nebo více řádků odpovídajících schématu výsledku tím, že získá řazené kolekce členů, seznamy nebo pyspark.sql.Row objekty.

Tento příklad vrátí řádek zadáním řazené kolekce členů tří prvků:

def eval(self, x, y, z):
  yield (x, y, z)

Můžete také vynechat závorky:

def eval(self, x, y, z):
  yield x, y, z

Přidejte koncovou čárku, která vrátí řádek pouze s jedním sloupcem:

def eval(self, x, y, z):
  yield x,

Můžete také získat pyspark.sql.Row objekt.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Tento příklad poskytuje výstupní řádky z terminate metody pomocí seznamu Pythonu. Stav můžete uložit uvnitř třídy z předchozích kroků v vyhodnocení UDTF pro tento účel.

def terminate(self):
  yield [self.x, self.y, self.z]

Předání skalárních argumentů do UDTF

Skalární argumenty můžete předat do UDTF jako konstantní výrazy, které obsahují hodnoty literálů nebo funkce založené na nich. Příklad:

SELECT * FROM udtf(42, group => upper("finance_department"));

Předání argumentů tabulky do UDTF

Uživatelem definované funkce Pythonu můžou kromě skalárních vstupních argumentů přijmout vstupní tabulku jako argumenty. Jeden UDTF může také přijmout argument tabulky a více skalárních argumentů.

Jakýkoli dotaz SQL pak může poskytnout vstupní tabulku pomocí klíčového TABLE slova následovaného závorky kolem příslušného identifikátoru tabulky, například TABLE(t). Alternativně můžete předat poddotaz tabulky, například TABLE(SELECT a, b, c FROM t) nebo TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

Argument vstupní tabulky je pak reprezentován jako pyspark.sql.Row argument metody eval s jedním voláním eval metody pro každý řádek ve vstupní tabulce. K interakci se sloupci v jednotlivých řádcích můžete použít standardní poznámky polí sloupců PySpark. Následující příklad ukazuje explicitní import typu PySpark Row a následné filtrování předané tabulky v id poli:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

K dotazování funkce použijte TABLE klíčové slovo SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Určení dělení vstupních řádků z volání funkce

Při volání UDTF s argumentem tabulky může každý dotaz SQL rozdělit vstupní tabulku na několik volání UDTF na základě hodnot jednoho nebo více vstupních sloupců tabulky.

Pokud chcete zadat oddíl, použijte PARTITION BY klauzuli ve volání funkce za argumentem TABLE . To zaručuje, že všechny vstupní řádky s každou jedinečnou kombinací hodnot sloupců dělení budou spotřebovány přesně jednou instancí UDTF třídy.

Všimněte si, že kromě jednoduchých odkazů na PARTITION BY sloupce klauzule také přijímá libovolné výrazy založené na sloupcích vstupní tabulky. Můžete například zadat LENGTH řetězec, extrahovat měsíc z data nebo zřetězení dvou hodnot.

Je také možné zadat WITH SINGLE PARTITION místo PARTITION BY vyžádání pouze jednoho oddílu, kde všechny vstupní řádky musí být spotřebovány přesně jednou instancí UDTF třídy.

V rámci každého oddílu můžete volitelně zadat požadované pořadí vstupních řádků, protože je metoda UDTF eval využívá. Uděláte to tak, že za klauzulí nebo WITH SINGLE PARTITION klauzulí popsanou ORDER BYPARTITION BY výše zadáte klauzuli.

Představte si například následující UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Možnosti dělení můžete zadat při volání UDTF přes vstupní tabulku několika způsoby:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Určení dělení vstupních řádků z analyze metody

Všimněte si, že pro každý z výše uvedených způsobů dělení vstupní tabulky při volání UDF v dotazech SQL existuje odpovídající způsob, jak metoda UDTF analyze určit stejnou metodu dělení automaticky.

  • Místo volání UDTF jako SELECT * FROM udtf(TABLE(t) PARTITION BY a), můžete aktualizovat metodu analyze nastavit pole partitionBy=[PartitioningColumn("a")] a jednoduše volat funkci pomocí SELECT * FROM udtf(TABLE(t)).
  • Pomocí stejného tokenu můžete místo zadání TABLE(t) WITH SINGLE PARTITION ORDER BY b v dotazu SQL nastavit analyze pole withSinglePartition=true a orderBy=[OrderingColumn("b")] pak jednoduše předat TABLE(t).
  • Místo předání TABLE(SELECT a FROM t) dotazu SQL můžete nastavit analyzeselect=[SelectedColumn("a")] a pak jednoduše předat TABLE(t).

V následujícím příkladu analyze vrátí konstantní výstupní schéma, vybere podmnožinu sloupců ze vstupní tabulky a určí, že vstupní tabulka je rozdělena na několik volání UDTF na základě hodnot date sloupce:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])