Поделиться через


Структурированные шаблоны потоковой передачи в Azure Databricks

Здесь приведены записные книжки и примеры кода для распространенных шаблонов работы со структурированной потоковой передачей в Azure Databricks.

Начало работы со структурированной потоковой передачей

Если вы не знакомы со структурированной потоковой передачей, ознакомьтесь с первой структурированной рабочей нагрузкой потоковой передачи.

Запись в Cassandra в качестве приемника для структурированной потоковой передачи в Python

Apache Cassandra — это распределенная, низкая задержка, масштабируемая, высокодоступная база данных OLTP.

Структурированная потоковая передача работает с Cassandra через соединитель Spark Cassandra. Этот соединитель поддерживает интерфейсы API RDD и DataFrame, а также встроенную поддержку записи потоковых данных. Важно, чтобы использовать соответствующую версию spark-cassandra-connector-assembly.

Следующий пример подключается к одному или нескольким узлам в кластере базы данных Cassandra. Он также указывает конфигурации подключения, такие как расположение контрольной точки и определенные пространства ключей и имена таблиц:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Запись в Azure Synapse Analytics с помощью foreachBatch() Python

streamingDF.writeStream.foreachBatch() позволяет повторно использовать существующие инструменты пакетной записи данных для записи выходных данных запроса на потоковую передачу в Azure Synapse Analytics. Дополнительные сведения см. в документации по foreachBatch.

Для выполнения этого примера требуется соединитель Azure Synapse Analytics. Дополнительные сведения о соединителе Azure Synapse Analytics см. в статье "Запрос данных" в Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Присоединения по типу «поток-поток»

В этих двух записных книжках показано, как использовать присоединения по типу «поток-поток» в Python и Scala.

Присоединение записной книжки Python по типу «поток-поток»

Получить записную книжку

Присоединения записной книжки Scala по типу «поток-поток»

Получить записную книжку