チュートリアル: エンドツーエンドの Lakehouse 分析パイプラインを実行する

このチュートリアルでは、Azure Databricks レイクハウス用のエンド ツー エンドの分析パイプラインを設定する方法について説明します。

重要

このチュートリアルでは、Unity Catalog 対応クラスターに対し、対話型ノートブックを使用して Python から一般的な ETL タスクを実行します。 Unity Catalog を使用していない場合は、「Azure Databricks で最初の ETL ワークロードを実行する」を参照してください。

このチュートリアルのタスク

この記事を読み終えるころには、次のような作業が快適に感じることでしょう。

  1. Unity Catalog 対応のコンピューティング クラスターを起動する
  2. Databricks ノートブックを作成する
  3. Unity Catalog の外部の場所にデータを書き込んだりそこからデータを読み取ったりする
  4. 自動ローダーを使用して Unity Catalog テーブルへの増分データ インジェストを構成する
  5. ノートブック セルを実行してデータを処理、照会、プレビューする
  6. ノートブックを Databricks ジョブとしてスケジュールする
  7. Unity Catalog のテーブルに対して Databricks SQL からクエリを実行する

Azure Databricks には、抽出、変換、読み込み (ETL) パイプラインをデータのプロフェッショナルがすばやく開発、デプロイできる運用環境向けのツール スイートが用意されています。 データ スチュワードは Unity Catalog を使用して、組織全体のユーザーを対象に、ストレージの資格情報、外部の場所、データベース オブジェクトを構成してセキュリティで保護できます。 Databricks SQL では、アナリストが運用環境の ETL ワークロードで使用されるのと同じテーブルに対して SQL クエリを実行できるため、リアルタイムのビジネスインテリジェンスを大規模に実現できます。

Delta Live Tables を使用して ETL パイプラインを構築することもできます。 Databricks は、運用 ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために Delta Live Tables を作成しました。 「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。

必要条件

Note

以降の手順の大半は、クラスターの制御権限がなくても、クラスターへのアクセス権があれば実行できます。

手順 1: クラスターを作成する

探索的データ分析やデータ エンジニアリングを行うには、コマンドの実行に必要なコンピューティング リソースを提供するクラスターを作成します。

  1. サイドバーで、compute icon[コンピューティング]をクリックします。
  2. サイドバーで、新規アイコン[新規] をクリックし、[クラスター] を選びます。 [新しいクラスター/コンピューティング] ページが表示されます。
  3. クラスターの一意の名前を指定します。
  4. [Single node] (単一ノード) オプション ボタンを選択します。
  5. [アクセス モード] ボックスの一覧から [単一ユーザー] を選択します。
  6. ご使用のメール アドレスが [単一ユーザー] フィールドに表示されていることを確認します。
  7. Unity Catalog を使用するために必要な Databricks Runtime のバージョン (11.1 以降) を選択します。
  8. [コンピューティングの作成] をクリックしてクラスターを作成します。

Databricks クラスターの詳細については、「コンピューティング」を参照してください。

手順 2: Databricks ノートブックを作成する

Azure Databricks で対話的にコードを記述して実行するには、ノートブックを作成します。

  1. サイドバーで、新規アイコン[新規] をクリックし、[Notebook] をクリックします。
  2. [ノートブックの作成] ページで、次の手順を実行します。
    • ノートブックの一意の名前を指定します。
    • 既定の言語が Python に設定されていることを確認します。
    • [接続] ドロップダウン メニューを使用して、[クラスター] ドロップダウンから手順 1 で作成したクラスターを選択します。

ノートブックが開き、空のセルが 1 つ表示されます。

ノートブックの作成と管理について詳しくは、「Notebooks を管理する」を参照してください。

手順 3: Unity Catalog によって管理される外部の場所にデータを書き込んだりそこからデータを読み込んだりする

Databricks では、自動ローダーを使用した増分データ インジェストが推奨されています。 自動ローダーは、クラウド オブジェクト ストレージに到着した新しいファイルを自動的に検出して処理します。

Unity Catalog を使用して、外部の場所への安全なアクセスを管理します。 外部の場所に対する READ FILES アクセス許可を持ったユーザーまたはサービス プリンシパルが自動ローダーを使用してデータを取り込むことができます。

通常、外部の場所に到着するデータは、他のシステムからの書き込みによるものです。 このデモでは、外部の場所に JSON ファイルを出力することでデータの到着をシミュレートできます。

以下のコードをノートブックのセルにコピーしてください。 catalog の文字列値は、CREATE CATALOGUSE CATALOG のアクセス許可があるカタログの名前に置き換えます。 external_location の文字列値は、READ FILESWRITE FILESCREATE EXTERNAL TABLE の各アクセス許可がある外部の場所のパスに置き換えます。

