次の方法で共有


Lakeflow 宣言パイプライン フローを使用してデータを増分的に読み込んで処理する

データは、 フローを介して Lakeflow 宣言パイプラインで処理されます。 各フローは 、クエリ と、通常は ターゲットで構成されます。 このフローは、クエリをバッチとして処理するか、ターゲットへのデータ ストリームとして増分的に処理します。 フローは、Azure Databricks の ETL パイプライン内に存在します。

通常、フローは、ターゲットを更新するクエリを Lakeflow 宣言型パイプラインで作成するときに自動的に定義されますが、複数のソースから 1 つのターゲットに追加するなど、より複雑な処理のために追加のフローを明示的に定義することもできます。

最新情報

フローは、定義パイプラインが更新されるたびに実行されます。 フローでは、使用可能な最新のデータを使用してテーブルが作成または更新されます。 フローの種類とデータ変更の状態に応じて、更新は増分更新を行い、新しいレコードのみを処理するか、完全な更新を行い、データ ソースのすべてのレコードを再処理する場合があります。

既定のフローを作成する

パイプラインで Lakeflow 宣言型パイプライン オブジェクトを作成するときは、通常、テーブルまたはビューをサポートするクエリと共に定義します。 たとえば、この SQL クエリでは、customers_silver というテーブルから読み取って、customers_bronzeというストリーミング テーブルを作成します。

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Python でも同じストリーミング テーブルを作成できます。 Python では、通常、Lakeflow 宣言型パイプライン機能にアクセスするためのデコレーターを使用して、データフレームを返すクエリ関数を作成することで、Lakeflow 宣言パイプラインを使用します。

import dlt

@dlt.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

この例では、 ストリーミング テーブルを作成しました。 SQL と Python の両方で、同様の構文を使用して具体化されたビューを作成することもできます。 詳細については、「 ストリーミング テーブル具体化されたビュー」を参照してください。

この例では、ストリーミング テーブルと共に既定のフローを作成します。 ストリーミング テーブルの既定のフローは 追加 フローであり、各トリガーで新しい行を追加します。 これは、Lakeflow 宣言パイプラインを使用して、フローとターゲットを 1 つの手順で作成する最も一般的な方法です。 このスタイルを使用して、データを取り込んだり、データを変換したりすることができます。

追加フローでは、1 つのターゲットを更新するために複数のストリーミング ソースからデータを読み取る必要がある処理もサポートされます。 たとえば、既存のストリーミング テーブルとフローがあり、この既存のストリーミング テーブルに書き込む新しいストリーミング ソースを追加したい場合は、追加フロー機能を使用できます。

複数のフローを使用して 1 つのターゲットに書き込む

前の例では、フローとストリーミング テーブルを 1 つの手順で作成しました。 以前に作成したテーブルのフローも作成できます。 この例では、別の手順でテーブルとそのテーブルに関連付けられているフローの作成を確認できます。 このコードの結果は、ストリーミング テーブルとフローに同じ名前を使用するなど、既定のフローを作成する場合と同じです。

パイソン

import dlt

# create streaming table
dlt.create_streaming_table("customers_silver")

# add a flow
@dlt.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

ターゲットとは別にフローを作成すると、同じターゲットにデータを追加する複数のフローを作成することもできます。

Python インターフェイスの @append_flow デコレーター、または SQL インターフェイスの CREATE FLOW...INSERT INTO 句を使用して新しいフローを作成します。たとえば、複数のストリーミング ソースのストリーミング テーブルをターゲットにします。 次のようなタスクを処理するには、追加フローを使います。

  • 完全な更新を必要とせずに、既存のストリーミング テーブルにデータを追加するストリーミング ソースを追加します。 たとえば、運用しているすべてのリージョンからのリージョン データを結合するテーブルがあるとします。 新しいリージョンがロールアウトされると、完全な更新を実行せずに、新しいリージョン データをテーブルに追加できます。 既存のストリーミング テーブルにストリーミング ソースを追加する例については、「 例: 複数の Kafka トピックからストリーミング テーブルに書き込む」を参照してください。
  • 不足している履歴データ (バックフィル) を追加して、ストリーミング テーブルを更新します。 たとえば、Apache Kafka トピックによって書き込まれる既存のストリーミング テーブルがあるとします。 また、ストリーミング テーブルに 1 回だけ挿入する必要がある履歴データがテーブル内にあり、データを挿入する前の複雑な集計の実行が処理に含まれるため、データをストリーミングすることはできません。 バックフィルの例については、「 例: 1 回限りのデータ バックフィルを実行する」を参照してください。
  • クエリで UNION 句を使う代わりに、複数のソースのデータを結合して、1 つのストリーミング テーブルに書き込みます。 UNION の代わりに追加フロー処理を使うと、最新の情報への完全な更新を実行せずに、ターゲット テーブルを増分的に更新できます。 この方法で行われるユニオンの例については、「例: UNION の代わりに追加フロー処理を使用する」を参照してください。

追加フロー処理によって出力されるレコードのターゲットは、既存のテーブルまたは新しいテーブルにできます。 Python クエリの場合は、create_streaming_table() 関数を使ってターゲット テーブルを作成します。

次の例では、同じターゲットに対して 2 つのフローを追加し、2 つのソース テーブルの和集合を作成します。

パイソン

import dlt

# create a streaming table
dlt.create_streaming_table("customers_us")

# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

重要

  • 期待値を含むデータ品質制約を定義する必要がある場合は、create_streaming_table() 関数の一部としてターゲット テーブルで、または既存テーブルの定義で、期待値を定義します。 @append_flow 定義で期待値を定義することはできません。
  • フローは "フロー名" によって識別され、この名前はストリーミング チェックポイントを識別するために使われます。 チェックポイントの識別にフロー名を使うということは、次のことを意味します。
    • パイプライン内の既存のフローの名前を変更した場合、チェックポイントは引き継がれず、名前が変更されたフローは実質的にまったく新しいフローになります。
    • 既存のチェックポイントが新しいフロー定義と一致しないため、パイプラインでフロー名を再利用することはできません。

フローの種類

ストリーミング テーブルと具体化されたビューの既定のフローは、追加フローです。 変更データ キャプチャ データ ソースから読み取るフローを作成することもできます。 次の表では、さまざまな種類のフローについて説明します。

フロータイプ 説明
追加 追加 フローは最も一般的なフローの種類であり、ソース内の新しいレコードは更新ごとにターゲットに書き込まれます。 これらは、構造化ストリーミングの追加モードに対応します。 ONCE フラグを追加して、ターゲットが完全に更新されない限り、データを 1 回だけターゲットに挿入するバッチ クエリを示すことができます。 任意の数の追加フローを特定のターゲットに書き込むことができます。
既定のフロー (ターゲット ストリーミング テーブルまたは具体化されたビューで作成) は、ターゲットと同じ名前になります。 他のターゲットには既定のフローがありません。
自動 CDC (以前は 変更を適用) 自動 CDC フローは、変更データ キャプチャ (CDC) データを含むクエリを取り込みます。 自動 CDC フローはストリーミング テーブルのみを対象とすることができ、ソースはストリーミング ソースである必要があります ( ONCE フローの場合でも)。 複数の自動 CDC フローは、1 つのストリーミング テーブルをターゲットにすることができます。 自動 CDC フローのターゲットとして機能するストリーミング テーブルは、他の自動 CDC フローでのみ対象にすることができます。
CDC データの詳細については、「 AUTO CDC API: Lakeflow 宣言パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。

追加情報

フローとその使用方法の詳細については、次のトピックを参照してください。