Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Важный
Эта функция находится в общедоступной предварительной версии в Databricks Runtime 14.3 LTS и выше.
Определяемая пользователем функция таблицы (UDTF) позволяет регистрировать функции, возвращающие таблицы вместо скалярных значений. В отличие от скалярных функций, возвращающих одно результирующие значения из каждого вызова, каждый UDTF вызывается в предложении инструкции FROM SQL и возвращает всю таблицу в виде выходных данных.
Каждый вызов UDTF может принимать ноль или больше аргументов. Эти аргументы могут быть скалярными выражениями или аргументами таблицы, представляющими всю входную таблицу.
Определяемые пользователем функции можно зарегистрировать двумя способами:
- Каталог Unity: зарегистрируйте UDTF в качестве управляемого объекта в каталоге Unity. См. определяемые пользователем функции таблиц Python (ОПФТ) в каталоге Unity.
- Область сеанса: зарегистрируйтесь в локальном
SparkSession, изолированном для текущей записной книжки или задания.
Подсказка
Databricks рекомендует зарегистрировать определяемые пользователем функции в каталоге Unity, чтобы воспользоваться преимуществами централизованного управления, что упрощает безопасное совместное использование функций для пользователей и команд.
Базовый синтаксис UDTF
Apache Spark реализует UDTF Python в виде классов Python с обязательным методом eval, использующим yield для вывода выходных строк.
Чтобы использовать класс в качестве UDTF, необходимо импортировать функцию PySpark udtf. Databricks рекомендует использовать эту функцию в качестве декоратора и явно указывать имена полей и типы с помощью параметра returnType (если класс не определяет метод analyze, как описано в следующем разделе).
Следующий UDTF создает таблицу с помощью фиксированного списка двух целых аргументов:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Регистрация UDTF
Чтобы зарегистрировать определяемый сеансом UDTF для использования в запросах SQL, используйте spark.udtf.register(). Укажите имя функции SQL и класса UDTF Python.
spark.udtf.register("get_sum_diff", GetSumDiff)
Вызов зарегистрированного UDTF
После регистрации вы можете использовать UDTF в SQL с помощью волшебной команды %sql или функции spark.sql():
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);
Обновление определяемого сеансом UDTF до каталога Unity
Важный
Регистрация определяемых пользователем табличных функций Python в Unity Catalog доступна в стадии общественного предварительного просмотра. Для UDTF каталога Unity требуется среда выполнения Databricks версии 17.1 и более поздней. См. раздел Требования.
Вы можете обновить UDTF с областью сеанса до каталога Unity, чтобы воспользоваться преимуществами централизованного управления и упростить безопасное совместное использование функций между пользователями и командами.
Чтобы обновить определяемый сеансом UDTF до каталога Unity, используйте DDL SQL с инструкцией CREATE OR REPLACE FUNCTION . В следующем примере показано, как преобразовать GetSumDiff UDTF из функции с областью сеанса в функцию каталога Unity:
CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
$$;
SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13 | 7 |
+-----+------+
Дополнительные сведения об определяемых пользователем функциях каталога Unity см. в описании пользовательских функций таблиц Python в каталоге Unity.
Используйте Apache Arrow
Если UDTF получает небольшой объем данных в качестве входных данных, но выводит большую таблицу, Databricks рекомендует использовать Apache Arrow. Его можно включить, указав параметр useArrow при объявлении UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Списки аргументов переменной — *args и **kwargs
Вы можете использовать синтаксис Python *args или **kwargs и реализовать логику для обработки неопределенного количества входных значений.
В следующем примере возвращается тот же результат при явной проверке длины входных данных и типов аргументов:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Ниже приведен тот же пример, но использование аргументов ключевых слов:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Определение статической схемы во время регистрации
UDTF возвращает строки с выходной схемой, содержащей упорядоченную последовательность имен столбцов и типов. Если схема UDTF всегда должна оставаться одинаковой для всех запросов, можно указать статическую фиксированную схему после декоратора @udtf. Он должен быть StructType:
StructType().add("c1", StringType())
Или строка DDL, представляющая тип структуры:
c1: string
Вычисление динамической схемы во время вызова функции
Функции UDTF также могут вычислять схему вывода программным путем для каждого вызова в зависимости от значений входных аргументов. Для этого определите статический метод с именем analyze, который принимает ноль или больше параметров, которые соответствуют аргументам, предоставленным конкретному вызову UDTF.
Каждый аргумент метода analyze является экземпляром класса AnalyzeArgument, содержащего следующие поля:
поле класса AnalyzeArgument |
Описание |
|---|---|
dataType |
Тип входного аргумента в виде DataType. Для аргументов входной таблицы это StructType, представляющий столбцы таблицы. |
value |
Значение входного аргумента в виде Optional[Any]. Это None для аргументов таблицы или литеральных скалярных аргументов, которые не являются константами. |
isTable |
Указывает, является ли входной аргумент таблицей в виде BooleanType. |
isConstantExpression |
Указывает, является ли входной аргумент константным свертываемым выражением в виде BooleanType. |
Метод analyze возвращает экземпляр класса AnalyzeResult, который включает схему таблицы результатов в виде StructType, плюс некоторые дополнительные поля. Если UDTF принимает аргумент входной таблицы, то AnalyzeResult также может включать запрошенный способ секционирования и упорядочивания строк входной таблицы в нескольких вызовах UDTF, как описано ниже.
поле класса AnalyzeResult |
Описание |
|---|---|
schema |
Схема таблицы результатов в виде StructType. |
withSinglePartition |
Следует ли отправлять все входные строки в один и тот же экземпляр класса UDTF, что и BooleanType. |
partitionBy |
Если это значение не пустое, то все строки с каждым уникальным сочетанием значений выражений для секционирования используются отдельным экземпляром класса UDTF. |
orderBy |
Если задано непустое значение, это указывает порядок строк в каждом разделе. |
select |
Если установлено как "непустое", это последовательность выражений, которые UDTF указывает для Catalyst, чтобы вычислить против столбцов во входном аргументе TABLE. UDTF получает один входной атрибут для каждого имени в списке в порядке их перечисления. |
В этом analyze примере возвращается один выходной столбец для каждого слова в аргументе входной строки.
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
MyUDTF(lit("hello world")).columns
['word_0', 'word_1']
Переадресация состояния в будущие вызовы eval
Метод analyze может служить удобным местом для выполнения инициализации, а затем пересылать результаты в будущие вызовы метода eval для того же вызова UDTF.
Для этого создайте подкласс AnalyzeResult и верните экземпляр подкласса из метода analyze.
Затем добавьте дополнительный аргумент в метод __init__, чтобы принять этот экземпляр.
Этот analyze пример возвращает константную выходную схему, но добавляет пользовательские сведения в метаданные результатов, которые будут использоваться будущими вызовами методов __init__:
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Генерация строк вывода
Метод eval выполняется один раз для каждой строки входного аргумента таблицы (или только один раз, если аргумент таблицы не указан), за которым следует один вызов метода terminate в конце. Любой из методов выводит ноль или больше строк, которые соответствуют схеме результатов, генерируя кортежи, списки или объекты типа pyspark.sql.Row.
В этом примере возвращается строка путем предоставления кортежа из трех элементов.
def eval(self, x, y, z):
yield (x, y, z)
Можно также опустить скобки:
def eval(self, x, y, z):
yield x, y, z
Добавьте конечную запятую, чтобы вернуть строку только с одним столбцом:
def eval(self, x, y, z):
yield x,
Вы также можете получить объект pyspark.sql.Row.
def eval(self, x, y, z):
from pyspark.sql.types import Row
yield Row(x, y, z)
В этом примере метод terminate генерирует выходные строки с использованием списка Python. Вы можете для этой цели хранить состояние внутри класса из предыдущих шагов при оценке UDTF.
def terminate(self):
yield [self.x, self.y, self.z]
Передавайте скалярные аргументы в UDTF
Скалярные аргументы можно передать в UDTF в виде константных выражений, состоящих из литеральных значений или функций на основе них. Например:
SELECT * FROM get_sum_diff(1, y => 2)
Передача аргументов таблицы в UDTF
Определяемые пользователем табличные функции (UDTFs) в Python могут принимать входную таблицу в качестве аргумента в дополнение к скалярным входным аргументам. Один UDTF также может принимать аргумент таблицы и несколько скалярных аргументов.
Затем любой ЗАПРОС SQL может предоставить входную таблицу с помощью ключевого слова TABLE, за которым следует скобки, окружающие соответствующий идентификатор таблицы, например TABLE(t). Кроме того, можно передать подзапрос таблицы, например TABLE(SELECT a, b, c FROM t) или TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).
Затем входной аргумент таблицы представляется в виде аргумента pyspark.sql.Row метода eval с одним вызовом метода eval для каждой строки в входной таблице. Для взаимодействия со столбцами в каждой строке можно использовать стандартные заметки полей столбцов PySpark. В следующем примере показано явное импортирование типа PySpark Row, а затем фильтрация переданной таблицы в поле id:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Чтобы запросить функцию, используйте ключевое слово TABLE SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Укажите метод секционирования входных строк, полученных из вызовов функций.
При вызове UDTF с аргументом таблицы любой SQL-запрос может выполнять секционирование входной таблицы в нескольких вызовах UDTF на основе значений одного или нескольких столбцов входной таблицы.
Чтобы указать секцию, используйте предложение PARTITION BY в вызове функции после аргумента TABLE.
Это гарантирует, что все входные строки с каждым уникальным сочетанием значений столбцов секционирования будут использоваться ровно одним экземпляром класса UDTF.
Обратите внимание, что помимо простых ссылок на столбцы, предложение PARTITION BY также принимает произвольные выражения на основе входных столбцов таблицы. Например, можно указать LENGTH строки, извлечь месяц из даты или объединить два значения.
Кроме того, можно указать WITH SINGLE PARTITION вместо PARTITION BY, чтобы запрашивать только один раздел, в котором все входные строки должны быть обработаны ровно одним экземпляром класса UDTF.
В каждой секции, по желанию, можно указать требуемый порядок входных строк для обработки методом UDTF eval. Для этого предоставьте предложение ORDER BY после предложения PARTITION BY или WITH SINGLE PARTITION, описанного выше.
Например, рассмотрим следующий UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Параметры секционирования можно указать при вызове UDTF через входную таблицу несколькими способами:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8);
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Укажите секционирование входных строк из метода analyze
Обратите внимание, что для каждого из указанных выше способов секционирования входной таблицы при вызове функций, определяемых пользователем, в SQL-запросах существует соответствующий способ для метода analyze UDTF автоматически указать тот же метод секционирования.
- Вместо вызова UDTF как
SELECT * FROM udtf(TABLE(t) PARTITION BY a)можно обновить методanalyze, чтобы задать полеpartitionBy=[PartitioningColumn("a")]и просто вызвать функцию с помощьюSELECT * FROM udtf(TABLE(t)). - Тем же образом, вместо указания
TABLE(t) WITH SINGLE PARTITION ORDER BY bв SQL-запросе, можно настроитьanalyzeтак, чтобы он задавал поляwithSinglePartition=trueиorderBy=[OrderingColumn("b")], а затем просто передатьTABLE(t). - Вместо передачи
TABLE(SELECT a FROM t)в SQL-запросе, можно сделать так, чтобыanalyzeустановилselect=[SelectedColumn("a")], а затем просто передатьTABLE(t).
В следующем примере analyze возвращает константную выходную схему, выбирает подмножество столбцов из входной таблицы и указывает, что входная таблица секционирована по нескольким вызовам UDTF на основе значений столбца date:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add("longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word)",
alias="length_word")])