Share via


Modèles de flux structuré sur Azure Databricks

Cette rubrique contient des notebooks et exemples de code pour des modèles courants d’utilisation de Structured Streaming sur Azure Databricks.

Prise en main de Structured Streaming

Si vous débutez avec Structured Streaming, consultez Exécuter votre première charge de travail Structured Streaming.

Écrire dans Cassandra en tant que récepteur pour le flux structuré dans Python

Apache Cassandra est une base de données OLTP distribuée, à faible latence, évolutive et hautement disponible.

Le flux structuré fonctionne avec Cassandra via le connecteur Spark Cassandra. Ce connecteur prend en charge à la fois les API RDD et DataFrame, et dispose d’une prise en charge native pour l’écriture de données en continu. Important Vous devez utiliser la version correspondante de spark-cassandra-connector-assembly.

L’exemple suivant se connecte à un ou plusieurs hôtes dans un cluster de base de données Cassandra. Il spécifie également des configurations de connexion telles que l’emplacement du point de contrôle et les noms de table et d’espace de clés spécifiques :

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Écrire dans Azure Synapse Analytics avec foreachBatch() en Python

streamingDF.writeStream.foreachBatch() vous permet de réutiliser les rédacteurs de données par lots existants pour écrire la sortie d'une requête en continu dans Azure Synapse Analytics. Consultez la documentation foreachBatch pour plus d'informations.

Pour exécuter cet exemple, vous avez besoin du connecteur Azure Synapse Analytics. Pour plus d’informations sur le connecteur Azure Synapse Analytics, consultez les données de requête dans Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Jointures flux-flux

Ces deux blocs-notes montrent comment utiliser des jointures de flux de flux dans Python et Scala.

Stream-Stream rejoint le bloc-notes python

Obtenir le notebook

Stream-Stream le bloc-notes Scala

Obtenir le notebook