最初の構造化ストリーミング ワークロードを実行する

この記事では、Azure Databricks で最初の構造化ストリーミング クエリを実行するために必要な基本的な概念のコード例と説明を示します。 構造化ストリーミングは、ほぼリアルタイムおよび増分処理ワークロードに使用できます。

構造化ストリーミングは、Delta Live Tables のストリーミング テーブルの動力であるいくつかのテクノロジの 1 つです。 Databricks では、すべての新しい ETL、インジェスト、および構造化ストリーミング ワークロードにデルタ ライブ テーブルを使用することをお勧めします。 「Delta Live Tables とは」を参照してください。

Note

Delta Live Tables にはストリーミング テーブルを宣言するための若干変更された構文が用意されていますが、ストリーミングの読み取りと変換を構成するための一般的な構文は、Azure Databricks のすべてのストリーミング ユース ケースに適用されます。 Delta Live Tables では、状態情報、メタデータ、および多数の構成を管理することで、ストリーミングも簡略化します。

データ ストリームからの読み取り

構造化ストリームを使用して、サポートされているデータ ソースからデータを増分的に取り込むことができます。 Azure Databricks Structured Streaming ワークロードで使用される最も一般的なデータ ソースには、次のようなものがあります:

  • クラウド オブジェクト ストレージ内のデータ ファイル
  • メッセージ バスとキュー
  • Delta Lake

Databricks では、クラウド オブジェクト ストレージからのファイルのストリーミング インジェストに自動ローダーの使用が推奨されています。 自動ローダーは、構造化ストリーミングでサポートされているほとんどのファイル形式をサポートしています。 「自動ローダー」を参照してください。

各データ ソースには、データのバッチを読み込む方法を指定するためのオプションが多数用意されています。 読み取りの構成中に設定する必要がある主なオプションは、次のカテゴリに分類されます:

  • データ ソースまたは形式を指定するオプション (ファイルの種類、区切り記号、スキーマなど)。
  • ソース システムへのアクセスを構成するオプション (ポート設定や資格情報など)。
  • ストリームの開始位置を指定するオプション (Kafka オフセットや既存のすべてのファイルの読み取りなど)。
  • 各バッチで処理されるデータの量を制御するオプション (バッチあたりの最大オフセット、ファイル、バイト数など)。

自動ローダーを使用してオブジェクト ストレージからストリーミング データを読み取る

次の例では、cloudFiles を使用して形式とオプションを示す、自動ローダーを使用して JSON データを読み込む方法を示します。 schemaLocation オプションを使用すると、スキーマの推論と進化が可能になります。 Databricks ノートブック セルに次のコードを貼り付け、セルを実行して、raw_df という名前のストリーミング DataFrame を作成します:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Azure Databricks での他の読み取り操作と同様に、ストリーミング読み取りを構成しても、実際にはデータは読み込まれません。 ストリームが開始する前に、データに対してアクションをトリガーする必要があります。

Note

ストリーミング DataFrame で display() を呼び出すと、ストリーミング ジョブが開始されます。 ほとんどの構造化ストリーミングのユース ケースでは、ストリームをトリガーするアクションはシンクにデータを書き込む必要があります。 「実稼働構造化ストリーミング コードの準備」を参照してください。

ストリーミング変換を実行する

構造化ストリーミングでは、Azure Databricks と Spark SQL で使用できるほとんどの変換がサポートされています。 MLflow モデルを UDF として読み込み、変換としてストリーミング予測を行うことさえできます。

次のコード例では、Spark SQL 関数を使用して、取り込まれた JSON データを追加情報で強化するための単純な変換を完了します:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

結果の transformed_df には、データ ソースに到着するたびに各レコードを読み込んで変換するクエリ命令が含まれます。

Note

構造化ストリーミングは、データ ソースを無制限または無限のデータセットとして扱います。 そのため、構造化ストリーミング ワークロードでは、無限の数の項目を並べ替える必要があるため、一部の変換はサポートされていません。

ほとんどの集計と多くの結合では、ウォーターマーク、ウィンドウ、出力モードを使用して状態情報を管理する必要があります。 「透かしを適用してデータ処理のしきい値を制御する」を参照してください。

データ シンクへの書き込み

データ シンクは、ストリーミング書き込み操作のターゲットです。 Azure Databricks ストリーミング ワークロードで使用される一般的なシンクには、次のようなものがあります:

  • Delta Lake
  • メッセージ バスとキュー
  • キー値データベース

