Note
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de changer d’annuaire.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de changer d’annuaire.
Le module pyspark.pipelines (ici sous l'alias de dp) implémente une grande partie de ses fonctionnalités principales à l’aide de décorateurs. Ces décorateurs acceptent une fonction qui définit une requête de streaming ou de traitement par lots et qui retourne un DataFrame Apache Spark. La syntaxe suivante montre un exemple simple de définition d’un jeu de données de pipeline :
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
Cette page fournit une vue d’ensemble des fonctions et requêtes qui définissent des jeux de données dans des pipelines. Pour obtenir la liste complète des décorateurs disponibles, consultez Référence du développeur Pipeline.
Les fonctions que vous utilisez pour définir des jeux de données ne doivent pas inclure une logique Python arbitraire non liée au jeu de données, y compris les appels aux API tierces. Les pipelines exécutent ces fonctions plusieurs fois pendant la planification, la validation et les mises à jour. L’inclusion d’une logique arbitraire peut entraîner des résultats inattendus.
Lire des données pour commencer une définition de jeu de données
Les fonctions utilisées pour définir des jeux de données de pipeline commencent généralement par une spark.read ou spark.readStream opération. Ces opérations de lecture retournent un objet DataFrame statique ou en streaming que vous utilisez pour définir des transformations supplémentaires avant de renvoyer le DataFrame. D’autres exemples d’opérations spark qui retournent un DataFrame incluent spark.table, ou spark.range.
Les fonctions ne doivent jamais référencer les DataFrames définis en dehors de la fonction. Tenter de se référer aux DataFrames définis dans une autre portée peut entraîner un comportement inattendu. Pour obtenir un exemple de modèle de métaprogrammation pour la création de plusieurs tables, consultez Créer des tables dans une for boucle.
Les exemples suivants montrent la syntaxe de base pour lire des données à l’aide d’une logique de traitement par lots ou de diffusion en continu :
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
Si vous devez lire des données à partir d’une API REST externe, implémentez cette connexion à l’aide d’une source de données personnalisée Python. Consultez les sources de données personnalisées PySpark.
Note
Il est possible de créer des DataFrames Apache Spark arbitraires à partir de collections de données Python, notamment pandas DataFrames, dicts et listes. Ces modèles peuvent être utiles pendant le développement et le test, mais la plupart des définitions de jeu de données de pipeline de production doivent commencer par charger des données à partir de fichiers, d’un système externe ou d’une table ou d’une vue existante.
"Chaînage des transformations"
Les pipelines prennent en charge presque toutes les transformations de DataFrame Apache Spark. Vous pouvez inclure n’importe quel nombre de transformations dans votre fonction de définition de jeu de données, mais vous devez vous assurer que les méthodes que vous utilisez retournent toujours un objet DataFrame.
Si vous avez une transformation intermédiaire qui pilote plusieurs charges de travail en aval, mais que vous n’avez pas besoin de la matérialiser en tant que table, utilisez @dp.temporary_view() pour ajouter une vue temporaire à votre pipeline. Vous pouvez ensuite référencer cette vue à l'aide de spark.read.table("temp_view_name") dans plusieurs définitions de jeux de données en aval. La syntaxe suivante illustre ce modèle :
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
Cela garantit que le pipeline dispose d'une vue d'ensemble complète des transformations dans votre vue lors de la planification du pipeline et empêche les problèmes potentiels liés au code arbitraire Python s'exécutant en dehors des définitions de jeu de données.
Dans votre fonction, vous pouvez chaîner des DataFrames ensemble pour créer de nouveaux DataFrames sans écrire de résultats incrémentiels en tant que vues, vues matérialisées ou tables de streaming, comme dans l’exemple suivant :
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
Si tous vos DataFrames effectuent leurs lectures initiales à l’aide d’une logique de traitement par lots, votre résultat de retour est un DataFrame statique. Si vous avez des requêtes en streaming, votre résultat de retour est un DataFrame en streaming.
Retourner un DataFrame
Utilisez @dp.table pour créer une table de streaming à partir des résultats d'une lecture en continu. Utilisez @dp.materialized_view pour créer une vue matérialisée à partir des résultats d'une lecture de lots. La plupart des autres décorateurs fonctionnent à la fois avec des DataFrames en flux et statiques, tandis que certains nécessitent un DataFrame en flux.
La fonction utilisée pour définir un jeu de données doit retourner un DataFrame Spark. N’utilisez jamais de méthodes qui enregistrent ou écrivent dans des fichiers ou des tables dans le cadre de votre code de jeu de données de pipeline.
Exemples d’opérations Apache Spark qui ne doivent jamais être utilisées dans le code de pipeline :
collect()count()toPandas()save()saveAsTable()start()toTable()
Note
Les pipelines prennent également en charge l’utilisation de Pandas sur Spark pour les fonctions de définition de jeu de données. Consultez l’API Pandas sur Spark.
Utiliser SQL dans un pipeline Python
PySpark prend en charge l’opérateur spark.sql pour écrire du code DataFrame en utilisant SQL. Lorsque vous utilisez ce modèle dans le code source du pipeline, il se compile en vues matérialisées ou tables de flux.
L’exemple de code suivant équivaut à utiliser spark.read.table("catalog_name.schema_name.table_name") pour la logique de requête du jeu de données :
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read et dlt.read_stream (hérité)
L’ancien dlt module inclut les fonctions dlt.read() et dlt.read_stream() introduites pour prendre en charge la fonctionnalité en mode de publication de pipeline hérité. Ces méthodes sont prises en charge, mais Databricks recommande de toujours utiliser les fonctions spark.read.table() et spark.readStream.table() en raison des éléments suivants :
- Les
dltfonctions ont une prise en charge limitée de la lecture des jeux de données définis en dehors du pipeline actuel. - Les
sparkfonctions prennent en charge la spécification d’options, telles queskipChangeCommits, pour lire les opérations. La spécification des options n’est pas prise en charge par lesdltfonctions. - Le
dltmodule lui-même a été remplacé par lepyspark.pipelinesmodule. Databricks recommande d’utiliserfrom pyspark import pipelines as dppour importerpyspark.pipelineslors de l’écriture de code de pipelines en Python.