Поделиться через


Определяемые пользователем функции таблиц Python (определяемые пользователем функции)

Внимание

Эта функция доступна в общедоступной предварительной версии в Databricks Runtime 14.3 LTS и выше.

Определяемая пользователем функция таблицы (UDTF) позволяет регистрировать функции, возвращающие таблицы вместо скалярных значений. В отличие от скалярных функций, возвращающих одно результирующие значения из каждого вызова, каждый UDTF вызывается в предложении инструкции FROM SQL и возвращает всю таблицу в виде выходных данных.

Каждый вызов UDTF может принимать ноль или больше аргументов. Эти аргументы могут быть скалярными выражениями или аргументами таблицы, представляющими всю входную таблицу.

Базовый синтаксис UDTF

Apache Spark реализует UDTF Python в качестве классов Python с обязательным eval методом, который используется yield для выдачи выходных строк.

Чтобы использовать класс в качестве UDTF, необходимо импортировать функцию PySpark udtf . Databricks рекомендует использовать эту функцию в качестве декоратора и явно указывать имена полей и типы с помощью returnType параметра (если класс не определяет analyze метод, как описано в следующем разделе).

Следующий UDTF создает таблицу с помощью фиксированного списка двух целых аргументов:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Регистрация UDTF

Определяемые пользователем функции регистрируются локально SparkSession и изолированы на уровне записной книжки или задания.

Не удается зарегистрировать определяемые пользователем функции в качестве объектов в каталоге Unity, а определяемые пользователем объекты нельзя использовать с хранилищами SQL.

Вы можете зарегистрировать UDTF в текущий SparkSession для использования в sql-запросах с функцией spark.udtf.register(). Укажите имя функции SQL и класса UDTF Python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Вызов зарегистрированного UDTF

После регистрации вы можете использовать UDTF в SQL с помощью волшебной %sql команды или spark.sql() функции:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Использование Apache Arrow

Если UDTF получает небольшой объем данных в качестве входных данных, но выводит большую таблицу, Databricks рекомендует использовать Apache Arrow. Его можно включить, указав useArrow параметр при объявлении UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Списки аргументов переменной — *args и **kwargs

Вы можете использовать Python *args или **kwargs синтаксис и реализовать логику для обработки неопределенного количества входных значений.

В следующем примере возвращается тот же результат при явной проверке длины входных данных и типов аргументов:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Ниже приведен тот же пример, но использование аргументов ключевых слов:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Определение статической схемы во время регистрации

UDTF возвращает строки с выходной схемой, содержащей упорядоченную последовательность имен столбцов и типов. Если схема UDTF всегда должна оставаться одинаковой для всех запросов, можно указать статическую фиксированную схему после @udtf декоратора. Оно должно быть следующим:StructType

StructType().add("c1", StringType())

Или строка DDL, представляющая тип структуры:

c1: string

Вычисление динамической схемы во время вызова функции

Определяемые пользователем функции также могут программно вычислять схему вывода для каждого вызова в зависимости от значений входных аргументов. Для этого определите статический метод, который analyze принимает ноль или больше параметров, соответствующих аргументам, предоставленным конкретному вызову UDTF.

Каждый аргумент analyze метода является экземпляром AnalyzeArgument класса, содержащего следующие поля:

AnalyzeArgument поле класса Description
dataType Тип входного аргумента в виде DataType. Для аргументов входной таблицы это StructType столбцы таблицы.
value Значение входного аргумента в качестве Optional[Any]значения. Это касается None аргументов таблицы или скалярных аргументов, которые не являются константами.
isTable Указывает, является ли входной аргумент таблицей BooleanType.
isConstantExpression Указывает, является ли входной аргумент константным свертываемым выражением BooleanType.

Метод analyze возвращает экземпляр AnalyzeResult класса, который включает схему таблицы результатов в виде StructType дополнительных полей. Если UDTF принимает аргумент входной таблицы, AnalyzeResult можно также включить запрошенный способ секционирования и упорядочивания строк входной таблицы в нескольких вызовах UDTF, как описано ниже.

AnalyzeResult поле класса Description
schema Схема результирующих таблиц в виде StructType.
withSinglePartition Следует ли отправлять все входные строки в один и тот же экземпляр класса UDTF в качестве экземпляра BooleanType.
partitionBy Если задано значение непустое, все строки с каждым уникальным сочетанием значений выражений секционирования используются отдельным экземпляром класса UDTF.
orderBy Если задано значение непустое, это указывает порядок строк в каждой секции.
select Если задано значение "Непустое", это последовательность выражений, которые UDTF указывает для Катализатора для вычисления столбцов в входном аргументе TABLE. UDTF получает один входной атрибут для каждого имени в списке в порядке их перечисления.

В этом analyze примере возвращается один выходной столбец для каждого слова в аргументе входной строки.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Переадресация состояния в будущие eval вызовы

Метод analyze может служить удобным местом для выполнения инициализации, а затем пересылать результаты в будущие eval вызовы метода для того же вызова UDTF.

