次の方法で共有


Git フォルダーまたはワークスペース ファイルから Python モジュールをインポートする

Python コードを Databricks Git フォルダーまたはワークスペース ファイルに格納し、その Python コードを Delta Live Tables パイプラインにインポートできます。 Git フォルダーまたはワークスペース ファイルでモジュールを使用する方法の詳細については、「Python と R のモジュールを使用する」を参照してください。

Note

Databricks Git フォルダーまたはワークスペース ファイルに保存されているノートブックからソース コードをインポートすることはできません。 代わりに、パイプラインを作成または編集するときにノートブックを直接追加します。 「パイプラインを作成する」を参照してください。

Python モジュールを Delta Live Tables パイプラインにインポートする

次の例では、ワークスペース ファイルから Python モジュールとしてデータ セット クエリをインポートする方法を示します。 この例では、ワークスペース ファイルを使用してパイプラインのソース コードを格納する方法について説明しますが、これを Git フォルダーに格納するソース コードで使用できます。

このサンプルを実行するには、次の手順を使用します。

  1. Azure Databricks ワークスペースのサイドバーで ワークスペース アイコン[ワークスペース] をクリックして、ワークスペース ブラウザーを開きます。

  2. ワークスペース ブラウザーを使用して、Python モジュールのディレクトリを選択します。

  3. 選択したディレクトリの右端の列で Kebab メニュー をクリックし、[ファイル]>[作成] をクリックします。

  4. ファイルの名前を入力します (例: clickstream_raw_module.py)。 ファイル エディターが開きます。 ソース データをテーブルに読み込むモジュールを作成するには、エディター ウィンドウで次のように入力します。

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. 準備されたデータを含む新しいテーブルを作成するモジュールを作成するには、同じディレクトリに新しいファイルを作成し、ファイルの名前 (例: clickstream_prepared_module.py) を入力します。新しいエディター ウィンドウに次のように入力します。

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. 次に、パイプライン ノートブックを作成します。 Azure Databricks のランディング ページにアクセスし、[ノートブックの作成] を選択するか、サイドバーで新規アイコン[新規] をクリックして、メニューから [ノートブック] を選択します。 Kebab メニュー をクリックし、[作成]>[ノートブック] をクリックして、ワークスペース ブラウザーでノートブックを作成することもできます。

  7. ノートブックに名前を付け、Python が既定の言語であることを確認します。

  8. Create をクリックしてください。

  9. ノートブックにコード例を入力します。

    Note

    ノートブック ディレクトリとは異なるワークスペース ファイル パスまたは Git フォルダー パスからノートブックでモジュールやパッケージをインポートする場合は、sys.path.append() を使用してファイルへのパスを手動で追加する必要があります。

    Git フォルダーからファイルをインポートする場合は、パスの前に /Workspace/ を追加する必要があります。 たとえば、sys.path.append('/Workspace/...') のようにします。 パスから /Workspace/ を省略すると、エラーが発生します。

    モジュールまたはパッケージがノートブックと同じディレクトリに格納されている場合は、パスを手動で追加する必要はありません。 また、ルート ディレクトリはパスに自動的に追加されるため、Git フォルダーのルート ディレクトリからインポートするときに、パスを手動で追加する必要はありません。

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        dlt.read("clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    <module-path> を、インポートする Python モジュールを含むディレクトリへのパスに置き換えます。

  10. 新しいノートブックを使用してパイプラインを作成します。

  11. パイプラインを実行するには、[パイプラインの詳細] ページで [開始] をクリックします。

Python コードをパッケージとしてインポートすることもできます。 Delta Live Tables ノートブックの次のコード スニペットでは、ノートブックと同じディレクトリ内の test_utils ディレクトリから dlt_packages パッケージがインポートされます。 dlt_packages ディレクトリには test_utils.py ファイルと __init__.py ファイルが含まれ、test_utils.py では create_test_table() 関数が定義されます。

import dlt

@dlt.table
def my_table():
  return dlt.read(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)