次の方法で共有


Lakeflow 宣言パイプラインを使用してデータを読み込む

Lakeflow 宣言型パイプラインを使用して、Azure Databricks 上の Apache Spark でサポートされている任意のデータ ソースからデータを読み込むことができます。 Lakeflow 宣言型パイプラインでは、Spark DataFrame を返す任意のクエリに対して、データセット(テーブルとビュー)を定義できます。これには、ストリーミング DataFrames や Pandas for Spark DataFrames も含まれます。 データ インジェスト タスクの場合、Databricks では、ほとんどのユース ケースでストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブルは、自動ローダーを使用してクラウド オブジェクト ストレージから、または Kafka などのメッセージ バスからデータを取り込む場合に適しています。

  • すべてのデータ ソースで SQL がサポートされているわけではありません。 Lakeflow 宣言パイプラインで SQL ノートブックと Python ノートブックを混在させ、インジェスト以外のすべての操作に SQL を使用できます。
  • 既定で Lakeflow 宣言パイプラインにパッケージ化されていないライブラリの操作の詳細については、「Lakeflow 宣言型パイプライン の Python 依存関係を管理する」を参照してください。
  • Azure Databricks でのインジェストの一般的な情報については、 Lakeflow Connect の Standard コネクタに関するページを参照してください。

次の例は、いくつかの一般的なパターンを示しています。

既存のテーブルからの読み込み

Azure Databricks 内の既存のテーブルからデータを読み込みます。 クエリを使用してデータを変換するか、パイプラインでさらに処理するためにテーブルを読み込むことができます。

次の例では、既存のテーブルからデータを読み取ります。

Python(プログラミング言語)

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

クラウド オブジェクト ストレージからファイルを読み込む

Databricks では、クラウド オブジェクト ストレージまたは Unity カタログ ボリューム内のファイルからのほとんどのデータ インジェスト タスクに対して、Lakeflow 宣言パイプラインで自動ローダーを使用することをお勧めします。 自動ローダーとLakeflow宣言型パイプラインは、クラウドストレージに到着する増加し続けるデータを、増分的かつ冪等性を持って読み込むように設計されています。

「自動ローダーとは」を参照し、オブジェクト ストレージからデータを読み込みます

次の例では、自動ローダーを使用してクラウド ストレージからデータを読み取ります。

Python(プログラミング言語)

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

次の例では、自動ローダーを使用して、Unity カタログ ボリューム内の CSV ファイルからデータセットを作成します。

Python(プログラミング言語)

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

  • ファイル通知で自動ローダーを使用し、パイプラインまたはストリーミング テーブルの完全な更新を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブックで CloudFilesResourceManager を使用してクリーンアップを実行できます。
  • Unity Catalog が有効になったパイプラインで自動ローダーを使用してファイルを読み込むには、外部の場所を使用する必要があります。 Lakeflow 宣言パイプラインで Unity カタログを使用する方法の詳細については、「Lakeflow 宣言型パイプライン で Unity カタログを使用する」を参照してください。

メッセージ バスからデータを読み込む

メッセージ バスからデータを取り込むよう Lakeflow 宣言パイプラインを構成できます。 Databricks では、メッセージ バスからの待機時間の短い読み込みに最も効率的なインジェストを提供するために、継続的な実行と拡張された自動スケーリングでストリーミング テーブルを使用することをお勧めします。 自動スケーリングを使用した Lakeflow 宣言パイプラインのクラスター使用率の最適化に関するページを参照してください。

たとえば、次のコードは、 read_kafka 関数を使用して Kafka からデータを取り込むためのストリーミング テーブルを構成します。

Python(プログラミング言語)

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

他のメッセージ バス ソースから取り込むには、次を参照してください。

Azure Event Hubs からデータを読み込む

Azure Event Hubs は、Apache Kafka 互換インターフェイスを提供するデータ ストリーミング サービスです。 Lakeflow 宣言パイプライン ランタイムに含まれる Structured Streaming Kafka コネクタを使用して、Azure Event Hubs からメッセージを読み込むことができます。 Azure Event Hubs からのメッセージの読み込みと処理の詳細については、「 Azure Event Hubs を Lakeflow 宣言パイプライン データ ソースとして使用する」を参照してください。

