UDTFs (funções de tabela definidas pelo usuário) do Python
Importante
Esse recurso está na Visualização Pública no Databricks Runtime 14.3 LTS e superior.
Uma função de tabela definida pelo usuário (UDTF) permite registrar funções que retornam tabelas em vez de valores escalares. Ao contrário das funções escalares que retornam um único valor de resultado de cada chamada, cada UDTF é invocada em uma cláusula FROM
de instrução SQL e retorna uma tabela inteira como saída.
Cada chamada UDTF pode aceitar zero ou mais argumentos. Esses argumentos podem ser expressões escalares ou argumentos de tabela que representam tabelas de entrada inteiras.
Sintaxe UDTF básica
O Apache Spark implementa UDTFs do Python como classes Python com um método eval
obrigatório que usa yield
para emitir linhas de saída.
Para usar sua classe como uma UDTF, você precisará importar a função PySpark udtf
. O Databricks recomenda usar essa função como decorador e especificar explicitamente nomes e tipos de campo com a opção returnType
(a menos que a classe defina um método analyze
conforme descrito em uma seção posterior).
A seguinte UDTF cria uma tabela usando uma lista fixa de dois argumentos inteiros:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Registrar uma UDTF
As UDTFs são registradas no SparkSession
local e isoladas no nível do notebook ou do trabalho.
Não é possível registrar UDTFs como objetos no Catálogo do Unity e elas não podem ser usadas com SQL Warehouses.
Você pode registrar uma UDTF no SparkSession
atual para uso em consultas SQL com a função spark.udtf.register()
. Forneça um nome para a função SQL e a classe UDTF do Python.
spark.udtf.register("get_sum_diff", GetSumDiff)
Chamar uma UDTF registrada
Depois de registrada, você pode usar a UDTF no SQL com o comando magic %sql
ou a função spark.sql()
:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Usar o Apache Arrow
Caso a UDTF receba uma pequena quantidade de dados como entrada, mas gera uma tabela grande, o Databricks recomenda o uso do Apache Arrow. Você pode habilitá-lo com a especificação do parâmetro useArrow
na declaração da UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Listas de argumentos de variáveis – *args e **kwargs
Você pode usar a sintaxe do Python *args
ou **kwargs
e implementar a lógica para lidar com um número indeterminado de valores de entrada.
O exemplo abaixo retorna o mesmo resultado, mas verificando explicitamente os argumentos nos tipos e no tamanho da entrada:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Aqui está o mesmo exemplo, mas usando argumentos de palavra-chave:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definir um esquema estático no momento do registro
A UDTF retorna linhas com um esquema de saída composto por uma sequência ordenada de nomes e tipos de coluna. Se o esquema UDTF deve permanecer sempre o mesmo para todas as consultas, você pode especificar um esquema estático e fixo após o decorador @udtf
. Deve ser um StructType
:
StructType().add("c1", StringType())
Ou uma cadeia de caracteres DDL que representa um tipo de struct:
c1: string
Calcular um esquema dinâmico no momento da chamada da função
UDTFs também podem calcular o esquema de saída programaticamente para cada chamada, dependendo dos valores dos argumentos de entrada. Para fazer isso, defina um método estático chamado analyze
que aceite zero ou mais parâmetros que correspondam aos argumentos fornecidos à chamada UDTF específica.
Cada argumento do método analyze
é uma instância da classe AnalyzeArgument
que contém os seguintes campos:
Campo de classeAnalyzeArgument |
Descrição |
---|---|
dataType |
O tipo do argumento de entrada como um DataType . Para argumentos de tabela de entrada, este é um StructType que representa as colunas da tabela. |
value |
O valor do argumento de entrada como um Optional[Any] . Esse é o None para argumentos de tabela ou argumentos escalares literais que não são constantes. |
isTable |
Se o argumento de entrada é uma tabela como um BooleanType . |
isConstantExpression |
Se o argumento de entrada é uma expressão dobrável de constante como um BooleanType . |
O método analyze
retorna uma instância da classe AnalyzeResult
, que inclui o esquema da tabela de resultados como um StructType
mais alguns campos opcionais. Se a UDTF aceitar um argumento de tabela de entrada, o AnalyzeResult
também poderá incluir uma maneira solicitada de particionar e ordenar as linhas da tabela de entrada em várias chamadas UDTF, conforme descrito posteriormente.
Campo de classe AnalyzeResult |
Descrição |
---|---|
schema |
O esquema da tabela de resultados como um StructType . |
withSinglePartition |
Se todas as linhas de entrada devem ser enviadas para a mesma instância de classe UDTF como um BooleanType . |
partitionBy |
Se definido como não vazio, todas as linhas com cada combinação exclusiva de valores das expressões de particionamento são consumidas por uma instância separada da classe UDTF. |
orderBy |
Se definido como não vazio, isso especifica uma ordem de linhas dentro de cada partição. |
select |
Se definido como não vazio, essa é uma sequência de expressões que a UDTF está especificando para o Catalyst avaliar em relação às colunas no argumento TABLE de entrada. A UDTF recebe um atributo de entrada para cada nome na lista na ordem em que estão listados. |
Este exemplo de analyze
retorna uma coluna de saída para cada palavra no argumento de cadeia de caracteres de entrada.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Encaminhar o estado para chamadas eval
futuras
O método analyze
pode servir como um local conveniente para executar a inicialização e encaminhar os resultados para futuras invocações do método eval
na mesma chamada UDTF.
Para fazer isso, crie uma subclasse de AnalyzeResult
e retorne uma instância da subclasse do método analyze
.
Em seguida, adicione um argumento ao método __init__
para aceitar essa instância.
Este exemplo de analyze
retorna um esquema de saída de constante, mas adiciona informações personalizadas nos metadados de resultados a serem consumidos por chamadas de método __init__
futuras:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Suspender linhas de saída
O método eval
é executado uma vez para cada linha do argumento de tabela de entrada (ou apenas uma vez se nenhum argumento de tabela for fornecido), seguido por uma invocação do método terminate
no final. O método gera zero ou mais linhas que estão em conformidade com o esquema de resultados ao suspender tuplas, listas ou objetos pyspark.sql.Row
.
Este exemplo retorna uma linha ao fornecer uma tupla de três elementos:
def eval(self, x, y, z):
yield (x, y, z)
Você também pode omitir os parênteses:
def eval(self, x, y, z):
yield x, y, z
Adicione uma vírgula à direita para retornar uma linha com apenas uma coluna:
def eval(self, x, y, z):
yield x,
Você também pode suspender um objeto pyspark.sql.Row
.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
Este exemplo suspende linhas de saída do método terminate
usando uma lista do Python. É possível armazenar o estado dentro da classe de etapas anteriores na avaliação UDTF para essa finalidade.
def terminate(self):
yield [self.x, self.y, self.z]
Transmitir argumentos escalares a uma UDTF
Você pode passar argumentos escalares para uma UDTF como expressões de constante que compreendem valores literais ou funções com base neles. Por exemplo:
SELECT * FROM udtf(42, group => upper("finance_department"));
Transmitir argumentos de tabela a uma UDTF
As UDTFs do Python podem aceitar uma tabela de entrada como um argumento, além de argumentos de entrada escalares. Uma única UDTF também pode aceitar um argumento de tabela e vários argumentos escalares.
Em seguida, qualquer consulta SQL pode fornecer uma tabela de entrada usando a palavra-chave TABLE
com um identificador de tabela apropriado entre parênteses, como TABLE(t)
. Como alternativa, você pode passar uma subconsulta de tabela, como TABLE(SELECT a, b, c FROM t)
ou TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
O argumento da tabela de entrada é representado como um argumento pyspark.sql.Row
para o método eval
, com uma chamada para o método eval
para cada linha na tabela de entrada. Você pode usar anotações de campo de coluna PySpark padrão para interagir com as colunas em cada linha. O exemplo a seguir demonstra a importação explícita do tipo PySpark Row
e, em seguida, a filtragem da tabela transmitida no campo id
:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Para consultar a função, use a palavra-chave SQL TABLE
:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Especificar um particionamento das linhas de entrada de chamadas de função
Ao chamar uma UDTF com um argumento de tabela, qualquer consulta SQL pode particionar a tabela de entrada em várias chamadas UDTF de acordo com os valores de uma ou mais colunas de tabela de entrada.
Para especificar uma partição, use a cláusula PARTITION BY
na chamada de função após o argumento TABLE
.
Isso garante que todas as linhas de entrada com cada combinação exclusiva de valores das colunas de particionamento serão consumidas por exatamente uma instância da classe UDTF.
Observe que, além de referências de coluna simples, a cláusula PARTITION BY
também aceita expressões arbitrárias com base em colunas de tabela de entrada. Por exemplo, você pode especificar o LENGTH
de uma cadeia de caracteres, extrair um mês de uma data ou concatenar dois valores.
Também é possível especificar WITH SINGLE PARTITION
em vez de PARTITION BY
para solicitar apenas uma partição em que todas as linhas de entrada devem ser consumidas por exatamente uma instância da classe UDTF.
Em cada partição, opcionalmente, você pode especificar uma ordem necessária das linhas de entrada à medida que o método eval
da UDTF as consome. Para fazer isso, forneça uma cláusula ORDER BY
após a cláusula PARTITION BY
ou WITH SINGLE PARTITION
descrita acima.
Por exemplo, considere a seguinte UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Você pode especificar opções de particionamento ao chamar a UDTF por meio da tabela de entrada de várias maneiras:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Especificar um particionamento das linhas de entrada no método analyze
Observe que, para cada uma das maneiras acima de particionar a tabela de entrada ao chamar UDTFs em consultas SQL, há uma maneira correspondente para que o método analyze
da UDTF especifique automaticamente o mesmo método de particionamento.
- Em vez de chamar uma UDTF como
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
, você pode atualizar o métodoanalyze
para definir o campopartitionBy=[PartitioningColumn("a")]
e chamar a função usandoSELECT * FROM udtf(TABLE(t))
. - Pelo mesmo token, em vez de especificar
TABLE(t) WITH SINGLE PARTITION ORDER BY b
na consulta SQL, você pode fazer com queanalyze
defina os camposwithSinglePartition=true
eorderBy=[OrderingColumn("b")]
e passeTABLE(t)
. - Em vez de passar
TABLE(SELECT a FROM t)
na consulta SQL, você pode fazer com queanalyze
definaselect=[SelectedColumn("a")]
e passarTABLE(t)
.
No seguinte exemplo, analyze
retorna um esquema de saída de constante, seleciona um subconjunto de colunas na tabela de entrada e especifica que a tabela de entrada é particionada em várias chamadas UDTF com base nos valores da coluna date
:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])