Compartir vía


Funciones de tabla definidas por el usuario (UDF) de Python

Importante

Esta característica está en versión preliminar pública en Databricks Runtime 14.3 LTS y versiones posteriores.

Una función de tabla definida por el usuario (UDTF) permite registrar funciones que devuelven tablas en lugar de valores escalares. A diferencia de las funciones escalares que devuelven un único valor de resultado de cada llamada, cada UDTF se invoca en la cláusula de FROM una instrucción SQL y devuelve una tabla completa como salida.

Cada llamada UDTF puede aceptar cero o más argumentos. Estos argumentos pueden ser expresiones escalares o argumentos de tabla que representan tablas de entrada completas.

Las UDTF se pueden registrar de dos maneras:

Sugerencia

Databricks recomienda registrar UDF en el Catálogo de Unity para aprovechar las ventajas de la gobernanza centralizada que facilita compartir y reutilizar funciones de forma segura entre usuarios y equipos.

Sintaxis UDTF básica

Apache Spark implementa las UDF de Python como clases de Python con un método obligatorio eval que usa yield para emitir filas de salida.

Para usar la clase como UDTF, debe importar la función PySpark udtf . Databricks recomienda usar esta función como decorador y especificar explícitamente nombres de campo y tipos mediante la returnType opción (a menos que la clase defina un analyze método como se describe en una sección posterior).

El siguiente UDTF crea una tabla con una lista fija de dos argumentos enteros:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Registrar un UDTF

Para registrar un UDTF con ámbito de sesión para su uso en consultas SQL, use spark.udtf.register(). Proporcione un nombre para la función SQL y la clase UDTF de Python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Llamar a un UDTF registrado

Una vez registrado, puede usar el UDTF en SQL mediante el %sql comando mágico o la spark.sql() función.

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);

Actualización de un UDTF con ámbito de sesión al catálogo de Unity

Importante

El registro de UDTF de Python en el catálogo de Unity está en versión preliminar pública. Las UDF del catálogo de Unity requieren Databricks Runtime versión 17.1 y posteriores. Vea Requisitos.

Puede actualizar un UDTF con ámbito de sesión al catálogo de Unity para aprovechar la gobernanza centralizada y facilitar el uso seguro de las funciones de uso compartido y reutilización entre usuarios y equipos.

Para actualizar un UDTF con ámbito de sesión al catálogo de Unity, use DDL de SQL con la CREATE OR REPLACE FUNCTION instrucción . En el ejemplo siguiente se muestra cómo convertir el GetSumDiff UDTF de una función con ámbito de sesión a una función de catálogo de Unity:

CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y
$$;

SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13  | 7    |
+-----+------+

Para obtener más información sobre las UDF del catálogo de Unity, consulte Funciones de tabla definidas por el usuario (UDF) de Python en el catálogo de Unity.

Uso de Apache Arrow

Si el UDTF recibe una pequeña cantidad de datos como entrada, pero genera una tabla grande, Databricks recomienda usar Apache Arrow. Puede habilitarlo especificando el useArrow parámetro al declarar el UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Listas de argumentos variables: *args y **kwargs

Puede usar Python *args o **kwargs sintaxis e implementar lógica para controlar un número no especificado de valores de entrada.

En el ejemplo siguiente se devuelve el mismo resultado al comprobar explícitamente la longitud de entrada y los tipos de los argumentos:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Este es el mismo ejemplo, pero el uso de argumentos de palabra clave:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definición de un esquema estático en el momento del registro

El UDTF devuelve filas con un esquema de salida que comprende una secuencia ordenada de tipos y nombres de columna. Si el esquema UDTF siempre debe permanecer igual para todas las consultas, puede especificar un esquema fijo estático después del @udtf decorador. Debe ser :StructType

StructType().add("c1", StringType())

O una cadena DDL que representa un tipo de estructura:

c1: string

Calcular un esquema dinámico al momento de la llamada de la función

