Partager via


Fonctions de table définies par l’utilisateur Python (UDF)

Important

Cette fonctionnalité est disponible en préversion publique dans Databricks Runtime 14.3 LTS et versions ultérieures.

Une fonction de table définie par l’utilisateur (UDTF) vous permet d’inscrire des fonctions qui retournent des tables au lieu de valeurs scalaires. Contrairement aux fonctions scalaires qui retournent une valeur de résultat unique à partir de chaque appel, chaque UDTF est appelé dans la clause d’une FROM instruction SQL et retourne une table entière en tant que sortie.

Chaque appel UDTF peut accepter zéro ou plusieurs arguments. Ces arguments peuvent être des expressions scalaires ou des arguments de table représentant des tables d’entrée entières.

Syntaxe UDTF de base

Apache Spark implémente des UDTF en Python en tant que classes Python avec une méthode eval obligatoire qui utilise yield pour émettre des lignes de sortie.

Pour utiliser votre classe en tant qu’UDTF, vous devez importer la fonction PySpark udtf . Databricks recommande d’utiliser cette fonction comme décorateur et de spécifier explicitement les noms et les types de champs à l’aide de l’option returnType (sauf si la classe définit une analyze méthode comme décrit dans une section ultérieure).

L’UDTF suivante crée une table à l’aide d’une liste fixe de deux arguments entiers :

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Inscrire un UDTF

Les UDTF sont enregistrées pour la version locale SparkSession et sont isolées au niveau du notebook ou de la tâche.

Vous ne pouvez pas inscrire des UDTFs en tant qu’objets dans le catalogue Unity, et les UDTFs ne peuvent pas être utilisées avec des entrepôts SQL.

Vous pouvez inscrire un UDTF à l’état actuel SparkSession pour une utilisation dans les requêtes SQL avec la fonction spark.udtf.register(). Fournissez un nom pour la fonction SQL et la classe UDTF Python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Appeler un UDTF enregistré

Une fois enregistré, vous pouvez utiliser l’UDTF dans SQL en utilisant soit la commande magique %sql soit la fonction spark.sql().

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);

Utiliser Apache Arrow

Si votre UDTF reçoit une petite quantité de données en entrée, mais génère une table volumineuse, Databricks recommande d’utiliser Apache Arrow. Vous pouvez l’activer en spécifiant le paramètre useArrow lors de la déclaration de l’UDTF :

@udtf(returnType="c1: int, c2: int", useArrow=True)

Listes d’arguments variables - *args et **kwargs

Vous pouvez utiliser Python *args ou **kwargs la syntaxe et implémenter la logique pour gérer un nombre non spécifié de valeurs d’entrée.

L’exemple suivant retourne le même résultat tout en vérifiant explicitement la longueur et les types d’entrée pour les arguments :

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Voici le même exemple, mais en utilisant des arguments de mot clé :

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Définir un schéma statique au moment de l’inscription

L’UDTF retourne des lignes avec un schéma de sortie comprenant une séquence ordonnée de noms et de types de colonnes. Si le schéma UDTF doit toujours rester le même pour toutes les requêtes, vous pouvez spécifier un schéma statique et fixe après le @udtf décorateur. Il doit s'agir soit d'un StructType:

StructType().add("c1", StringType())

Ou une chaîne DDL représentant un type de struct :

c1: string

Calcul d’un schéma dynamique au moment de l’appel de fonction

Les FDTDs peuvent également déterminer le schéma de sortie de manière programmatique pour chaque appel, selon les valeurs des arguments d’entrée. Pour ce faire, définissez une méthode statique appelée analyze qui accepte zéro ou plusieurs paramètres correspondant aux arguments fournis à l’appel UDTF spécifique.

Chaque argument de la analyze méthode est une instance de la AnalyzeArgument classe qui contient les champs suivants :

