Padrões de Fluxo Estruturado no Azure Databricks
Isso contém notebooks e códigos de exemplo para padrões comuns para trabalhar com Fluxo Estruturado no Azure Databricks.
Se você for novato no Fluxo Estruturado, confira Executar sua primeira carga de trabalho de Fluxo Estruturado.
O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalonável e altamente disponível.
O Fluxo Estruturado funciona com o Cassandra por meio do Conector do Cassandra do Spark. Esse conector dá suporte a APIs RDD e DataFrame e tem suporte nativo para gravar dados de streaming. Importante Você deve usar a versão correspondente do spark-cassandra-connector-assembly.
O exemplo a seguir está conectado a um ou mais hosts em um cluster de banco de dados do Cassandra. Ele também especifica configurações de conexão, como o local do ponto de verificação e os nomes de keyspace e tabela específicos:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
streamingDF.writeStream.foreachBatch()
permite reutilizar os gravadores de dados em lotes existentes para gravar a saída de uma consulta de streaming no Azure Synapse Analytics. Confira a documentação do foreachBatch para obter detalhes.
Para executar este exemplo, você precisa do conector do Azure Synapse Analytics. Para obter detalhes sobre o Azure Synapse Analytics, confira Dados de consulta no Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Esses dois notebooks mostram como usar junções de fluxo de fluxo em Python e Scala.