Las UDF también pueden calcular el esquema de salida mediante programación para cada llamada en función de los valores de los argumentos de entrada. Para ello, defina un método estático denominado analyze que acepta cero o más parámetros que corresponden a los argumentos proporcionados a la llamada UDTF específica.

Cada argumento del analyze método es una instancia de la AnalyzeArgument clase que contiene los siguientes campos:

AnalyzeArgument Campo de clase Descripción
dataType Tipo del argumento de entrada como DataType. En el caso de los argumentos de la tabla de entrada, se trata de un objeto StructType que representa las columnas de la tabla.
value El valor del argumento de entrada como un Optional[Any]. Esto es None para argumentos de tabla o argumentos escalares literales que no son constantes.
isTable Si el argumento de entrada es una tabla como .BooleanType
isConstantExpression Si el argumento de entrada es una expresión plegable constante como .BooleanType

El método analyze devuelve una instancia de la clase AnalyzeResult, que incluye el esquema de la tabla de resultados como un StructType, además de algunos campos opcionales. Si el UDTF acepta un argumento de tabla de entrada, AnalyzeResult también puede incluir una manera solicitada de particionar y ordenar las filas de la tabla de entrada en varias llamadas UDTF, como se describe más adelante.

AnalyzeResult Campo de clase Descripción
schema Esquema de la tabla de resultados como .StructType
withSinglePartition Indicar si se deben enviar todas las filas de entrada a la misma instancia de clase UDTF como un BooleanType.
partitionBy Si se establece como no vacío, todas las filas con cada combinación única de valores de las expresiones de partición son consumidas por instancias separadas de la clase UDTF.
orderBy Si se establece en no vacío, especifica una ordenación de filas dentro de cada partición.
select Si se establece un valor no vacío, se trata de una secuencia de expresiones que el UDTF especifica para que Catalyst evalúe contra las columnas en el argumento de entrada TABLE. El UDTF recibe un atributo de entrada para cada nombre de la lista en el orden en que se enumeran.

En este ejemplo estándar analyze devuelve una columna de salida para cada palabra del argumento de cadena de entrada.

from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult


@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result

MyUDTF(lit("hello world")).columns
['word_0', 'word_1']

Reenviar el estado a las llamadas futuras eval

El analyze método puede servir como un lugar conveniente para realizar la inicialización y, a continuación, reenviar los resultados a futuras invocaciones de eval método para la misma llamada UDTF.

Para ello, cree una subclase de AnalyzeResult y devuelva una instancia de la subclase desde el analyze método . A continuación, agregue un argumento adicional al __init__ método para aceptar esa instancia.

En este ejemplo, se devuelve un esquema de salida constante, pero se agrega información personalizada en los metadatos del resultado para que las llamadas futuras del método la consuman.

from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Generar filas de salida

El eval método se ejecuta una vez para cada fila del argumento de tabla de entrada (o simplemente una vez si no se proporciona ningún argumento de tabla), seguido de una invocación del terminate método al final. Ambos métodos generan cero o más filas que se ajustan al esquema de resultados al producir tuplas, listas o pyspark.sql.Row objetos.

En este ejemplo se devuelve una fila proporcionando una tupla de tres elementos:

def eval(self, x, y, z):
  yield (x, y, z)

También puede omitir los paréntesis:

def eval(self, x, y, z):
  yield x, y, z

Agregue una coma final para devolver una fila con una sola columna:

def eval(self, x, y, z):
  yield x,

También puede producir un pyspark.sql.Row objeto .

def eval(self, x, y, z):
  from pyspark.sql.types import Row
  yield Row(x, y, z)

En este ejemplo se generan filas de salida del terminate método mediante una lista de Python. Puede almacenar el estado dentro de la clase desde los pasos anteriores de la evaluación UDTF para este propósito.

def terminate(self):
  yield [self.x, self.y, self.z]

Pasar argumentos escalares a un UDTF

Puede pasar argumentos escalares a un UDTF como expresiones constantes que comprenden valores literales o funciones en función de ellos. Por ejemplo:

