Поделиться через


Запустите первую структурированную потоковую задачу

В этой статье приведены примеры кода и объяснение основных понятий, необходимых для выполнения первых структурированных запросов потоковой передачи в Azure Databricks. Структурированная потоковая передача можно использовать для практически в реальном времени и добавочных рабочих нагрузок обработки.

Структурированная потоковая передача — это одна из нескольких технологий, которые питают таблицы потоковой передачи в Декларативных конвейерах Spark Lakeflow. Databricks рекомендует использовать декларативные конвейеры Lakeflow Spark для всех новых рабочих нагрузок ETL, приема и структурированной потоковой передачи. См. декларативные конвейеры Spark Lakeflow.

Примечание.

Хотя декларативные конвейеры Lakeflow Spark предоставляют немного изменённый синтаксис для объявления потоковых таблиц, общий синтаксис настройки потокового чтения и преобразований применяется ко всем вариантам использования потоковой передачи в Azure Databricks. Декларативные конвейеры Spark Lakeflow также упрощают потоковую передачу, управляя сведениями о состоянии, метаданными и многочисленными конфигурациями.

Использование автозагрузчика для чтения потоковых данных из хранилища объектов

В следующем примере демонстрируется загрузка данных JSON с помощью автозагрузчика, который используется cloudFiles для обозначения формата и параметров. Этот schemaLocation параметр включает вывод схемы и эволюцию. Вставьте следующий код в ячейку ноутбука Databricks и запустите ячейку, чтобы создать потоковый DataFrame с именем raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Как и другие операции чтения в Azure Databricks, настройка потокового чтения фактически не загружает данные. Перед началом потока необходимо активировать действие в данных.

Примечание.

Вызов display() на потоковом DataFrame запускает задание потоковой передачи. В большинстве случаев использования структурированной потоковой передачи действие, активирующее поток, должно записывать данные в хранилище. См. Замечания по производству для структурированной потоковой передачи.

Выполнение потокового преобразования

Структурированная потоковая передача поддерживает большинство преобразований, доступных в Azure Databricks и Spark SQL. Вы можете даже загружать модели MLflow как UDF и выполнять потоковые прогнозы в процессе преобразования.

В следующем примере кода выполняется простое преобразование для обогащения загруженных данных JSON дополнительной информацией с использованием функций Spark SQL.

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

В итоге transformed_df содержатся инструкции по загрузке и преобразованию каждой записи по мере их поступления в источник данных.

Примечание.

Структурированная потоковая передача обрабатывает источники данных как несвязанные или бесконечные наборы данных. Таким образом, некоторые преобразования не поддерживаются в структурированных рабочих нагрузках потоковой передачи, так как для их сортировки требуется бесконечное количество элементов.

Большинство агрегатов и многих соединений требуют управления сведениями о состоянии с подложками, окнами и режимом вывода. См. "Применение водяных знаков для управления порогами обработки данных".

Выполните инкрементальную пакетную запись в Delta Lake

В следующем примере выполняется запись в Delta Lake, используя указанный путь к файлу и контрольную точку.

Внимание

Обязательно укажите уникальное расположение контрольной точки для каждого настраиваемого модуля потоковой передачи. Контрольная точка предоставляет уникальный идентификатор для вашего потока, отслеживая все обработанные записи и информацию о состоянии, связанную с вашим потоковым запросом.

Параметр availableNow триггера указывает структурированной потоковой передаче обработать все ранее пропущенные записи из исходного набора данных и затем завершить работу, чтобы можно было безопасно выполнить следующий код, не опасаясь, что поток останется запущенным.

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

В этом примере новые записи не поступают в наш источник данных, поэтому повторное выполнение этого кода не включает новые записи.

Предупреждение

Структурированное выполнение потоковой передачи может предотвратить автоматическое завершение работы вычислительных ресурсов. Чтобы избежать непредвиденных затрат, обязательно завершите потоковые запросы.

Чтение данных из Delta Lake, преобразование и запись в Delta Lake

Delta Lake имеет обширную поддержку работы со структурированной потоковой передачей как источником, так и приемником. См. потоковые чтения и записи таблиц Delta.

В следующем примере показан синтаксис для добавочной загрузки всех новых записей из таблицы Delta, объединения их с моментальным снимком другой таблицы Delta и их записи в таблицу Delta.

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

У вас должны быть соответствующие разрешения, которые позволяют читать исходные таблицы, записывать в целевые таблицы и работать с указанным расположением контрольной точки. Заполните все параметры, обозначаемые угловыми скобками (<>) с помощью соответствующих значений для источников данных и приемников.

Примечание.

Декларативные конвейеры Lakeflow Spark предоставляют полностью декларативный синтаксис для создания конвейеров Delta Lake и автоматического управления такими свойствами, как триггеры и контрольные точки. См. декларативные конвейеры Spark Lakeflow.

Чтение данных из Kafka, преобразование и запись в Kafka

Apache Kafka и другие системы обмена сообщениями предоставляют одну из самых низких задержек, доступных для больших наборов данных. Azure Databricks можно использовать для применения преобразований к данным, полученным из Kafka, а затем записывать данные обратно в Kafka.

Примечание.

Запись данных в облачное хранилище объектов добавляет дополнительные затраты на задержку. Если вы хотите хранить данные из шины обмена сообщениями в Delta Lake, но требуется наименьшая задержка для потоковых рабочих нагрузок, Databricks рекомендует настроить отдельные задания потоковой передачи для приема данных в lakehouse и применить почти в режиме реального времени преобразования для приемников нисходящей шины обмена сообщениями.

В следующем примере кода показан простой шаблон для обогащения данных из Kafka путем объединения их с данными в таблице Delta, а затем записи обратно в Kafka.

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

У вас должны быть соответствующие разрешения, настроенные для доступа к службе Kafka. Заполните все параметры, обозначаемые угловыми скобками (<>) с помощью соответствующих значений для источников данных и приемников. См. сведения о потоковой обработке с помощью Apache Kafka и Azure Databricks.