データ ソースと同様に、ほとんどのデータ シンクには、ターゲット システムへのデータの書き込み方法を制御するためのオプションが多数用意されています。 書き込みの構成中に設定する必要がある主なオプションは、次のカテゴリに分類されます:

  • 出力モード (既定では追加)。
  • チェックポイントの場所 (各 書き込み者 に必要)。
  • トリガー間隔。「構造化ストリーミングのトリガー間隔を構成する」を参照してください。
  • データ シンクまたは形式を指定するオプション (ファイルの種類、区切り記号、スキーマなど)。
  • ターゲット システムへのアクセスを構成するオプション (ポート設定や資格情報など)。

Delta Lake への増分バッチ書き込みを実行する

次の例では、指定したファイル パスとチェックポイントを使用して Delta Lake に書き込みます。

重要

構成するストリーミング ライターごとに、必ず一意のチェックポイントの場所を指定してください。 チェックポイントはストリームの一意の ID を提供し、処理されたすべてのレコードとストリーミング クエリに関連付けられている状態情報を追跡します。

トリガーの availableNow 設定は、ソース データセットから以前に処理されていないすべてのレコードを処理し、シャットダウンするように構造化ストリームに指示します。そのため、ストリームを実行したままにすることなく、次のコードを安全に実行できます:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

この例では、データ ソースに新しいレコードが到着しないため、このコードを繰り返し実行しても新しいレコードは取り込まれません。

警告

構造化ストリーミングの実行により、自動終了によってコンピューティング リソースがシャットダウンされるのを防ぐことができます。 予期しないコストを回避するには、必ずストリーミング クエリを終了してください。

実稼働構造化ストリーミング コードの準備

Databricks では、ほとんどの構造化ストリーミング ワークロードにデルタ ライブ テーブルを使用することをお勧めします。 次の推奨事項は、実稼働用に構造化ストリーミング ワークロードを準備するための開始点を提供します:

  • displaycount など、結果を返すノートブックから不要なコードを削除します。
  • 対話型クラスターで構造化ストリーミング ワークロードを実行しないでください。常にストリームをジョブとしてスケジュールします。
  • ストリーミング ジョブを自動的に回復するには、無限再試行でジョブを構成してください。
  • 構造化ストリーミングを使用するワークロードには自動スケーリングを使用しないでください。

その他の推奨事項については、「構造化ストリーミングの実運用に関する考慮事項」を参照してください。

Delta Lake からデータを読み取り、変換、Delta Lake に書き込む

Delta Lake には、ソースとシンクの両方として構造化ストリーミングを使用するための広範なサポートがあります。 「差分テーブルのストリーミング読み取りと書き込み」を参照してください。

次の例は、Delta テーブルからすべての新しいレコードを増分読み込みし、それらを別の Delta テーブルのスナップショットと結合して、Delta テーブルに書き込む構文の例を示しています:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

ソース テーブルを読み取り、ターゲット テーブルと指定されたチェックポイントの場所に書き込むには、適切なアクセス許可が構成されている必要があります。 データ ソースとシンクに関連する値を使用して、山かっこ (<>) で示されているすべてのパラメーターを入力します。

Note

Delta Live Table は、Delta Lake パイプラインを作成するための完全な宣言構文を提供し、トリガーやチェックポイントなどのプロパティを自動的に管理します。 「Delta Live Tables とは」を参照してください。

Kafka からデータを読み取り、変換、Kafka に書き込む

Apache Kafka やその他のメッセージング バスは、大規模なデータセットで使用可能な待機時間が最も短い一部を提供します。 Azure Databricks を使用して、Kafka から取り込まれたデータに変換を適用し、Kafka にデータを書き戻すことができます。

Note

クラウド オブジェクト ストレージにデータを書き込むと、待機時間のオーバーヘッドが増加します。 Delta Lake にメッセージング バスからのデータを格納するが、ストリーミング ワークロードに可能な限り短い待機時間が必要な場合、Databricks では、データを Lakehouse に取り込み、ダウンストリーム メッセージング バス シンクにほぼリアルタイムの変換を適用するように個別のストリーミング ジョブを構成することをお勧めします。

次のコード例は、Delta テーブル内のデータと結合し、Kafka に書き戻すことによって Kafka からのデータをエンリッチする簡単なパターンを示しています:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Kafka サービスへのアクセス用に適切なアクセス許可が構成されている必要があります。 データ ソースとシンクに関連する値を使用して、山かっこ (<>) で示されているすべてのパラメーターを入力します。 「Apache Kafka と Azure Databricks を使用したストリーム処理」を参照してください。