Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Dit bevat notebooks en codevoorbeelden voor veelvoorkomende patronen voor het werken met Structured Streaming in Azure Databricks.
Aan de slag met Gestructureerd streamen
Als u volledig nieuw bent in Structured Streaming, zie Uw eerste workload voor gestructureerd streamen.
Schrijven naar Cassandra als sink voor Gestructureerde streaming in Python
Apache Cassandra is een gedistribueerde, lage latentie, schaalbare, maximaal beschikbare OLTP-database.
Structured Streaming werkt met Cassandra via de Spark Cassandra-connector. Deze connector ondersteunt zowel RDD- als DataFrame-API's en biedt systeemeigen ondersteuning voor het schrijven van streaminggegevens. Belangrijk : u moet de bijbehorende versie van de spark-cassandra-connector-assembly gebruiken.
In het volgende voorbeeld wordt verbinding gemaakt met een of meer hosts in een Cassandra-databasecluster. Ook worden verbindingsconfiguraties opgegeven, zoals de locatie van het controlepunt en de specifieke keyspace- en tabelnamen:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Schrijven naar Azure Synapse Analytics met behulp van foreachBatch()
Python
streamingDF.writeStream.foreachBatch()
hiermee kunt u bestaande schrijvers van batchgegevens opnieuw gebruiken om de uitvoer van een streamingquery naar Azure Synapse Analytics te schrijven. Zie de foreachBatch-documentatie voor meer informatie.
Als u dit voorbeeld wilt uitvoeren, hebt u de Azure Synapse Analytics-connector nodig. Raadpleeg Query data in Azure Synapse Analytics voor meer informatie over de Azure Synapse Analytics-connector.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Stream-Stream-koppelingen
Deze twee notebooks laten zien hoe u stream-stream joins gebruikt in Python en Scala.