外部の場所は、ストレージ コンテナー全体として定義することもできますが、通常は、コンテナー内で入れ子になったディレクトリの場所を指定します。

外部の場所に使用する適切なパスの形式は "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location" となります。


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

このセルを実行すると、12 バイトという行と "Hello world!" という文字列が出力され、指定されたカタログに存在するすべてのデータベースが表示されます。 このセルを実行できない場合は、使用中のワークスペースで Unity Catalog が有効になっていることを確認し、このチュートリアルを進めるうえで必要なアクセス許可をワークスペース管理者に要求してください。

以下の Python コードは、メールアドレスを使用して、指定されたカタログに一意のデータベースを作成し、指定された外部の場所に一意の保存場所を作成します。 このセルを実行すると、このチュートリアルに関連するデータがすべて削除されるので、この例を何度実行しても同じ結果が得られます。 接続されたシステムから外部のソースに到着する大量のデータをシミュレートするためのクラスが定義され、インスタンス化されます。

このコードをノートブックの新しいセルにコピーし、それを実行して環境を構成してください。

注意

このコードで定義されている変数を使用すれば、既存のワークスペース資産や他のユーザーと競合することなく安全にコードを実行できます。 ネットワークまたはストレージのアクセス許可が制限されている場合は、このコードを実行するとエラーが発生します。ワークスペース管理者に連絡して、それらの制限のトラブルシューティングを行ってください。


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

後は以下のコードをセルにコピーして実行すれば、一括でデータを投入できます。 新しいデータの到着をトリガーするために、このセルは最大 60 回手動で実行することができます。

RawData.land_batch()

手順 4: Unity Catalog にデータを取り込むように自動ローダーを構成する

Databricks はデータの格納に Delta Lake を使用することを推奨しています。 Delta Lake はオープンソースのストレージ レイヤーです。ACID トランザクションを備え、データ レイクハウスを実現します。 Databricks に作成されるテーブルの既定の形式は Delta Lake です。

Unity Catalog テーブルにデータを取り込むように自動ローダーを構成するには、ノートブックの空のセルに以下のコードをコピーして貼り付けてください。

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

自動ローダーの詳細については、「自動ローダー」を参照してください。

Unity Catalog での構造化ストリーミングについて詳しくは、「構造化ストリーミングで Unity Catalog を使用する」を参照してください。

手順 5: データを処理して対話的に操作する

ノートブックでは、ロジックがセル単位で実行されます。 手順に従って、セル内のロジックを実行してください。

  1. 前の手順で完成したセルを実行するために、そのセルを選択し、Shift + Enter キーを押します。

  2. 作成されたテーブルに対してクエリを実行するために、以下のコードをコピーして空のセルに貼り付け、Shift + Enter キーを押してセルを実行します。

    df = spark.read.table(table_name)
    
  3. DataFrame 内のデータをプレビューするために、以下のコードをコピーして空のセルに貼り付け、Shift + Enter キーを押してセルを実行します。

    display(df)
    

データを対話的に視覚化する方法について詳しくは、「Databricks ノートブックでの視覚化」を参照してください。

手順 6: ジョブをスケジュールする

Databricks ノートブックを運用スクリプトとして実行するには、それらをタスクとして Databricks ジョブに追加します。 この手順では、手動でトリガーできる新しいジョブを作成します。

ノートブックをタスクとしてスケジュールするには:

  1. ヘッダー バーの右側にある [スケジュール] をクリックします。
  2. [ジョブ名] に一意の名前を入力します。
  3. [手動] をクリックします。
  4. [クラスター] ボックスの一覧から、手順 1. で作成したクラスターを選択します。
  5. [作成] をクリックします。
  6. 表示されたウィンドウで、[今すぐ実行] をクリックします。
  7. ジョブの実行結果を確認するには、前回実行タイムスタンプの横にある 外部リンク アイコンをクリックします。

ジョブの詳細については、Azure Databricks のジョブとはに関するページを参照してください。

手順 7: Databricks SQL からテーブルに対してクエリを実行する

現在のカタログとデータベースに対する USE CATALOG アクセス許可、現在のスキーマに対する USE SCHEMA アクセス許可、およびテーブルに対する SELECT アクセス許可があるユーザーはだれでも、任意の Databricks API からテーブルの内容を照会できます。

Databricks SQL でクエリを実行するには、実行中の SQL ウェアハウスへのアクセス権が必要です。

このチュートリアルで先ほど作成したテーブルには、target_table という名前が付いています。 それに対してクエリを実行するためには、最初のセルで指定したカタログと e2e_lakehouse_<your-username> という形式のデータベースを使用します。 作成されたデータ オブジェクトは、カタログ エクスプローラーを使用して確認できます。

その他の統合

Azure Databricks を使用したデータ エンジニアリングのための統合とツールについては、以下の記事を参照してください。