SELECT * FROM get_sum_diff(1, y => 2)

Transferir argumentos de una tabla a un UDTF

Las UDF de Python pueden aceptar una tabla de entrada como argumento además de argumentos de entrada escalares. Un único UDTF también puede aceptar un argumento de tabla y varios argumentos escalares.

A continuación, cualquier consulta SQL puede proporcionar una tabla de entrada mediante la TABLE palabra clave seguida de paréntesis que rodea un identificador de tabla adecuado, como TABLE(t). Como alternativa, puede pasar una subconsulta de tabla, como TABLE(SELECT a, b, c FROM t) o TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

A continuación, el argumento de tabla de entrada se representa como argumento pyspark.sql.Row para el eval método , con una llamada al eval método para cada fila de la tabla de entrada. Puede usar anotaciones de campo de columna PySpark estándar para interactuar con las columnas en cada fila. En el ejemplo siguiente se muestra cómo importar explícitamente el tipo PySpark Row y, a continuación, filtrar la tabla pasada en el id campo:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Para consultar la función, use la TABLE palabra clave SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Especificar un particionamiento de las filas de entrada a partir de las llamadas de función

Al llamar a un UDTF con un argumento de tabla, cualquier consulta SQL puede particionar la tabla de entrada en varias llamadas UDTF basadas en los valores de una o varias columnas de tabla de entrada.

Para especificar una partición, use la PARTITION BY cláusula en la llamada de función después del TABLE argumento . Esto garantiza que todas las filas de entrada con cada combinación única de valores de las columnas de partición serán consumidas por exactamente una instancia de la clase UDTF.

Tenga en cuenta que, además de las referencias de columna simples, la PARTITION BY cláusula también acepta expresiones arbitrarias basadas en columnas de tabla de entrada. Por ejemplo, puede especificar el LENGTH de una cadena, extraer un mes de una fecha o concatenar dos valores.

También es posible especificar WITH SINGLE PARTITION en lugar de PARTITION BY para solicitar solo una partición en la que todas las filas de entrada deben ser consumidas por exactamente una instancia de la clase UDTF.

Dentro de cada partición, puede especificar opcionalmente una ordenación necesaria de las filas de entrada a medida que el método de eval UDTF los consume. Para ello, proporcione una cláusula ORDER BY después de la PARTITION BY o WITH SINGLE PARTITION descrita anteriormente.

Por ejemplo, considere la siguiente UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Puede especificar opciones de creación de particiones al llamar al UDTF a través de la tabla de entrada de varias maneras:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8);
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Especifique una partición de las filas de entrada del método analyze

Tenga en cuenta que, para cada una de las formas anteriores de crear particiones de la tabla de entrada al llamar a UDF en consultas SQL, hay una manera correspondiente para que el método de analyze UDTF especifique automáticamente el mismo método de creación de particiones.

  • En lugar de llamar a un UDTF como SELECT * FROM udtf(TABLE(t) PARTITION BY a), puede actualizar el analyze método para establecer el campo partitionBy=[PartitioningColumn("a")] y simplemente llamar a la función mediante SELECT * FROM udtf(TABLE(t)).
  • De igual modo, en lugar de especificar TABLE(t) WITH SINGLE PARTITION ORDER BY b en la consulta SQL, puede hacer que analyze establezca los campos withSinglePartition=true y orderBy=[OrderingColumn("b")], y luego simplemente pasar TABLE(t).
  • En lugar de pasar TABLE(SELECT a FROM t) en la consulta SQL, puede hacer que analyze establezca select=[SelectedColumn("a")] y, a continuación, simplemente pasar TABLE(t).

En el ejemplo siguiente, analyze devuelve un esquema de salida constante, selecciona un subconjunto de columnas de la tabla de entrada y especifica que la tabla de entrada se particiona en varias llamadas UDTF en función de los valores de la date columna:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add("longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word)",
        alias="length_word")])