Для этого создайте подкласс AnalyzeResult и верните экземпляр подкласса analyze из метода. Затем добавьте дополнительный аргумент в __init__ метод, чтобы принять этот экземпляр.

В этом analyze примере возвращается константная выходная схема, но добавляются пользовательские сведения в метаданные результата, которые будут использоваться при последующих __init__ вызовах метода:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Вывод выходных строк

Метод eval выполняется один раз для каждой строки входного аргумента таблицы (или только один раз, если аргумент таблицы не указан), за которым следует один вызов terminate метода в конце. Метод выводит ноль или больше строк, которые соответствуют схеме результатов путем получения кортежей, списков или pyspark.sql.Row объектов.

В этом примере возвращается строка, предоставляя кортеж из трех элементов:

def eval(self, x, y, z):
  yield (x, y, z)

Можно также опустить скобки:

def eval(self, x, y, z):
  yield x, y, z

Добавьте конечную запятую, чтобы вернуть строку только с одним столбцом:

def eval(self, x, y, z):
  yield x,

Вы также можете получить pyspark.sql.Row объект.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

В этом примере выходные строки из terminate метода даются с помощью списка Python. Вы можете хранить состояние внутри класса из предыдущих шагов в оценке UDTF для этой цели.

def terminate(self):
  yield [self.x, self.y, self.z]

Передача скалярных аргументов в UDTF

Скалярные аргументы можно передать в UDTF в виде константных выражений, состоящих из литеральных значений или функций на основе них. Например:

SELECT * FROM udtf(42, group => upper("finance_department"));

Передача аргументов таблицы в UDTF

Определяемые пользователем функции Python могут принимать входную таблицу в качестве аргумента в дополнение к скалярным входным аргументам. Один UDTF также может принимать аргумент таблицы и несколько скалярных аргументов.

Затем любой SQL-запрос может предоставить входную таблицу с помощью TABLE ключевого слова, за которым следует скобки, окружающие соответствующий идентификатор таблицы, например TABLE(t). Кроме того, можно передать вложенный запрос таблицы, например TABLE(SELECT a, b, c FROM t) или TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

Затем входной аргумент таблицы представляется в качестве аргумента eval метода с одним вызовом pyspark.sql.Row eval метода для каждой строки в входной таблице. Для взаимодействия со столбцами в каждой строке можно использовать стандартные заметки полей столбцов PySpark. В следующем примере показано явное импортирование типа PySpark Row и фильтрация переданной id таблицы в поле:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Чтобы запросить функцию, используйте ключевое TABLE слово SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Указание секционирования входных строк из вызовов функций

При вызове UDTF с аргументом таблицы любой SQL-запрос может секционирование входной таблицы по нескольким вызовам UDTF на основе значений одного или нескольких входных столбцов таблицы.

Чтобы указать секцию, используйте PARTITION BY предложение в вызове функции после аргумента TABLE . Это гарантирует, что все входные строки с каждым уникальным сочетанием значений столбцов секционирования будут использоваться ровно одним экземпляром класса UDTF.

Обратите внимание, что помимо простых ссылок PARTITION BY на столбцы предложение также принимает произвольные выражения на основе входных столбцов таблицы. Например, можно указать LENGTH строку, извлечь месяц из даты или объединить два значения.

Кроме того, можно указать WITH SINGLE PARTITION PARTITION BY вместо запроса только одну секцию, в которой все входные строки должны использоваться ровно одним экземпляром класса UDTF.

В каждой секции можно указать обязательное упорядочение входных строк в качестве метода UDTF eval . Для этого предоставьте ORDER BY предложение после описанного PARTITION BY выше предложения.WITH SINGLE PARTITION

Например, рассмотрим следующий UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Параметры секционирования можно указать при вызове UDTF по входной таблице способами:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Указание секционирования входных строк из analyze метода

Обратите внимание, что для каждого из указанных выше способов секционирования входной таблицы при вызове определяемых пользователем определяемых пользователем объектов в SQL-запросах существует соответствующий способ analyze автоматического указания одного метода секционирования.

  • Вместо вызова UDTF SELECT * FROM udtf(TABLE(t) PARTITION BY a)можно обновить analyze метод, чтобы задать поле partitionBy=[PartitioningColumn("a")] и просто вызвать функцию с помощью SELECT * FROM udtf(TABLE(t)).
  • С помощью того же токена вместо указания TABLE(t) WITH SINGLE PARTITION ORDER BY b в SQL-запросе можно задать analyze поля withSinglePartition=true , а orderBy=[OrderingColumn("b")] затем просто передать TABLE(t).
  • Вместо передачи TABLE(SELECT a FROM t) в SQL-запросе можно задать analyze select=[SelectedColumn("a")] , а затем просто передать TABLE(t).

В следующем примере analyze возвращает константную выходную схему, выбирает подмножество столбцов из входной таблицы и указывает, что входная таблица секционирована по нескольким вызовам UDTF на основе значений столбца date :

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])