Partager via


Importer des modules Python à partir de dossiers Git ou de fichiers d’espace de travail

Vous pouvez stocker du code Python dans des dossiers Git Databricks ou dans des fichiers d’espace de travail, puis importer ce code Python dans vos pipelines Delta Live Tables. Pour plus d’informations sur l’utilisation des modules dans des dossiers Git Databricks ou des fichiers d’espace de travail, consultez Utiliser des modules Python et R.

Remarque

Vous ne pouvez pas importer de code source à partir d’un notebook stocké dans un dossier Git Databricks ou un fichier d’espace de travail. Au lieu de cela, ajoutez le notebook directement lorsque vous créez ou modifiez un pipeline. Voir Créer un pipeline.

Importez un module Python dans un pipeline Delta Live Tables

L'exemple suivant montre comment importer des requêtes de jeu de données en tant que modules Python à partir de fichiers d'espace de travail. Bien que cet exemple décrive l'utilisation de fichiers d'espace de travail pour stocker le code source du pipeline, vous pouvez l'utiliser avec du code source stocké dans un dossier Git.

Pour exécuter cet exemple, procédez comme suit :

  1. Cliquez sur Icône Espaces de travailEspace de travail dans la barre latérale de votre espace de travail Azure Databricks pour ouvrir le navigateur de l’espace de travail.

  2. Utilisez le navigateur de l’espace de travail pour sélectionner un répertoire pour les modules Python.

  3. Cliquez sur menu Kebab dans la colonne la plus à droite du répertoire sélectionné, puis cliquez sur Créer > Fichier.

  4. Entrez un nom pour le fichier, par exemple clickstream_raw_module.py. L’éditeur de fichiers s’ouvre. Pour créer un module afin de lire les données sources dans une table, entrez ce qui suit dans la fenêtre de l’éditeur :

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. Pour créer un module qui crée une table contenant des données préparées, créez un fichier dans le même répertoire, entrez un nom pour le fichier, par exemple, clickstream_prepared_module.py, puis entrez ce qui suit dans la nouvelle fenêtre de l’éditeur :

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. Ensuite, créez un notebook de pipeline. Accédez à la page d’accueil Azure Databricks, sélectionnez Créer un notebook ou cliquez sur Nouvelle icôneNouvelle dans la barre latérale, puis sélectionnez Notebook. Vous pouvez également créer le notebook dans le navigateur de l’espace de travail en cliquant sur menu Kebab , puis sur Créer > Notebook.

  7. Nommez votre notebook et vérifiez que Python est le langage par défaut.

  8. Cliquez sur Créer.

  9. Entrez l’exemple de code dans le notebook.

    Remarque

    Si votre notebook importe des modules ou des packages à partir d’un chemin d’accès aux fichiers de l’espace de travail ou d’un chemin d’accès aux dossiers Git Databricks différent du répertoire du notebook, vous devez ajouter manuellement le chemin d’accès aux fichiers à l’aide de sys.path.append().

    Si vous importez un fichier à partir d’un dossier Git Databricks, vous devez ajouter /Workspace/ au début du chemin d’accès. Par exemple : sys.path.append('/Workspace/...'). Si /Workspace/ est omis du chemin d’accès, vous obtenez une erreur.

    Si les modules ou les packages sont stockés dans le même répertoire que le notebook, il n’est pas nécessaire d’ajouter le chemin d’accès manuellement. Vous n’avez pas non plus besoin d’ajouter manuellement le chemin d’accès lors de l’importation à partir du répertoire racine d’un dossiers Git, car le répertoire racine est automatiquement ajouté au chemin d’accès.

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        dlt.read("clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    Remplacez<module-path> par le chemin d’accès au répertoire contenant les modules Python à importer.

  10. Créer un pipeline à l’aide du nouveau notebook.

  11. Pour exécuter le pipeline, dans la page Détails du pipeline , cliquez sur Démarrer.

Vous pouvez également importer du code Python en tant que package. L’extrait de code suivant d’un notebook Delta Live Tables importe le package test_utils à partir du répertoire dlt_packages à l’intérieur du même répertoire que le notebook. Le répertoire dlt_packages contient les fichiers test_utils.py et __init__.py, et test_utils.py définit la fonction create_test_table() :

import dlt

@dlt.table
def my_table():
  return dlt.read(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)