Dela via


Strukturerade strömningsmönster i Azure Databricks

Det här innehåller notebook-filer och kodexempel för vanliga mönster för att arbeta med strukturerad direktuppspelning i Azure Databricks.

Komma igång med strukturerad direktuppspelning

Om du är helt ny på Structured Streaming läser du Kör din första strukturerade strömningsarbetsbelastning.

Skriva till Cassandra som mottagare för strukturerad direktuppspelning i Python

Apache Cassandra är en distribuerad OLTP-databas med låg svarstid, skalbar och hög tillgänglighet.

Strukturerad direktuppspelning fungerar med Cassandra via Spark Cassandra Connector. Den här anslutningsappen stöder både RDD- och DataFrame-API:er och har inbyggt stöd för att skriva strömmande data. Viktigt Du måste använda motsvarande version av spark-cassandra-connector-assembly.

Följande exempel ansluter till en eller flera värdar i ett Cassandra-databaskluster. Den anger även anslutningskonfigurationer, till exempel kontrollpunktsplatsen och de specifika nyckelrymds- och tabellnamnen:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Skriva till Azure Synapse Analytics med hjälp av foreachBatch() i Python

streamingDF.writeStream.foreachBatch() gör att du kan återanvända befintliga batchdataskrivare för att skriva utdata från en strömmande fråga till Azure Synapse Analytics. Mer information finns i foreachBatch-dokumentationen .

Om du vill köra det här exemplet behöver du Azure Synapse Analytics-anslutningsappen. Mer information om Azure Synapse Analytics-anslutningsappen finns i Fråga efter data i Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Anslutningar från dataström till dataström

Dessa två notebook-filer visar hur du använder stream-stream-kopplingar i Python och Scala.

Stream-Stream ansluter till Python Notebook

Hämta notebook-fil

Stream-Stream ansluter till Scala Notebook

Hämta notebook-fil