Share via


Delta Live Tables フローを使用してデータを増分的に読み込んで処理する

この記事では、フローとは何か、および Delta Live Tables パイプラインでフローを使って、ソースからターゲット ストリーミング テーブルへのデータを増分的に処理する方法について説明します。 Delta Live Tables でのフローは、2 つの方法で定義されます。

  1. ユーザーがストリーミング テーブルを更新するクエリを作成すると、フローが自動的に定義されます。
  2. Delta Live Tables には、複数のストリーミング ソースからのストリーミング テーブルへの追加など、より複雑な処理のためのフローを明示的に定義する機能も用意されています。

この記事では、ユーザーがストリーミング テーブルを更新するためのクエリを定義すると作成される暗黙的なフローについて説明した後、さらに複雑なフローを定義するための構文について詳しく説明します。

フローとは

Delta Live Tables での "フロー" は、ソース データを増分的に処理してターゲット ストリーミング テーブルを更新するストリーミング クエリです。 ユーザーがパイプラインで作成するほとんどの Delta Live Tables データセットでは、クエリの一部としてフローが定義されるので、フローを明示的に定義する必要はありません。 たとえば、テーブルとフローのステートメントを個別に使ってストリーミング テーブルを作成する代わりに、1 つの DDL コマンドで Delta Live Tables にストリーミング テーブルを作成します。

Note

この CREATE FLOW の例はあくまでも説明のためのものであり、有効な Delta Live Tables 構文ではないキーワードが含まれています。

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

クエリによって定義される既定のフローに加えて、Delta Live Tables の Python と SQL のインターフェイスには、"追加フロー" の機能があります。 追加フローでは、1 つのストリーミング テーブルを更新するために複数のストリーミング ソースからデータを読み取る必要がある処理がサポートされています。 たとえば、既存のストリーミング テーブルとフローがあり、この既存のストリーミング テーブルに書き込む新しいストリーミング ソースを追加したい場合は、追加フロー機能を使用できます。

追加フローを使用して、複数のソース ストリームから 1 つのストリーミング テーブルに書き込む

Note

フローの追加処理を使用するには、プレビュー チャネルを使用するようにパイプラインを構成する必要があります。

複数のストリーミング ソースから 1 つのストリーミング テーブルに書き込むには、Python インターフェイスの @append_flow デコレーターまたは SQL インターフェイスの CREATE FLOW 句を使います。 次のようなタスクを処理するには、追加フローを使います。

  • 完全な更新を必要とせずに、既存のストリーミング テーブルにデータを追加するストリーミング ソースを追加します。 たとえば、運用しているすべてのリージョンからのリージョン データを結合するテーブルがあるとします。 新しいリージョンがロールアウトされると、完全な更新を実行せずに、新しいリージョン データをテーブルに追加できます。 「例: 複数の Kafka トピックからストリーミング テーブルに書き込む」をご覧ください。
  • 不足している履歴データ (バックフィル) を追加して、ストリーミング テーブルを更新します。 たとえば、Apache Kafka トピックによって書き込まれる既存のストリーミング テーブルがあるとします。 また、ストリーミング テーブルに 1 回だけ挿入する必要がある履歴データがテーブル内にあり、データを挿入する前の複雑な集計の実行が処理に含まれるため、データをストリーミングすることはできません。 「例: データのバックフィルを 1 回だけ実行する」をご覧ください。
  • クエリで UNION 句を使う代わりに、複数のソースのデータを結合して、1 つのストリーミング テーブルに書き込みます。 UNION の代わりに追加フロー処理を使うと、最新の情報への完全な更新を実行せずに、ターゲット テーブルを増分的に更新できます。 「例: UNION の代わりに追加フロー処理を使用する」をご覧ください。

追加フロー処理によって出力されるレコードのターゲットは、既存のテーブルまたは新しいテーブルにできます。 Python クエリの場合は、create_streaming_table() 関数を使ってターゲット テーブルを作成します。

重要

  • 期待値を含むデータ品質制約を定義する必要がある場合は、create_streaming_table() 関数の一部としてターゲット テーブルで、または既存テーブルの定義で、期待値を定義します。 @append_flow 定義で期待値を定義することはできません。
  • フローは "フロー名" によって識別され、この名前はストリーミング チェックポイントを識別するために使われます。 チェックポイントの識別にフロー名を使うということは、次のことを意味します。
    • パイプライン内の既存のフローの名前を変更した場合、チェックポイントは引き継がれず、名前が変更されたフローは実質的にまったく新しいフローになります。
    • 既存のチェックポイントが新しいフロー定義と一致しないため、パイプラインでフロー名を再利用することはできません。

@append_flow の構文を次に示します:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

例: 複数の Kafka トピックからストリーミング テーブルに書き込む

次の例では、kafka_target という名前のストリーミング テーブルを作成し、2 つの Kafka トピックからそのストリーミング テーブルに書き込みます。

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

SQL クエリで使われる read_kafka() テーブル値関数について詳しくは、SQL 言語リファレンスの read_kafka に関する記事をご覧ください。

例: データのバックフィルを 1 回だけ実行する

次の例では、クエリを実行して履歴データをストリーミング テーブルに追加します。

Note

バックフィル クエリがスケジュールに基づいて、または継続的に実行されるパイプラインの一部である場合に、真の 1 回限りのバックフィルを確保するには、パイプラインを 1 回実行した後にクエリを削除します。 バックフィル ディレクトリに到着した場合に新しいデータを追加するには、クエリをそのまま使用します。

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

例: UNION の代わりに追加フロー処理を使用する

UNION 句を含むクエリを使う代わりに、追加フロー クエリを使って複数のソースを結合し、1 つのストリーミング テーブルに書き込むことができます。 UNION の代わりに追加フロー クエリを使うと、完全な更新を実行しなくても、複数のソースからストリーミング テーブルに追加できます。

次の Python の例には、UNION 句を使って複数のデータ ソースを結合するクエリが含まれます。

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

次の例では、追加フロー クエリで UNION クエリを置き換えています。

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );