Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cet article explique comment déplacer des tables de streaming et des vues matérialisées d'un pipeline à l'autre. Après le déplacement, le pipeline vers lequel vous déplacez le flux met à jour la table, plutôt que le pipeline d’origine. Cela est utile dans de nombreux scénarios, notamment :
- Fractionnez un grand pipeline en plusieurs plus petits.
- Fusionnez plusieurs pipelines dans un seul pipeline plus grand.
- Modifiez la fréquence d’actualisation de certaines tables d’un pipeline.
- Déplacez les tables d’un pipeline qui utilise le mode de publication hérité vers le mode de publication par défaut. Pour plus d’informations sur le mode de publication ancien, consultez le mode de publication ancien pour les pipelines. Pour voir comment migrer le mode de publication d’un pipeline entier à la fois, consultez Activer le mode de publication par défaut dans un pipeline.
- Déplacez des tables à travers des pipelines dans différents espaces de travail.
Spécifications
Voici les conditions requises pour déplacer une table entre des pipelines.
Vous devez utiliser Databricks Runtime 16.3 ou version ultérieure lors de l’exécution de la
ALTER ...commande, et Databricks Runtime 17.2 pour le déplacement de table inter-espaces de travail.Les pipelines source et de destination doivent être les suivants :
- Propriété du compte d’utilisateur ou du principal de service Azure Databricks exécutant l’opération
- Dans les espaces de travail qui partagent un "metastore". Pour vérifier le metastore, consultez la
current_metastorefonction.
Le pipeline de destination doit utiliser le mode de publication par défaut. Cela vous permet de publier des tables sur plusieurs catalogues et schémas.
Sinon, les deux pipelines doivent utiliser le mode de publication hérité et avoir la même valeur catalogue et cible dans les paramètres. Pour plus d’informations sur le mode de publication hérité, consultez le schéma LIVE (hérité) .
Note
Cette fonctionnalité ne prend pas en charge le déplacement d’un pipeline à l’aide du mode de publication par défaut vers un pipeline à l’aide du mode de publication hérité.
Déplacer une table entre des pipelines
Les instructions suivantes décrivent comment déplacer une table de diffusion en continu ou une vue matérialisée d’un pipeline à un autre.
Si le pipeline source est en cours d'exécution, arrêtez-le. Attendez qu’elle s’arrête complètement.
Supprimez la définition de la table du code du pipeline source et stockez-la quelque part pour une référence ultérieure.
Incluez les requêtes ou le code nécessaires pour que le pipeline s’exécute correctement.
À partir d’un notebook ou d’un éditeur SQL, exécutez la commande SQL suivante pour réaffecter la table du pipeline source au pipeline de destination :
ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name> SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");Notez que la commande SQL doit être exécutée à partir de l’espace de travail du pipeline source.
La commande utilise respectivement
ALTER MATERIALIZED VIEWetALTER STREAMING TABLEpour les vues matérialisées et les tables de diffusion en continu gérées par Unity Catalog. Pour effectuer la même action sur une table de metastore Hive, utilisezALTER TABLE.Par exemple, si vous souhaitez déplacer une table de diffusion en continu nommée
salesvers un pipeline avec l’IDabcd1234-ef56-ab78-cd90-1234efab5678, exécutez la commande suivante :ALTER STREAMING TABLE sales SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");Note
Le
pipelineIddoit être un identificateur de pipeline valide. Lanullvaleur n’est pas autorisée.Ajoutez la définition de la table au code du pipeline de destination.
Note
Si le schéma catalogue ou cible diffère entre la source et la destination, la copie de la requête peut ne pas fonctionner exactement. Les tables partiellement qualifiées dans la définition peuvent être résolues différemment. Vous devrez peut-être mettre à jour la définition tout en passant à qualifier complètement les noms de table.
Note
Supprimez ou commentez tout flux ajouté une fois (en Python, les requêtes avec append_flow(once=True), dans SQL, les requêtes avec INSERT INTO ONCE) du code du pipeline de destination. Pour plus d’informations, consultez Limitations.
Le déplacement est terminé. Vous pouvez maintenant exécuter à la fois les pipelines source et destination. Le pipeline de destination met à jour la table.
Résolution des problèmes
Le tableau suivant décrit les erreurs qui peuvent se produire lors du déplacement d’une table entre des pipelines.
| Erreur | Descriptif |
|---|---|
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE |
Le pipeline source est en mode de publication par défaut, et la destination utilise le schéma LIVE (mode hérité). Cela n'est pas pris en charge. Si la source utilise le mode de publication par défaut, la destination doit également. |
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE |
Seul le déplacement de tables entre les pipelines est pris en charge. Le déplacement de tables de streaming et de vues matérialisées créées avec Databricks SQL n’est pas pris en charge. |
DESTINATION_PIPELINE_NOT_FOUND |
Le pipelines.pipelineId doit être un pipeline valide. La valeur pipelineId ne peut pas être null. |
| La table ne parvient pas à se mettre à jour à l'emplacement cible après le déplacement. | Pour atténuer rapidement le problème dans ce cas, déplacez la table vers le pipeline source en suivant les mêmes instructions. |
PIPELINE_PERMISSION_DENIED_NOT_OWNER |
Les pipelines source et de destination doivent être détenus par l’utilisateur effectuant l’opération de déplacement. |
TABLE_ALREADY_EXISTS |
Le tableau répertorié dans le message d’erreur existe déjà. Cela peut se produire si une table de stockage pour le pipeline existe déjà. Dans ce cas, utilisez DROP sur la table mentionnée dans l’erreur. |
Exemple avec plusieurs tables dans un pipeline
Les pipelines peuvent comporter plusieurs tables. Vous pouvez toujours déplacer une table à la fois entre les pipelines. Dans ce scénario, il existe trois tables (table_a, table_b, table_c) qui lisent les unes des autres de manière séquentielle dans le pipeline source. Nous voulons déplacer une table, table_b, vers un autre pipeline.
Code source initial du pipeline
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Nous allons passer table_b à un autre pipeline en copiant et en supprimant la définition de table de la source et en mettant à jour table_b's pipelineId.
Tout d’abord, suspendez toutes les planifications et attendez que les mises à jour se terminent sur les pipelines source et cible. Modifiez ensuite le pipeline source pour supprimer le code de la table déplacée. L’exemple de code de pipeline source mis à jour devient :
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Accédez à l’éditeur SQL pour exécuter la ALTER pipelineId commande.
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
Ensuite, accédez au pipeline de destination et ajoutez la définition de table_b. Si le catalogue et le schéma par défaut sont identiques dans les paramètres du pipeline, aucune modification du code n’est requise.
Code du pipeline cible :
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
Si le catalogue et le schéma par défaut diffèrent dans les paramètres du pipeline, vous devez ajouter le nom complet à l’aide du catalogue et du schéma du pipeline.
Par exemple, le code du pipeline cible peut être :
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)
Exécutez (ou réactivez les planifications) pour les pipelines source et cible.
Les pipelines sont désormais disjoints. La requête pour table_c lit depuis table_b (qui est maintenant dans le pipeline cible) et table_b lit depuis table_a (dans le pipeline source). Lorsque vous effectuez une exécution déclenchée sur le pipeline source, table_b n’est pas mise à jour, car elle n’est plus gérée par ce pipeline. Le pipeline source traite table_b comme une table externe au pipeline. Cela est comparable à la définition d’une vue matérialisée lisant à partir d’une table Delta dans Unity Catalog non gérée par le pipeline.
Limites
Les limitations suivantes concernent le déplacement de tables entre des pipelines.
- Les vues matérialisées et les tables de streaming créées avec Databricks SQL ne sont pas prises en charge.
- Les flux d’ajout une fois - Python append_flow(once=True) et les flux SQL INSERT INTO ONCE - ne sont pas pris en charge. Leur statut d'exécution n'est pas conservé, ils peuvent être réexécutés dans le pipeline de destination. Supprimez ou commentez les flux ajoutés une fois dans le pipeline de destination pour éviter leur exécution répétée.
- Les tables ou vues privées ne sont pas prises en charge.
- Les pipelines source et de destination doivent impérativement être des pipelines. Les pipelines nuls ne sont pas pris en charge.
- Les pipelines source et de destination doivent se trouver dans le même espace de travail ou dans différents espaces de travail qui partagent le même metastore.
- Les pipelines source et de destination doivent être détenus par l’utilisateur qui exécute l’opération de déplacement.
- Si le pipeline source utilise le mode de publication par défaut, le pipeline de destination doit également utiliser le mode de publication par défaut. Vous ne pouvez pas déplacer une table d’un pipeline à l’aide du mode de publication par défaut vers un pipeline qui utilise le schéma LIVE (hérité). Consultez le schéma en direct (hérité).
- Si les pipelines source et de destination utilisent tous les deux le schéma LIVE (hérité), ils doivent avoir les mêmes valeurs dans les paramètres
catalogettarget.