共用方式為


Azure Databricks 上的結構化串流模式

這包括筆記本和程式碼範例,適用于在 Azure Databricks 上使用結構化串流的常見模式。

開始使用結構化串流

如果您不熟悉結構化串流,請參閱 執行您的第一個結構化串流工作負載

寫入 Cassandra 作為 Python 中結構化串流的接收

Apache Cassandra 是分散式、低延遲、可調整、高可用性的 OLTP 資料庫。

結構化串流可透過 Spark Cassandra 連線or 與 Cassandra 搭配運作。 此連接器同時支援 RDD 和 DataFrame API,而且其原生支援寫入串流資料。 重要 您必須使用對應版本的 spark-cassandra-connector-assembly

下列範例會連線到 Cassandra 資料庫叢集中的一或多個主機。 它也會指定連接組態,例如檢查點位置,以及特定的索引鍵空間和資料表名稱:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

在 Python 中使用 foreachBatch() 寫入 Azure Synapse Analytics

streamingDF.writeStream.foreachBatch() 可讓您重複使用現有的批次資料寫入器,將串流查詢的輸出寫入 Azure Synapse Analytics。 如需詳細資訊,請參閱 foreachBatch 檔

若要執行此範例,您需要 Azure Synapse Analytics 連接器。 如需 Azure Synapse Analytics 連接器的詳細資訊,請參閱 在 Azure Synapse Analytics 中查詢資料。

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Stream-Stream 聯結

這兩個筆記本示範如何在 Python 和 Scala 中使用串流聯結。

Stream-Stream 聯結 Python 筆記本

取得筆記本

Stream-Stream 聯結 Scala 筆記本

取得筆記本