Delta Live Tables を使用してデータを読み込む

Delta Live Tables を使用して、Azure Databricks 上の Apache Spark でサポートされている任意のデータ ソースからデータを読み込むことができます。 Delta Live Tables のデータセット (テーブルとビュー) は、Spark DataFrame (ストリーミング DataFrames や Pandas for Spark DataFrames など) を返す任意のクエリに対して定義できます。 データ インジェスト タスクには、Databricks は、ほとんどのユース ケースでストリーミング テーブルの使用を推奨しています。 ストリーミング テーブルは、自動ローダーを使用するクラウド オブジェクト ストレージから、または Kafka などのメッセージ バスからデータを取り込むのに適しています。 次の例は、いくつかの一般的なパターンを示しています。

重要

すべてのデータ ソースで SQL がサポートされているわけではありません。 Delta Live Tables パイプラインで SQL ノートブックと Python ノートブックを混在させ、インジェスト以外のすべての操作に SQL を使用できます。

既定では Delta Live Tables でパッケージ化されていないライブラリの使用の詳細については、「Delta Live Tables パイプラインの Python 依存関係を管理する」を参照してください。

クラウド オブジェクト ストレージからファイルを読み込む

Databricks では、クラウド オブジェクト ストレージからのほとんどのデータ インジェスト タスクに対して、Delta Live Tables で自動ローダーを使用することをお勧めします。 自動ローダーと差分ライブ テーブルは、増加し続けるデータをクラウド ストレージに到着すると、増分的かつべき等的に読み込まれるように設計されています。 次の例では、自動ローダーを使用して CSV と JSON ファイルからデータセットを作成します。

Note

Unity Catalog が有効になったパイプラインで自動ローダーを使用してファイルを読み込むには、外部の場所を使用する必要があります。 Delta Live Tables での Unity Catalog の使用の詳細については、「Delta Live Tables パイプラインでの Unity Catalog の使用」を参照してください。

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

自動ローダーとは」および「自動ローダー SQL 構文」を参照してください。

警告

ファイル通知で自動ローダーを使用し、パイプラインまたはストリーミング テーブルの完全な更新を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブックで CloudFilesResourceManager を使用してクリーンアップを実行できます。

メッセージ バスからデータを読み込む

Delta Live Tables パイプラインを構成して、ストリーミング テーブルを使用してメッセージ バスからデータを取り込むことができます。 Databricks では、ストリーミング テーブルと継続的な実行と拡張された自動スケーリングを組み合わせて、メッセージ バスからの低待機時間読み込みに最も効率的なインジェストを提供することをお勧めします。 「拡張自動スケーリングを使用して、Delta Live Tables パイプラインのクラスター使用率を最適化する」を参照してください。

たとえば、次のコードは、Kafka からデータを取り込むためのストリーミング テーブルを構成します:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

次の例のように、純粋 SQL でダウンストリーム操作を記述して、このデータに対してストリーミング変換を実行できます:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Event Hubs の使用例については、「Delta Live Tables データ ソースとして Azure Event Hubs を使用する」を参照してください。

ストリーミング データ ソースを構成する」を参照してください。

外部システムからデータを読み込む

Delta Live Tables では、Azure Databricks でサポートされている任意のデータ ソースからのデータの読み込みがサポートされています。 「データ ソースに接続する」を参照してください。 サポートされているデータ ソースに対して Lakehouse フェデレーションを使用して外部データを読み込むこともできます。 Lakehouse フェデレーションには Databricks Runtime 13.3 LTS 以降が必要であるため、Lakehouse フェデレーションを使用するには、プレビュー チャネルを使用するようにパイプラインを構成する必要があります。

一部のデータ ソースでは、SQL で同等のサポートが提供されていません。 これらのデータ ソースのいずれかで Lakehouse フェデレーションを使用できない場合は、スタンドアロンの Python ノートブックを使用してソースからデータを取り込むことができます。 その後、このノートブックを SQL ノートブックを含むソース ライブラリとして追加して、Delta Live Tables パイプラインを構築できます。 次の例では、リモート PostgreSQL テーブル内のデータの現在の状態にアクセスするための具体化されたビューを宣言します:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

クラウド オブジェクト ストレージから小さいデータセットまたは静的データセットを読み込む

Apache Spark の読み込み構文を使用して、小規模または静的なデータセットを読み込むことができます。 Delta Live Tables では、Azure Databricks 上の Apache Spark でサポートされているすべてのファイル形式がサポートされています。 完全な一覧については、「データ形式のオプション」を参照してください。

次の例では、JSON を読み込んで Delta Live Tables テーブルを作成する方法を示します:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Note

SELECT * FROM format.`path`; SQL コンストラクトは、Azure Databricks 上のすべての SQL 環境に共通です。 これは、デルタ ライブ テーブルで SQL を使用して直接ファイルにアクセスする場合に推奨されるパターンです。

パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする

Azure Databricks のシークレットを使用して、アクセス キーやパスワードなどの資格情報を格納することができます。 パイプラインでシークレットを構成するには、パイプライン設定クラスター構成で Spark プロパティを使用します。 「コンピューティング設定を構成する」をご覧ください。

次の例では、自動ローダーを使用して Azure Data Lake Storage Gen2 (ADLS Gen2) ストレージ アカウントから入力データを読み取るために必要なアクセス キーを格納するために、シークレットを使用します。 この同じ方法を使用して、S3 にアクセスするための AWS キーや Apache Hive メタストアへのパスワードなど、パイプラインで必要などのシークレットでも構成できます。

Azure Data Lake Storage Gen2 の操作の詳細については、「Azure Data Lake Storage Gen2 と Blob Storage に接続する」を参照してください。

Note

シークレット値を設定する spark_conf 構成キーに、spark.hadoop. プレフィックスを追加する必要があります。

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

  • <storage-account-name> は、ADLS Gen2 ストレージ アカウントの名前に置き換えます。
  • <scope-name> は、Azure Databricks シークレット スコープの名前に置き換えます。
  • <secret-name> は、Azure ストレージ アカウントのアクセス キーが含まれるキーの名前に置き換えます。
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

  • <container-name> は、入力データを格納する Azure ストレージ アカウント コンテナーの名前に置き換えます。
  • <storage-account-name> は、ADLS Gen2 ストレージ アカウントの名前に置き換えます。
  • <path-to-input-dataset> は、入力データセットへのパスに置き換えます。

Azure Event Hubs からデータを読み込む

Azure Event Hubs は、Apache Kafka 互換インターフェイスを提供するデータ ストリーミング サービスです。 Delta Live Tables ランタイムに含まれる Structured Streaming Kafka コネクタを使用して、Azure Event Hubs からメッセージを読み込むことができます。 Azure Event Hubs からのメッセージの読み込みと処理の詳細については、「Delta Live Tables データ ソースとしての Azure Event Hubs の使用」を参照してください。