Azure Databricks 上的結構化串流模式
這包括筆記本和程式碼範例,適用于在 Azure Databricks 上使用結構化串流的常見模式。
開始使用結構化串流
如果您不熟悉結構化串流,請參閱 執行您的第一個結構化串流工作負載 。
寫入 Cassandra 作為 Python 中結構化串流的接收
Apache Cassandra 是分散式、低延遲、可調整、高可用性的 OLTP 資料庫。
結構化串流可透過 Spark Cassandra 連線or 與 Cassandra 搭配運作。 此連接器同時支援 RDD 和 DataFrame API,而且其原生支援寫入串流資料。 重要 您必須使用對應版本的 spark-cassandra-connector-assembly 。
下列範例會連線到 Cassandra 資料庫叢集中的一或多個主機。 它也會指定連接組態,例如檢查點位置,以及特定的索引鍵空間和資料表名稱:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
在 Python 中使用 foreachBatch()
寫入 Azure Synapse Analytics
streamingDF.writeStream.foreachBatch()
可讓您重複使用現有的批次資料寫入器,將串流查詢的輸出寫入 Azure Synapse Analytics。 如需詳細資訊,請參閱 foreachBatch 檔 。
若要執行此範例,您需要 Azure Synapse Analytics 連接器。 如需 Azure Synapse Analytics 連接器的詳細資訊,請參閱 在 Azure Synapse Analytics 中查詢資料。
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Stream-Stream 聯結
這兩個筆記本示範如何在 Python 和 Scala 中使用串流聯結。
Stream-Stream 聯結 Python 筆記本
Stream-Stream 聯結 Scala 筆記本
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應