データは、 フローを介して Lakeflow 宣言パイプラインで処理されます。 各フローは 、クエリ と、通常は ターゲットで構成されます。 このフローは、クエリをバッチとして処理するか、ターゲットへのデータ ストリームとして増分的に処理します。 フローは、Azure Databricks の ETL パイプライン内に存在します。
通常、フローは、ターゲットを更新するクエリを Lakeflow 宣言型パイプラインで作成するときに自動的に定義されますが、複数のソースから 1 つのターゲットに追加するなど、より複雑な処理のために追加のフローを明示的に定義することもできます。
最新情報
フローは、定義パイプラインが更新されるたびに実行されます。 フローでは、使用可能な最新のデータを使用してテーブルが作成または更新されます。 フローの種類とデータ変更の状態に応じて、更新は増分更新を行い、新しいレコードのみを処理するか、完全な更新を行い、データ ソースのすべてのレコードを再処理する場合があります。
- パイプラインの更新の詳細については、「 Lakeflow 宣言型パイプラインで更新を実行する」を参照してください。
- 更新プログラムのスケジュール設定とトリガーの詳細については、「 トリガーされたパイプライン モードと連続パイプライン モード」を参照してください。
既定のフローを作成する
パイプラインで Lakeflow 宣言型パイプライン オブジェクトを作成するときは、通常、テーブルまたはビューをサポートするクエリと共に定義します。 たとえば、この SQL クエリでは、customers_silver
というテーブルから読み取って、customers_bronze
というストリーミング テーブルを作成します。
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Python でも同じストリーミング テーブルを作成できます。 Python では、通常、Lakeflow 宣言型パイプライン機能にアクセスするためのデコレーターを使用して、データフレームを返すクエリ関数を作成することで、Lakeflow 宣言パイプラインを使用します。
import dlt
@dlt.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
この例では、 ストリーミング テーブルを作成しました。 SQL と Python の両方で、同様の構文を使用して具体化されたビューを作成することもできます。 詳細については、「 ストリーミング テーブル と 具体化されたビュー」を参照してください。
この例では、ストリーミング テーブルと共に既定のフローを作成します。 ストリーミング テーブルの既定のフローは 追加 フローであり、各トリガーで新しい行を追加します。 これは、Lakeflow 宣言パイプラインを使用して、フローとターゲットを 1 つの手順で作成する最も一般的な方法です。 このスタイルを使用して、データを取り込んだり、データを変換したりすることができます。
追加フローでは、1 つのターゲットを更新するために複数のストリーミング ソースからデータを読み取る必要がある処理もサポートされます。 たとえば、既存のストリーミング テーブルとフローがあり、この既存のストリーミング テーブルに書き込む新しいストリーミング ソースを追加したい場合は、追加フロー機能を使用できます。
複数のフローを使用して 1 つのターゲットに書き込む
前の例では、フローとストリーミング テーブルを 1 つの手順で作成しました。 以前に作成したテーブルのフローも作成できます。 この例では、別の手順でテーブルとそのテーブルに関連付けられているフローの作成を確認できます。 このコードの結果は、ストリーミング テーブルとフローに同じ名前を使用するなど、既定のフローを作成する場合と同じです。
パイソン
import dlt
# create streaming table
dlt.create_streaming_table("customers_silver")
# add a flow
@dlt.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
ターゲットとは別にフローを作成すると、同じターゲットにデータを追加する複数のフローを作成することもできます。
Python インターフェイスの @append_flow
デコレーター、または SQL インターフェイスの CREATE FLOW...INSERT INTO
句を使用して新しいフローを作成します。たとえば、複数のストリーミング ソースのストリーミング テーブルをターゲットにします。 次のようなタスクを処理するには、追加フローを使います。
- 完全な更新を必要とせずに、既存のストリーミング テーブルにデータを追加するストリーミング ソースを追加します。 たとえば、運用しているすべてのリージョンからのリージョン データを結合するテーブルがあるとします。 新しいリージョンがロールアウトされると、完全な更新を実行せずに、新しいリージョン データをテーブルに追加できます。 既存のストリーミング テーブルにストリーミング ソースを追加する例については、「 例: 複数の Kafka トピックからストリーミング テーブルに書き込む」を参照してください。
- 不足している履歴データ (バックフィル) を追加して、ストリーミング テーブルを更新します。 たとえば、Apache Kafka トピックによって書き込まれる既存のストリーミング テーブルがあるとします。 また、ストリーミング テーブルに 1 回だけ挿入する必要がある履歴データがテーブル内にあり、データを挿入する前の複雑な集計の実行が処理に含まれるため、データをストリーミングすることはできません。 バックフィルの例については、「 例: 1 回限りのデータ バックフィルを実行する」を参照してください。
- クエリで
UNION
句を使う代わりに、複数のソースのデータを結合して、1 つのストリーミング テーブルに書き込みます。UNION
の代わりに追加フロー処理を使うと、最新の情報への完全な更新を実行せずに、ターゲット テーブルを増分的に更新できます。 この方法で行われるユニオンの例については、「例:UNION
の代わりに追加フロー処理を使用する」を参照してください。
追加フロー処理によって出力されるレコードのターゲットは、既存のテーブルまたは新しいテーブルにできます。 Python クエリの場合は、create_streaming_table() 関数を使ってターゲット テーブルを作成します。
次の例では、同じターゲットに対して 2 つのフローを追加し、2 つのソース テーブルの和集合を作成します。
パイソン
import dlt
# create a streaming table
dlt.create_streaming_table("customers_us")
# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
重要
-
期待値を含むデータ品質制約を定義する必要がある場合は、
create_streaming_table()
関数の一部としてターゲット テーブルで、または既存テーブルの定義で、期待値を定義します。@append_flow
定義で期待値を定義することはできません。 - フローは "フロー名" によって識別され、この名前はストリーミング チェックポイントを識別するために使われます。 チェックポイントの識別にフロー名を使うということは、次のことを意味します。
- パイプライン内の既存のフローの名前を変更した場合、チェックポイントは引き継がれず、名前が変更されたフローは実質的にまったく新しいフローになります。
- 既存のチェックポイントが新しいフロー定義と一致しないため、パイプラインでフロー名を再利用することはできません。
フローの種類
ストリーミング テーブルと具体化されたビューの既定のフローは、追加フローです。 変更データ キャプチャ データ ソースから読み取るフローを作成することもできます。 次の表では、さまざまな種類のフローについて説明します。
フロータイプ | 説明 |
---|---|
追加 |
追加 フローは最も一般的なフローの種類であり、ソース内の新しいレコードは更新ごとにターゲットに書き込まれます。 これらは、構造化ストリーミングの追加モードに対応します。
ONCE フラグを追加して、ターゲットが完全に更新されない限り、データを 1 回だけターゲットに挿入するバッチ クエリを示すことができます。 任意の数の追加フローを特定のターゲットに書き込むことができます。既定のフロー (ターゲット ストリーミング テーブルまたは具体化されたビューで作成) は、ターゲットと同じ名前になります。 他のターゲットには既定のフローがありません。 |
自動 CDC (以前は 変更を適用) |
自動 CDC フローは、変更データ キャプチャ (CDC) データを含むクエリを取り込みます。 自動 CDC フローはストリーミング テーブルのみを対象とすることができ、ソースはストリーミング ソースである必要があります ( ONCE フローの場合でも)。 複数の自動 CDC フローは、1 つのストリーミング テーブルをターゲットにすることができます。 自動 CDC フローのターゲットとして機能するストリーミング テーブルは、他の自動 CDC フローでのみ対象にすることができます。CDC データの詳細については、「 AUTO CDC API: Lakeflow 宣言パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。 |
追加情報
フローとその使用方法の詳細については、次のトピックを参照してください。