外部システムからデータを読み込む

Lakeflow 宣言パイプラインでは、Azure Databricks でサポートされている任意のデータ ソースからのデータの読み込みがサポートされています。 データ ソースと外部サービスへの接続を参照してください。 サポートされているデータ ソースに対して Lakehouse フェデレーションを使用して外部データを読み込むこともできます。 Lakehouse Federation では Databricks Runtime 13.3 LTS 以上が必要であるため、Lakehouse フェデレーションを使用するには、 プレビュー チャネルを使用するようにパイプラインを構成する必要があります。

一部のデータ ソースでは、SQL で同等のサポートが提供されていません。 これらのデータ ソースのいずれかで Lakehouse Federation を使用できない場合は、Python ノートブックを使用してソースからデータを取り込むことができます。 Python と SQL のソース コードを同じパイプラインに追加できます。 次の例では、リモート PostgreSQL テーブル内のデータの現在の状態にアクセスするための具体化されたビューを宣言します。

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

クラウド オブジェクト ストレージから小さいデータセットまたは静的データセットを読み込む

Apache Spark の読み込み構文を使用して、小規模または静的なデータセットを読み込むことができます。 Lakeflow 宣言型パイプラインでは、Azure Databricks 上の Apache Spark でサポートされているすべてのファイル形式がサポートされています。 完全な一覧については、「 データ形式のオプション」を参照してください。

次の例では、JSON を読み込んで Lakeflow 宣言パイプライン テーブルを作成する方法を示します。

Python(プログラミング言語)

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

read_files SQL 関数は、Azure Databricks 上のすべての SQL 環境に共通です。 これは、Lakeflow 宣言型パイプラインで SQL を使用してファイルに直接アクセスするための推奨パターンです。 詳細については、「オプションの」を参照してください。

ソース ストリーミング テーブルの変更を無視するようにストリーミング テーブルを構成する

  • skipChangeCommits フラグは、spark.readStream 関数を使用する option() でのみ動作します。 このフラグは、dlt.read_stream() 関数で使用することはできません。
  • ソース ストリーミング テーブルが create_auto_cdc_flowskipChangeCommits 関数のターゲットとして定義されている場合、 フラグを使用することはできません。

既定では、ストリーミング テーブルには追加専用のソースが必要です。 あるストリーミング テーブルで、別のストリーミング テーブルがソースとして使用されており、そのソース ストリーミング テーブルに更新または削除が必要な場合、たとえば、GDPR の "忘れられる権利" 処理では、それらの変更を無視するために、ソース ストリーミング テーブルの読み込み時に skipChangeCommits フラグを設定することができます。 このフラグの詳細については、「更新と削除を無視する」を参照してください。

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする

Azure Databricks シークレットを使用して アクセス キーやパスワードなどの資格情報を格納できます。 パイプラインでシークレットを構成するには、パイプライン設定クラスター構成で Spark プロパティを使用します。 「Lakeflow 宣言パイプラインのコンピューティングを構成する」を参照してください。

次の例では、シークレットを使用して、 自動ローダーを使用して Azure Data Lake Storage (ADLS) ストレージ アカウントから入力データを読み取るために必要なアクセス キーを格納します。 この同じ方法を使用して、パイプラインに必要なシークレット (たとえば、S3 にアクセスするための AWS キー、Apache Hive メタストアへのパスワードなど) を構成できます。

Azure Data Lake Storage の操作の詳細については、Azure Data Lake Storage と Blob Storage への接続に関するページを参照してください。

シークレット値を設定するspark.hadoop.構成キーに、spark_conf プレフィックスを追加する必要があります。

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

置換

  • <storage-account-name> を ADLS ストレージ アカウント名と共に使用します。
  • <scope-name> を Azure Databricks のシークレットスコープ名で置き換えてください。
  • <secret-name> は、Azure ストレージ アカウントのアクセス キーが含まれるキーの名前に置き換えます。
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

置換

  • <container-name> 入力データを格納する Azure ストレージ アカウント コンテナーの名前を指定します。
  • <storage-account-name> を ADLS ストレージ アカウント名と共に使用します。
  • <path-to-input-dataset> 入力データセットへのパスを指定します。