AnalyzeArgument champ de classe Descriptif
dataType Type de l’argument d’entrée en tant que DataType. Pour les arguments de table d’entrée, il s’agit d’un StructType représentant les colonnes de la table.
value Valeur de l’argument d’entrée en tant que Optional[Any]. Il s’agit None des arguments de table ou des arguments scalaires littéraux qui ne sont pas constants.
isTable Indique si l’argument d’entrée est une table en tant que BooleanType.
isConstantExpression Indique si l’argument d’entrée est une expression pliable constante en tant que BooleanType.

La méthode analyze retourne une instance de la classe AnalyzeResult, qui inclut le schéma de la table de résultats en tant que StructType ainsi que quelques champs facultatifs. Si l’UDTF accepte un argument de table d’entrée, il AnalyzeResult peut également inclure une méthode demandée pour partitionner et classer les lignes de la table d’entrée sur plusieurs appels UDTF, comme décrit plus loin.

AnalyzeResult champ de classe Descriptif
schema Le schéma de la table de résultats comme un StructType.
withSinglePartition Indique s’il faut envoyer toutes les lignes d’entrée à la même instance de classe UDTF qu’un BooleanType.
partitionBy Si elle est définie comme non vide ou nul, toutes les lignes avec chaque combinaison unique de valeurs des expressions de partitionnement sont traitées par une instance distincte de la classe UDTF.
orderBy S’il est défini sur non vide, cela spécifie un classement des lignes dans chaque partition.
select S'il est défini sur non vide, il s'agit d'une série d'expressions que l'UDTF spécifie pour que Catalyst évalue par rapport aux colonnes de l'argument d'entrée TABLE. L’UDTF reçoit un attribut d’entrée pour chaque nom de la liste dans l’ordre dans lequel ils sont répertoriés.

Cet analyze exemple retourne une colonne de sortie pour chaque mot dans l’argument de chaîne d’entrée.

from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult


@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result

MyUDTF(lit("hello world")).columns
['word_0', 'word_1']

Transférer l’état vers les appels futurs eval

La analyze méthode peut servir d’emplacement pratique pour effectuer l’initialisation, puis transférer les résultats aux appels de méthode futurs eval pour le même appel UDTF.

Pour ce faire, créez une sous-classe de AnalyzeResult et retournez une instance de la sous-classe à partir de la méthode analyze. Ensuite, ajoutez un argument supplémentaire à la __init__ méthode pour accepter cette instance.

Cet analyze exemple retourne un schéma de sortie constante, mais ajoute des informations personnalisées dans les métadonnées de résultat à consommer par les appels de méthode futurs __init__ :

from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Générer des lignes de sortie

La eval méthode s’exécute une fois pour chaque ligne de l’argument de table d’entrée (ou juste une fois si aucun argument de table n’est fourni), suivie d’un appel de la terminate méthode à la fin. Chaque méthode génère zéro ou plusieurs lignes conformes au schéma de résultat en produisant des tuples, des listes ou des objets pyspark.sql.Row.

Cet exemple retourne une ligne en fournissant un tuple de trois éléments :

def eval(self, x, y, z):
  yield (x, y, z)

Vous pouvez également omettre les parenthèses :

def eval(self, x, y, z):
  yield x, y, z

Ajoutez une virgule de fin pour retourner une ligne avec une seule colonne :

def eval(self, x, y, z):
  yield x,

Vous pouvez également générer un pyspark.sql.Row objet.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Cet exemple génère des lignes de sortie à partir de la méthode terminate à l’aide d’une liste Python. Vous pouvez stocker l’état à l’intérieur de la classe à partir des étapes précédentes de l’évaluation UDTF à cet effet.

def terminate(self):
  yield [self.x, self.y, self.z]

Passer des arguments scalaires à un UDTF

Vous pouvez passer des arguments scalaires à un UDTF en tant qu’expressions constantes comprenant des valeurs littérales ou des fonctions en fonction de celles-ci. Par exemple:

SELECT * FROM get_sum_diff(1, y => 2)

Passer des arguments de table à un UDTF

