Azure Synapse への構造化ストリーミング書き込み

Azure Synapse コネクタは、バッチ書き込みによって一貫性のあるユーザー エクスペリエンスを提供する Azure Synapse 向けに、効率的でスケーラブルな構造化ストリーミング書き込みサポートを提供します。Azure Databricks クラスターと Azure Synapse インスタンス間の大規模なデータ転送には、COPY を使用します。

Azure Databricks と Synapse の間の構造化ストリーミング サポートには、増分 ETL ジョブを構成する簡単なセマンティクスが用意されています。 Azure Databricks から Synapse にデータを読み込むモデルでは、ほぼリアルタイムのワークロードの SLA 要件を満たさない可能性がある待機時間が生じます。 Azure Synapse Analytics のクエリ データを参照してください。

Synapse へのストリーミング書き込みでサポートされる出力モード

Azure Synapse では、レコードの追加と集計用に AppendComplete の出力モードがサポートされています。 出力モードと互換性マトリックスの詳細については、「構造化ストリーミング ガイド」を参照してください。

Synapse フォールト トレランスのセマンティクス

既定では、Azure Synapse ストリーミングは、Azure Synapse テーブルにデータを書き込むための、エンドツーエンドの厳密に一回の保証を提供します。これを実現するため、DBFS のチェックポイントの場所、Azure Synapse のチェックポイント テーブル、ロック メカニズムを組み合わせ、ストリーミングがあらゆる種類の障害、再試行、およびクエリの再起動を処理できるようにして、クエリの進行状況を確実に追跡します。

必要に応じて、spark.databricks.sqldw.streaming.exactlyOnce.enabled オプションを false に設定することにより、Azure Synapse ストリーミングの制限を少なくとも 1 回選択できます。この場合、Azure Synapse への断続的な接続エラーや予期しないクエリの終了が発生した場合に、データが重複する可能性があります。

Azure Synapse に書き込むための構造化ストリーミングの構文

次のコード例は、Scala と Python での構造化ストリーミングを使用した Synapse へのストリーミング書き込みを示しています。

Scala

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()

Python

# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

すべての構成の一覧については、「Azure Synapse Analytics のクエリ データ」を参照してください。

Synapse ストリーミング チェックポイント テーブルの管理

Azure Synapse コネクタは新しいストリーミング クエリの開始時に作成されるストリーミング チェックポイント テーブルを削除しません。 この動作は、通常オブジェクト ストレージに指定される checkpointLocation と一致しています。 Databricks では、今後実行されないクエリのチェックポイント テーブルを定期的に削除することをお勧めします。

既定では、すべてのチェックポイント テーブルには <prefix>_<query-id> という名前が付きます。ここで、<prefix> は既定値 databricks_streaming_checkpoint を持つ構成可能なプレフィックスであり、query_id_ 文字が削除されたストリーミング クエリ ID です。

使用していないストリーミング クエリまたは削除されたストリーミング クエリのすべてのチェックポイント テーブルを検索するには、次のクエリを実行します。

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

Spark SQL 構成オプション spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix を使用して、プレフィックスを構成できます。

Azure Databricks の Synapse コネクタのストリーミング オプション リファレンス

Spark SQL で提供されている OPTIONS では、バッチ オプションに加え、ストリーミング用に次のオプションをサポートします。

パラメーター 必須 Default メモ
checkpointLocation はい 既定値なし 構造化ストリーミングがメタデータとチェックポイント情報を書き込むのに使用する DBFS 上の場所。 構造化ストリーミング プログラミング ガイドの「チェックポイント処理を使用して障害から復旧する」を参照してください。
numStreamingTempDirsToKeep いいえ 0 ストリーミングでマイクロ バッチを定期的にクリーンアップするために保持する (最新の) 一時ディレクトリの数を示します。 0 に設定すると、マイクロ バッチがコミットされた直後にディレクトリの削除がトリガーされます。それ以外の場合は、最新のマイクロ バッチの数が保持され、残りのディレクトリは削除されます。 定期的なクリーンアップを無効にするには、-1 を使用します。

注意

checkpointLocationnumStreamingTempDirsToKeep は、Azure Databricks から Azure Synapse の新しいテーブルへとストリーミング書き込みを行う場合にのみ使用します。