Запуск первой структурированной рабочей нагрузки потоковой передачи
В этой статье приведены примеры кода и объяснение основных понятий, необходимых для выполнения первых структурированных запросов потоковой передачи в Azure Databricks. Структурированная потоковая передача можно использовать для практически в реальном времени и добавочных рабочих нагрузок обработки.
Структурированная потоковая передача — это одна из нескольких технологий, которые питают таблицы потоковой передачи в разностных динамических таблицах. Databricks рекомендует использовать разностные динамические таблицы для всех новых рабочих нагрузок ETL, приема и структурированной потоковой передачи. См. раздел "Что такое разностные динамические таблицы?".
Примечание.
Хотя разностные динамические таблицы предоставляют немного измененный синтаксис для объявления таблиц потоковой передачи, общий синтаксис для настройки потоковых операций чтения и преобразований применяется ко всем вариантам использования потоковой передачи в Azure Databricks. Разностные динамические таблицы также упрощают потоковую передачу, управляя сведениями о состоянии, метаданными и многочисленными конфигурациями.
Использование автозагрузчика для чтения потоковых данных из хранилища объектов
В следующем примере демонстрируется загрузка данных JSON с помощью автозагрузчика, который используется cloudFiles
для обозначения формата и параметров. Этот schemaLocation
параметр включает вывод схемы и эволюцию. Вставьте следующий код в ячейку записной книжки Databricks и запустите ячейку для создания потокового кадра данных с именем raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Как и другие операции чтения в Azure Databricks, настройка потокового чтения фактически не загружает данные. Перед началом потока необходимо активировать действие по данным.
Примечание.
Вызов display()
потокового кадра данных запускает задание потоковой передачи. В большинстве случаев использования структурированной потоковой передачи действие, которое активирует поток, должно записывать данные в приемник. Сведения о структурированной потоковой передаче см. в разделе "Рекомендации по рабочей среде".
Выполнение преобразования потоковой передачи
Структурированная потоковая передача поддерживает большинство преобразований, доступных в Azure Databricks и Spark SQL. Вы можете даже загрузить модели MLflow в качестве определяемых пользователем пользователей и сделать прогнозы потоковой передачи в качестве преобразования.
В следующем примере кода выполняется простое преобразование для обогащения приема данных JSON дополнительными сведениями с помощью функций Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
В результате transformed_df
содержатся инструкции по загрузке и преобразованию каждой записи по мере поступления в источник данных.
Примечание.
Структурированная потоковая передача обрабатывает источники данных как несвязанные или бесконечные наборы данных. Таким образом, некоторые преобразования не поддерживаются в структурированных рабочих нагрузках потоковой передачи, так как для их сортировки требуется бесконечное количество элементов.
Большинство агрегатов и многих соединений требуют управления сведениями о состоянии с подложками, окнами и режимом вывода. См. раздел "Применить подложки" для управления порогами обработки данных.
Выполнение добавочной записи пакета в Delta Lake
В следующем примере записывается в Delta Lake с помощью указанного пути к файлу и контрольной точки.
Внимание
Обязательно укажите уникальное расположение контрольной точки для каждого настраиваемого модуля потоковой передачи. Контрольная точка предоставляет уникальное удостоверение для потока, отслеживая все записи, обработанные и сведения о состоянии, связанные с вашим потоковым запросом.
Параметр availableNow
триггера указывает структурированной потоковой передаче обрабатывать все ранее необработанные записи из исходного набора данных, а затем завершить работу, чтобы можно было безопасно выполнить следующий код, не беспокоясь о выходе из потока:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
В этом примере новые записи не поступают в наш источник данных, поэтому повторная выполнение этого кода не получает новых записей.
Предупреждение
Структурированное выполнение потоковой передачи может предотвратить автоматическое завершение работы вычислительных ресурсов. Чтобы избежать непредвиденных затрат, обязательно завершите потоковые запросы.
Чтение данных из Delta Lake, преобразование и запись в Delta Lake
Delta Lake имеет обширную поддержку работы со структурированной потоковой передачей как источником, так и приемником. См. потоковую передачу потоковой передачи и записи в разностной таблице.
В следующем примере показан пример синтаксиса для добавочной загрузки всех новых записей из таблицы Delta, объединения их с моментальным снимком другой таблицы Delta и их записи в разностную таблицу:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
У вас должны быть соответствующие разрешения, настроенные для чтения исходных таблиц и записи в целевые таблицы и указанное расположение контрольной точки. Заполните все параметры, обозначаемые угловыми скобками (<>
) с помощью соответствующих значений для источников данных и приемников.
Примечание.
Delta Live Table предоставляет полностью декларативный синтаксис для создания конвейеров Delta Lake и управляет свойствами, такими как триггеры и контрольные точки автоматически. См. раздел "Что такое разностные динамические таблицы?".
Чтение данных из Kafka, преобразование и запись в Kafka
Apache Kafka и другие автобусы обмена сообщениями предоставляют некоторые из самых низких задержек, доступных для больших наборов данных. Azure Databricks можно использовать для применения преобразований к данным, полученным из Kafka, а затем записывать данные обратно в Kafka.
Примечание.
Запись данных в облачное хранилище объектов добавляет дополнительные затраты на задержку. Если вы хотите хранить данные из шины обмена сообщениями в Delta Lake, но требуется наименьшая задержка для потоковых рабочих нагрузок, Databricks рекомендует настроить отдельные задания потоковой передачи для приема данных в lakehouse и применить почти в режиме реального времени преобразования для приемников нисходящей шины обмена сообщениями.
В следующем примере кода показан простой шаблон для обогащения данных из Kafka путем объединения данных в разностную таблицу, а затем обратно в Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
У вас должны быть соответствующие разрешения, настроенные для доступа к службе Kafka. Заполните все параметры, обозначаемые угловыми скобками (<>
) с помощью соответствующих значений для источников данных и приемников. См. сведения о потоковой обработке с помощью Apache Kafka и Azure Databricks.