Les UDTFs Python peuvent accepter une table d'entrée en tant qu'argument en plus des arguments d'entrée scalaires. Un seul UDTF peut également accepter un argument de table et plusieurs arguments scalaires.

Ensuite, toute requête SQL peut fournir une table d’entrée à l’aide du TABLE mot clé suivi de parenthèses entourant un identificateur de table approprié, comme TABLE(t). Vous pouvez également passer une sous-requête de table, comme TABLE(SELECT a, b, c FROM t) ou TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

L’argument de table d’entrée est ensuite représenté en tant qu’argument pyspark.sql.Row de la eval méthode, avec un appel à la eval méthode pour chaque ligne de la table d’entrée. Vous pouvez utiliser des annotations de champ de colonne PySpark standard pour interagir avec les colonnes de chaque ligne. L’exemple suivant illustre l’importation explicite du type PySpark Row , puis le filtrage de la table passée sur le id champ :

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Pour interroger la fonction, utilisez le TABLE mot clé SQL :

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Spécifier un partitionnement des lignes d'entrée en fonction des appels de fonction

Lors de l’appel d’un UDTF avec un argument de table, toute requête SQL peut partitionner la table d’entrée sur plusieurs appels UDTF en fonction des valeurs d’une ou plusieurs colonnes de table d’entrée.

Pour spécifier une partition, utilisez la PARTITION BY clause dans l’appel de fonction après l’argument TABLE . Cela garantit que toutes les lignes d’entrée avec chaque combinaison unique de valeurs des colonnes de partitionnement seront consommées par une seule instance de la classe UDTF.

Notez qu’en plus des références de colonne simples, la PARTITION BY clause accepte également des expressions arbitraires basées sur des colonnes de table d’entrée. Par exemple, vous pouvez spécifier la LENGTH chaîne, extraire un mois d’une date ou concaténer deux valeurs.

Il est également possible de spécifier WITH SINGLE PARTITION au lieu de PARTITION BY demander une seule partition où toutes les lignes d’entrée doivent être consommées par une seule instance de la classe UDTF.

Dans chaque partition, vous pouvez éventuellement spécifier un classement requis des lignes d’entrée eval , car la méthode UDTF les consomme. Pour ce faire, fournissez une ORDER BY clause après la PARTITION BY ou la WITH SINGLE PARTITION clause décrite ci-dessus.

Par exemple, considérez l’UDTF suivante :

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Vous pouvez spécifier des options de partitionnement lors de l’appel de l’UDTF sur la table d’entrée de manière muliple :

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Spécifier un partitionnement des lignes d’entrée à partir de la analyze méthode

Notez que pour chacune des méthodes ci-dessus de partitionner la table d’entrée lors de l’appel de fonctions définies par l’utilisateur dans des requêtes SQL, il existe un moyen correspondant pour la méthode UDTF de spécifier automatiquement la même méthode de analyze partitionnement.

  • Au lieu d'appeler un UDTF en tant que SELECT * FROM udtf(TABLE(t) PARTITION BY a), vous pouvez mettre à jour la méthode analyze pour définir le champ partitionBy=[PartitioningColumn("a")] et simplement appeler la fonction à l'aide de SELECT * FROM udtf(TABLE(t)).
  • De la même manière, au lieu de spécifier TABLE(t) WITH SINGLE PARTITION ORDER BY b dans la requête SQL, vous pouvez faire en sorte que analyze définisse les champs withSinglePartition=true et orderBy=[OrderingColumn("b")] puis simplement passer TABLE(t).
  • Au lieu de passer TABLE(SELECT a FROM t) dans la requête SQL, vous pouvez faire en sorte que analyze définisse select=[SelectedColumn("a")] et ensuite simplement passer TABLE(t).

Dans l’exemple suivant, analyze retourne un schéma de sortie constant, sélectionne un sous-ensemble de colonnes dans la table d’entrée et spécifie que la table d’entrée est partitionnée sur plusieurs appels UDTF en fonction des valeurs de la date colonne :

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])