Azure Synapse への構造化ストリーミング書き込み
Azure Synapse コネクタは、バッチ書き込みによって一貫性のあるユーザー エクスペリエンスを提供する Azure Synapse 向けに、効率的でスケーラブルな構造化ストリーミング書き込みサポートを提供します。Azure Databricks クラスターと Azure Synapse インスタンス間の大規模なデータ転送には、COPY
を使用します。
Azure Databricks と Synapse の間の構造化ストリーミング サポートには、増分 ETL ジョブを構成する簡単なセマンティクスが用意されています。 Azure Databricks から Synapse にデータを読み込むモデルでは、ほぼリアルタイムのワークロードの SLA 要件を満たさない可能性がある待機時間が生じます。 Azure Synapse Analytics のクエリ データを参照してください。
Synapse へのストリーミング書き込みでサポートされる出力モード
Azure Synapse では、レコードの追加と集計用に Append
と Complete
の出力モードがサポートされています。 出力モードと互換性マトリックスの詳細については、「構造化ストリーミング ガイド」を参照してください。
Synapse フォールト トレランスのセマンティクス
既定では、Azure Synapse ストリーミングは、Azure Synapse テーブルにデータを書き込むための、エンドツーエンドの厳密に一回の保証を提供します。これを実現するため、DBFS のチェックポイントの場所、Azure Synapse のチェックポイント テーブル、ロック メカニズムを組み合わせ、ストリーミングがあらゆる種類の障害、再試行、およびクエリの再起動を処理できるようにして、クエリの進行状況を確実に追跡します。
必要に応じて、spark.databricks.sqldw.streaming.exactlyOnce.enabled
オプションを false
に設定することにより、Azure Synapse ストリーミングの制限を少なくとも 1 回選択できます。この場合、Azure Synapse への断続的な接続エラーや予期しないクエリの終了が発生した場合に、データが重複する可能性があります。
Azure Synapse に書き込むための構造化ストリーミングの構文
次のコード例は、Scala と Python での構造化ストリーミングを使用した Synapse へのストリーミング書き込みを示しています。
Scala
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()
// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()
Python
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", "100000") \
.option("numPartitions", "16") \
.load()
# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("checkpointLocation", "/tmp_checkpoint_location") \
.start()
すべての構成の一覧については、「Azure Synapse Analytics のクエリ データ」を参照してください。
Synapse ストリーミング チェックポイント テーブルの管理
Azure Synapse コネクタは新しいストリーミング クエリの開始時に作成されるストリーミング チェックポイント テーブルを削除しません。 この動作は、通常オブジェクト ストレージに指定される checkpointLocation
と一致しています。 Databricks では、今後実行されないクエリのチェックポイント テーブルを定期的に削除することをお勧めします。
既定では、すべてのチェックポイント テーブルには <prefix>_<query-id>
という名前が付きます。ここで、<prefix>
は既定値 databricks_streaming_checkpoint
を持つ構成可能なプレフィックスであり、query_id
は _
文字が削除されたストリーミング クエリ ID です。
使用していないストリーミング クエリまたは削除されたストリーミング クエリのすべてのチェックポイント テーブルを検索するには、次のクエリを実行します。
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
Spark SQL 構成オプション spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
を使用して、プレフィックスを構成できます。
Azure Databricks の Synapse コネクタのストリーミング オプション リファレンス
Spark SQL で提供されている OPTIONS
では、バッチ オプションに加え、ストリーミング用に次のオプションをサポートします。
パラメーター | 必須 | Default | メモ |
---|---|---|---|
checkpointLocation |
はい | 既定値なし | 構造化ストリーミングがメタデータとチェックポイント情報を書き込むのに使用する DBFS 上の場所。 構造化ストリーミング プログラミング ガイドの「チェックポイント処理を使用して障害から復旧する」を参照してください。 |
numStreamingTempDirsToKeep |
いいえ | 0 | ストリーミングでマイクロ バッチを定期的にクリーンアップするために保持する (最新の) 一時ディレクトリの数を示します。 0 に設定すると、マイクロ バッチがコミットされた直後にディレクトリの削除がトリガーされます。それ以外の場合は、最新のマイクロ バッチの数が保持され、残りのディレクトリは削除されます。 定期的なクリーンアップを無効にするには、-1 を使用します。 |
注意
checkpointLocation
と numStreamingTempDirsToKeep
は、Azure Databricks から Azure Synapse の新しいテーブルへとストリーミング書き込みを行う場合にのみ使用します。