Определяемые пользователем функции таблиц Python (определяемые пользователем функции)
Внимание
Эта функция доступна в общедоступной предварительной версии в Databricks Runtime 14.3 LTS и выше.
Определяемая пользователем функция таблицы (UDTF) позволяет регистрировать функции, возвращающие таблицы вместо скалярных значений. В отличие от скалярных функций, возвращающих одно результирующие значения из каждого вызова, каждый UDTF вызывается в предложении инструкции FROM
SQL и возвращает всю таблицу в виде выходных данных.
Каждый вызов UDTF может принимать ноль или больше аргументов. Эти аргументы могут быть скалярными выражениями или аргументами таблицы, представляющими всю входную таблицу.
Базовый синтаксис UDTF
Apache Spark реализует UDTF Python в качестве классов Python с обязательным eval
методом, который используется yield
для выдачи выходных строк.
Чтобы использовать класс в качестве UDTF, необходимо импортировать функцию PySpark udtf
. Databricks рекомендует использовать эту функцию в качестве декоратора и явно указывать имена полей и типы с помощью returnType
параметра (если класс не определяет analyze
метод, как описано в следующем разделе).
Следующий UDTF создает таблицу с помощью фиксированного списка двух целых аргументов:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Регистрация UDTF
Определяемые пользователем функции регистрируются локально SparkSession
и изолированы на уровне записной книжки или задания.
Не удается зарегистрировать определяемые пользователем функции в качестве объектов в каталоге Unity, а определяемые пользователем объекты нельзя использовать с хранилищами SQL.
Вы можете зарегистрировать UDTF в текущий SparkSession
для использования в sql-запросах с функцией spark.udtf.register()
. Укажите имя функции SQL и класса UDTF Python.
spark.udtf.register("get_sum_diff", GetSumDiff)
Вызов зарегистрированного UDTF
После регистрации вы можете использовать UDTF в SQL с помощью волшебной %sql
команды или spark.sql()
функции:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Использование Apache Arrow
Если UDTF получает небольшой объем данных в качестве входных данных, но выводит большую таблицу, Databricks рекомендует использовать Apache Arrow. Его можно включить, указав useArrow
параметр при объявлении UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Списки аргументов переменной — *args и **kwargs
Вы можете использовать Python *args
или **kwargs
синтаксис и реализовать логику для обработки неопределенного количества входных значений.
В следующем примере возвращается тот же результат при явной проверке длины входных данных и типов аргументов:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Ниже приведен тот же пример, но использование аргументов ключевых слов:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Определение статической схемы во время регистрации
UDTF возвращает строки с выходной схемой, содержащей упорядоченную последовательность имен столбцов и типов. Если схема UDTF всегда должна оставаться одинаковой для всех запросов, можно указать статическую фиксированную схему после @udtf
декоратора. Оно должно быть следующим:StructType
StructType().add("c1", StringType())
Или строка DDL, представляющая тип структуры:
c1: string
Вычисление динамической схемы во время вызова функции
Определяемые пользователем функции также могут программно вычислять схему вывода для каждого вызова в зависимости от значений входных аргументов. Для этого определите статический метод, который analyze
принимает ноль или больше параметров, соответствующих аргументам, предоставленным конкретному вызову UDTF.
Каждый аргумент analyze
метода является экземпляром AnalyzeArgument
класса, содержащего следующие поля:
AnalyzeArgument поле класса |
Description |
---|---|
dataType |
Тип входного аргумента в виде DataType . Для аргументов входной таблицы это StructType столбцы таблицы. |
value |
Значение входного аргумента в качестве Optional[Any] значения. Это касается None аргументов таблицы или скалярных аргументов, которые не являются константами. |
isTable |
Указывает, является ли входной аргумент таблицей BooleanType . |
isConstantExpression |
Указывает, является ли входной аргумент константным свертываемым выражением BooleanType . |
Метод analyze
возвращает экземпляр AnalyzeResult
класса, который включает схему таблицы результатов в виде StructType
дополнительных полей. Если UDTF принимает аргумент входной таблицы, AnalyzeResult
можно также включить запрошенный способ секционирования и упорядочивания строк входной таблицы в нескольких вызовах UDTF, как описано ниже.
AnalyzeResult поле класса |
Description |
---|---|
schema |
Схема результирующих таблиц в виде StructType . |
withSinglePartition |
Следует ли отправлять все входные строки в один и тот же экземпляр класса UDTF в качестве экземпляра BooleanType . |
partitionBy |
Если задано значение непустое, все строки с каждым уникальным сочетанием значений выражений секционирования используются отдельным экземпляром класса UDTF. |
orderBy |
Если задано значение непустое, это указывает порядок строк в каждой секции. |
select |
Если задано значение "Непустое", это последовательность выражений, которые UDTF указывает для Катализатора для вычисления столбцов в входном аргументе TABLE. UDTF получает один входной атрибут для каждого имени в списке в порядке их перечисления. |
В этом analyze
примере возвращается один выходной столбец для каждого слова в аргументе входной строки.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Переадресация состояния в будущие eval
вызовы
Метод analyze
может служить удобным местом для выполнения инициализации, а затем пересылать результаты в будущие eval
вызовы метода для того же вызова UDTF.
Для этого создайте подкласс AnalyzeResult
и верните экземпляр подкласса analyze
из метода.
Затем добавьте дополнительный аргумент в __init__
метод, чтобы принять этот экземпляр.
В этом analyze
примере возвращается константная выходная схема, но добавляются пользовательские сведения в метаданные результата, которые будут использоваться при последующих __init__
вызовах метода:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Вывод выходных строк
Метод eval
выполняется один раз для каждой строки входного аргумента таблицы (или только один раз, если аргумент таблицы не указан), за которым следует один вызов terminate
метода в конце. Метод выводит ноль или больше строк, которые соответствуют схеме результатов путем получения кортежей, списков или pyspark.sql.Row
объектов.
В этом примере возвращается строка, предоставляя кортеж из трех элементов:
def eval(self, x, y, z):
yield (x, y, z)
Можно также опустить скобки:
def eval(self, x, y, z):
yield x, y, z
Добавьте конечную запятую, чтобы вернуть строку только с одним столбцом:
def eval(self, x, y, z):
yield x,
Вы также можете получить pyspark.sql.Row
объект.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
В этом примере выходные строки из terminate
метода даются с помощью списка Python. Вы можете хранить состояние внутри класса из предыдущих шагов в оценке UDTF для этой цели.
def terminate(self):
yield [self.x, self.y, self.z]
Передача скалярных аргументов в UDTF
Скалярные аргументы можно передать в UDTF в виде константных выражений, состоящих из литеральных значений или функций на основе них. Например:
SELECT * FROM udtf(42, group => upper("finance_department"));
Передача аргументов таблицы в UDTF
Определяемые пользователем функции Python могут принимать входную таблицу в качестве аргумента в дополнение к скалярным входным аргументам. Один UDTF также может принимать аргумент таблицы и несколько скалярных аргументов.
Затем любой SQL-запрос может предоставить входную таблицу с помощью TABLE
ключевого слова, за которым следует скобки, окружающие соответствующий идентификатор таблицы, например TABLE(t)
. Кроме того, можно передать вложенный запрос таблицы, например TABLE(SELECT a, b, c FROM t)
или TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
Затем входной аргумент таблицы представляется в качестве аргумента eval
метода с одним вызовом pyspark.sql.Row
eval
метода для каждой строки в входной таблице. Для взаимодействия со столбцами в каждой строке можно использовать стандартные заметки полей столбцов PySpark. В следующем примере показано явное импортирование типа PySpark Row
и фильтрация переданной id
таблицы в поле:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Чтобы запросить функцию, используйте ключевое TABLE
слово SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Указание секционирования входных строк из вызовов функций
При вызове UDTF с аргументом таблицы любой SQL-запрос может секционирование входной таблицы по нескольким вызовам UDTF на основе значений одного или нескольких входных столбцов таблицы.
Чтобы указать секцию, используйте PARTITION BY
предложение в вызове функции после аргумента TABLE
.
Это гарантирует, что все входные строки с каждым уникальным сочетанием значений столбцов секционирования будут использоваться ровно одним экземпляром класса UDTF.
Обратите внимание, что помимо простых ссылок PARTITION BY
на столбцы предложение также принимает произвольные выражения на основе входных столбцов таблицы. Например, можно указать LENGTH
строку, извлечь месяц из даты или объединить два значения.
Кроме того, можно указать WITH SINGLE PARTITION
PARTITION BY
вместо запроса только одну секцию, в которой все входные строки должны использоваться ровно одним экземпляром класса UDTF.
В каждой секции можно указать обязательное упорядочение входных строк в качестве метода UDTF eval
. Для этого предоставьте ORDER BY
предложение после описанного PARTITION BY
выше предложения.WITH SINGLE PARTITION
Например, рассмотрим следующий UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Параметры секционирования можно указать при вызове UDTF по входной таблице способами:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Указание секционирования входных строк из analyze
метода
Обратите внимание, что для каждого из указанных выше способов секционирования входной таблицы при вызове определяемых пользователем определяемых пользователем объектов в SQL-запросах существует соответствующий способ analyze
автоматического указания одного метода секционирования.
- Вместо вызова UDTF
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
можно обновитьanalyze
метод, чтобы задать полеpartitionBy=[PartitioningColumn("a")]
и просто вызвать функцию с помощьюSELECT * FROM udtf(TABLE(t))
. - С помощью того же токена вместо указания
TABLE(t) WITH SINGLE PARTITION ORDER BY b
в SQL-запросе можно задатьanalyze
поляwithSinglePartition=true
, аorderBy=[OrderingColumn("b")]
затем просто передатьTABLE(t)
. - Вместо передачи
TABLE(SELECT a FROM t)
в SQL-запросе можно задатьanalyze
select=[SelectedColumn("a")]
, а затем просто передатьTABLE(t)
.
В следующем примере analyze
возвращает константную выходную схему, выбирает подмножество столбцов из входной таблицы и указывает, что входная таблица секционирована по нескольким вызовам UDTF на основе значений столбца date
:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])