Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
Cette fonctionnalité est disponible en préversion publique dans Databricks Runtime 14.3 LTS et versions ultérieures.
Une fonction de table définie par l’utilisateur (UDTF) vous permet d’inscrire des fonctions qui retournent des tables au lieu de valeurs scalaires. Contrairement aux fonctions scalaires qui retournent une valeur de résultat unique à partir de chaque appel, chaque UDTF est appelé dans la clause d’une FROM
instruction SQL et retourne une table entière en tant que sortie.
Chaque appel UDTF peut accepter zéro ou plusieurs arguments. Ces arguments peuvent être des expressions scalaires ou des arguments de table représentant des tables d’entrée entières.
Syntaxe UDTF de base
Apache Spark implémente des UDTF en Python en tant que classes Python avec une méthode eval
obligatoire qui utilise yield
pour émettre des lignes de sortie.
Pour utiliser votre classe en tant qu’UDTF, vous devez importer la fonction PySpark udtf
. Databricks recommande d’utiliser cette fonction comme décorateur et de spécifier explicitement les noms et les types de champs à l’aide de l’option returnType
(sauf si la classe définit une analyze
méthode comme décrit dans une section ultérieure).
L’UDTF suivante crée une table à l’aide d’une liste fixe de deux arguments entiers :
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Inscrire un UDTF
Les UDTF sont enregistrées pour la version locale SparkSession
et sont isolées au niveau du notebook ou de la tâche.
Vous ne pouvez pas inscrire des UDTFs en tant qu’objets dans le catalogue Unity, et les UDTFs ne peuvent pas être utilisées avec des entrepôts SQL.
Vous pouvez inscrire un UDTF à l’état actuel SparkSession
pour une utilisation dans les requêtes SQL avec la fonction spark.udtf.register()
. Fournissez un nom pour la fonction SQL et la classe UDTF Python.
spark.udtf.register("get_sum_diff", GetSumDiff)
Appeler un UDTF enregistré
Une fois enregistré, vous pouvez utiliser l’UDTF dans SQL en utilisant soit la commande magique %sql
soit la fonction spark.sql()
.
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);
Utiliser Apache Arrow
Si votre UDTF reçoit une petite quantité de données en entrée, mais génère une table volumineuse, Databricks recommande d’utiliser Apache Arrow. Vous pouvez l’activer en spécifiant le paramètre useArrow
lors de la déclaration de l’UDTF :
@udtf(returnType="c1: int, c2: int", useArrow=True)
Listes d’arguments variables - *args et **kwargs
Vous pouvez utiliser Python *args
ou **kwargs
la syntaxe et implémenter la logique pour gérer un nombre non spécifié de valeurs d’entrée.
L’exemple suivant retourne le même résultat tout en vérifiant explicitement la longueur et les types d’entrée pour les arguments :
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Voici le même exemple, mais en utilisant des arguments de mot clé :
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Définir un schéma statique au moment de l’inscription
L’UDTF retourne des lignes avec un schéma de sortie comprenant une séquence ordonnée de noms et de types de colonnes. Si le schéma UDTF doit toujours rester le même pour toutes les requêtes, vous pouvez spécifier un schéma statique et fixe après le @udtf
décorateur. Il doit s'agir soit d'un StructType
:
StructType().add("c1", StringType())
Ou une chaîne DDL représentant un type de struct :
c1: string
Calcul d’un schéma dynamique au moment de l’appel de fonction
Les FDTDs peuvent également déterminer le schéma de sortie de manière programmatique pour chaque appel, selon les valeurs des arguments d’entrée. Pour ce faire, définissez une méthode statique appelée analyze
qui accepte zéro ou plusieurs paramètres correspondant aux arguments fournis à l’appel UDTF spécifique.
Chaque argument de la analyze
méthode est une instance de la AnalyzeArgument
classe qui contient les champs suivants :
AnalyzeArgument champ de classe |
Descriptif |
---|---|
dataType |
Type de l’argument d’entrée en tant que DataType . Pour les arguments de table d’entrée, il s’agit d’un StructType représentant les colonnes de la table. |
value |
Valeur de l’argument d’entrée en tant que Optional[Any] . Il s’agit None des arguments de table ou des arguments scalaires littéraux qui ne sont pas constants. |
isTable |
Indique si l’argument d’entrée est une table en tant que BooleanType . |
isConstantExpression |
Indique si l’argument d’entrée est une expression pliable constante en tant que BooleanType . |
La méthode analyze
retourne une instance de la classe AnalyzeResult
, qui inclut le schéma de la table de résultats en tant que StructType
ainsi que quelques champs facultatifs. Si l’UDTF accepte un argument de table d’entrée, il AnalyzeResult
peut également inclure une méthode demandée pour partitionner et classer les lignes de la table d’entrée sur plusieurs appels UDTF, comme décrit plus loin.
AnalyzeResult champ de classe |
Descriptif |
---|---|
schema |
Le schéma de la table de résultats comme un StructType . |
withSinglePartition |
Indique s’il faut envoyer toutes les lignes d’entrée à la même instance de classe UDTF qu’un BooleanType . |
partitionBy |
Si elle est définie comme non vide ou nul, toutes les lignes avec chaque combinaison unique de valeurs des expressions de partitionnement sont traitées par une instance distincte de la classe UDTF. |
orderBy |
S’il est défini sur non vide, cela spécifie un classement des lignes dans chaque partition. |
select |
S'il est défini sur non vide, il s'agit d'une série d'expressions que l'UDTF spécifie pour que Catalyst évalue par rapport aux colonnes de l'argument d'entrée TABLE. L’UDTF reçoit un attribut d’entrée pour chaque nom de la liste dans l’ordre dans lequel ils sont répertoriés. |
Cet analyze
exemple retourne une colonne de sortie pour chaque mot dans l’argument de chaîne d’entrée.
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
MyUDTF(lit("hello world")).columns
['word_0', 'word_1']
Transférer l’état vers les appels futurs eval
La analyze
méthode peut servir d’emplacement pratique pour effectuer l’initialisation, puis transférer les résultats aux appels de méthode futurs eval
pour le même appel UDTF.
Pour ce faire, créez une sous-classe de AnalyzeResult
et retournez une instance de la sous-classe à partir de la méthode analyze
.
Ensuite, ajoutez un argument supplémentaire à la __init__
méthode pour accepter cette instance.
Cet analyze
exemple retourne un schéma de sortie constante, mais ajoute des informations personnalisées dans les métadonnées de résultat à consommer par les appels de méthode futurs __init__
:
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Générer des lignes de sortie
La eval
méthode s’exécute une fois pour chaque ligne de l’argument de table d’entrée (ou juste une fois si aucun argument de table n’est fourni), suivie d’un appel de la terminate
méthode à la fin. Chaque méthode génère zéro ou plusieurs lignes conformes au schéma de résultat en produisant des tuples, des listes ou des objets pyspark.sql.Row
.
Cet exemple retourne une ligne en fournissant un tuple de trois éléments :
def eval(self, x, y, z):
yield (x, y, z)
Vous pouvez également omettre les parenthèses :
def eval(self, x, y, z):
yield x, y, z
Ajoutez une virgule de fin pour retourner une ligne avec une seule colonne :
def eval(self, x, y, z):
yield x,
Vous pouvez également générer un pyspark.sql.Row
objet.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
Cet exemple génère des lignes de sortie à partir de la méthode terminate
à l’aide d’une liste Python. Vous pouvez stocker l’état à l’intérieur de la classe à partir des étapes précédentes de l’évaluation UDTF à cet effet.
def terminate(self):
yield [self.x, self.y, self.z]
Passer des arguments scalaires à un UDTF
Vous pouvez passer des arguments scalaires à un UDTF en tant qu’expressions constantes comprenant des valeurs littérales ou des fonctions en fonction de celles-ci. Par exemple:
SELECT * FROM get_sum_diff(1, y => 2)
Passer des arguments de table à un UDTF
Les UDTFs Python peuvent accepter une table d'entrée en tant qu'argument en plus des arguments d'entrée scalaires. Un seul UDTF peut également accepter un argument de table et plusieurs arguments scalaires.
Ensuite, toute requête SQL peut fournir une table d’entrée à l’aide du TABLE
mot clé suivi de parenthèses entourant un identificateur de table approprié, comme TABLE(t)
. Vous pouvez également passer une sous-requête de table, comme TABLE(SELECT a, b, c FROM t)
ou TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
L’argument de table d’entrée est ensuite représenté en tant qu’argument pyspark.sql.Row
de la eval
méthode, avec un appel à la eval
méthode pour chaque ligne de la table d’entrée. Vous pouvez utiliser des annotations de champ de colonne PySpark standard pour interagir avec les colonnes de chaque ligne. L’exemple suivant illustre l’importation explicite du type PySpark Row
, puis le filtrage de la table passée sur le id
champ :
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Pour interroger la fonction, utilisez le TABLE
mot clé SQL :
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Spécifier un partitionnement des lignes d'entrée en fonction des appels de fonction
Lors de l’appel d’un UDTF avec un argument de table, toute requête SQL peut partitionner la table d’entrée sur plusieurs appels UDTF en fonction des valeurs d’une ou plusieurs colonnes de table d’entrée.
Pour spécifier une partition, utilisez la PARTITION BY
clause dans l’appel de fonction après l’argument TABLE
.
Cela garantit que toutes les lignes d’entrée avec chaque combinaison unique de valeurs des colonnes de partitionnement seront consommées par une seule instance de la classe UDTF.
Notez qu’en plus des références de colonne simples, la PARTITION BY
clause accepte également des expressions arbitraires basées sur des colonnes de table d’entrée. Par exemple, vous pouvez spécifier la LENGTH
chaîne, extraire un mois d’une date ou concaténer deux valeurs.
Il est également possible de spécifier WITH SINGLE PARTITION
au lieu de PARTITION BY
demander une seule partition où toutes les lignes d’entrée doivent être consommées par une seule instance de la classe UDTF.
Dans chaque partition, vous pouvez éventuellement spécifier un classement requis des lignes d’entrée eval
, car la méthode UDTF les consomme. Pour ce faire, fournissez une ORDER BY
clause après la PARTITION BY
ou la WITH SINGLE PARTITION
clause décrite ci-dessus.
Par exemple, considérez l’UDTF suivante :
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Vous pouvez spécifier des options de partitionnement lors de l’appel de l’UDTF sur la table d’entrée de manière muliple :
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Spécifier un partitionnement des lignes d’entrée à partir de la analyze
méthode
Notez que pour chacune des méthodes ci-dessus de partitionner la table d’entrée lors de l’appel de fonctions définies par l’utilisateur dans des requêtes SQL, il existe un moyen correspondant pour la méthode UDTF de spécifier automatiquement la même méthode de analyze
partitionnement.
- Au lieu d'appeler un UDTF en tant que
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
, vous pouvez mettre à jour la méthodeanalyze
pour définir le champpartitionBy=[PartitioningColumn("a")]
et simplement appeler la fonction à l'aide deSELECT * FROM udtf(TABLE(t))
. - De la même manière, au lieu de spécifier
TABLE(t) WITH SINGLE PARTITION ORDER BY b
dans la requête SQL, vous pouvez faire en sorte queanalyze
définisse les champswithSinglePartition=true
etorderBy=[OrderingColumn("b")]
puis simplement passerTABLE(t)
. - Au lieu de passer
TABLE(SELECT a FROM t)
dans la requête SQL, vous pouvez faire en sorte queanalyze
définisseselect=[SelectedColumn("a")]
et ensuite simplement passerTABLE(t)
.
Dans l’exemple suivant, analyze
retourne un schéma de sortie constant, sélectionne un sous-ensemble de colonnes dans la table d’entrée et spécifie que la table d’entrée est partitionnée sur plusieurs appels UDTF en fonction des valeurs de la date
colonne :
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])