Delen via


Gestructureerde streamingpatronen in Azure Databricks

Dit bevat notebooks en codevoorbeelden voor veelvoorkomende patronen voor het werken met Structured Streaming in Azure Databricks.

Aan de slag met Gestructureerd streamen

Als u volledig nieuw bent in Structured Streaming, zie Uw eerste workload voor gestructureerd streamen.

Schrijven naar Cassandra als sink voor Gestructureerde streaming in Python

Apache Cassandra is een gedistribueerde, lage latentie, schaalbare, maximaal beschikbare OLTP-database.

Structured Streaming werkt met Cassandra via de Spark Cassandra-connector. Deze connector ondersteunt zowel RDD- als DataFrame-API's en biedt systeemeigen ondersteuning voor het schrijven van streaminggegevens. Belangrijk : u moet de bijbehorende versie van de spark-cassandra-connector-assembly gebruiken.

In het volgende voorbeeld wordt verbinding gemaakt met een of meer hosts in een Cassandra-databasecluster. Ook worden verbindingsconfiguraties opgegeven, zoals de locatie van het controlepunt en de specifieke keyspace- en tabelnamen:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Schrijven naar Azure Synapse Analytics met behulp van foreachBatch() Python

streamingDF.writeStream.foreachBatch() hiermee kunt u bestaande schrijvers van batchgegevens opnieuw gebruiken om de uitvoer van een streamingquery naar Azure Synapse Analytics te schrijven. Zie de foreachBatch-documentatie voor meer informatie.

Als u dit voorbeeld wilt uitvoeren, hebt u de Azure Synapse Analytics-connector nodig. Raadpleeg Query data in Azure Synapse Analytics voor meer informatie over de Azure Synapse Analytics-connector.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Stream-Stream-koppelingen

Deze twee notebooks laten zien hoe u stream-stream joins gebruikt in Python en Scala.

Stream-Stream voegt python-notebook toe

Haal notitieblok op

Stream-Stream wordt gekoppeld aan Scala-notebook

Haal notitieblok op