Gestructureerde streamingpatronen in Azure Databricks

Dit bevat notebooks en codevoorbeelden voor veelvoorkomende patronen voor het werken met Structured Streaming in Azure Databricks.

Aan de slag met Gestructureerd streamen

Als u nieuw bent voor Structured Streaming, raadpleegt u Uw eerste workload voor gestructureerd streamen uitvoeren.

Schrijven naar Cassandra als sink voor Gestructureerd streamen in Python

Apache Cassandra is een gedistribueerde, lage latentie, schaalbare, maximaal beschikbare OLTP-database.

Structured Streaming werkt met Cassandra via de Spark Cassandra-Verbinding maken or. Deze connector ondersteunt zowel RDD- als DataFrame-API's en biedt systeemeigen ondersteuning voor het schrijven van streaminggegevens. Belangrijk : u moet de bijbehorende versie van de spark-cassandra-connector-assembly gebruiken.

In het volgende voorbeeld wordt verbinding gemaakt met een of meer hosts in een Cassandra-databasecluster. Ook worden verbindingsconfiguraties opgegeven, zoals de locatie van het controlepunt en de specifieke keyspace- en tabelnamen:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Schrijven naar Azure Synapse Analytics met behulp van foreachBatch() Python

streamingDF.writeStream.foreachBatch() hiermee kunt u bestaande schrijvers van batchgegevens opnieuw gebruiken om de uitvoer van een streamingquery naar Azure Synapse Analytics te schrijven. Zie de foreachBatch-documentatie voor meer informatie.

Als u dit voorbeeld wilt uitvoeren, hebt u de Azure Synapse Analytics-connector nodig. Zie Querygegevens in Azure Synapse Analytics voor meer informatie over de Azure Synapse Analytics-connector.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Stream-stream-joins

Deze twee notebooks laten zien hoe u stream-stream joins gebruikt in Python en Scala.

Stream-Stream voegt python-notebook toe

Notebook downloaden

Scala-notebook koppelen aan Stream-Stream

Notebook downloaden