Поделиться через


Запуск первой структурированной рабочей нагрузки потоковой передачи

В этой статье приведены примеры кода и объяснение основных понятий, необходимых для выполнения первых структурированных запросов потоковой передачи в Azure Databricks. Структурированная потоковая передача можно использовать для практически в реальном времени и добавочных рабочих нагрузок обработки.

Структурированная потоковая передача — это одна из нескольких технологий, которые питают таблицы потоковой передачи в разностных динамических таблицах. Databricks рекомендует использовать разностные динамические таблицы для всех новых рабочих нагрузок ETL, приема и структурированной потоковой передачи. См. раздел "Что такое разностные динамические таблицы?".

Примечание.

Хотя разностные динамические таблицы предоставляют немного измененный синтаксис для объявления таблиц потоковой передачи, общий синтаксис для настройки потоковых операций чтения и преобразований применяется ко всем вариантам использования потоковой передачи в Azure Databricks. Разностные динамические таблицы также упрощают потоковую передачу, управляя сведениями о состоянии, метаданными и многочисленными конфигурациями.

Чтение из потока данных

Структурированная потоковая передача можно использовать для добавочного приема данных из поддерживаемых источников данных. Ниже приведены некоторые из наиболее распространенных источников данных, используемых в рабочих нагрузках структурированной потоковой передачи Azure Databricks:

  • Файлы данных в облачном хранилище объектов
  • Автобусы сообщений и очереди
  • Delta Lake

Databricks рекомендует использовать автозагрузчик для приема потоковой передачи из облачного хранилища объектов. Автозагрузчик поддерживает большинство форматов файлов, поддерживаемых структурированной потоковой передачей. См. статью об автозагрузчике.

Каждый источник данных предоставляет ряд параметров для указания способа загрузки пакетов данных. Во время настройки чтения основные параметры, которые могут потребоваться задать, относятся к следующим категориям:

  • Параметры, определяющие источник данных или формат (например, тип файла, разделители и схему).
  • Параметры, которые настраивают доступ к исходным системам (например, параметры порта и учетные данные).
  • Параметры, определяющие, где начать поток (например, смещение Kafka или чтение всех существующих файлов).
  • Параметры, управляющие объемом обработки данных в каждом пакете (например, максимальное смещение, файлы или байты на пакет).

Использование автозагрузчика для чтения потоковых данных из хранилища объектов

В следующем примере демонстрируется загрузка данных JSON с помощью автозагрузчика, который используется cloudFiles для обозначения формата и параметров. Этот schemaLocation параметр включает вывод схемы и эволюцию. Вставьте следующий код в ячейку записной книжки Databricks и запустите ячейку для создания потокового кадра данных с именем raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Как и другие операции чтения в Azure Databricks, настройка потокового чтения фактически не загружает данные. Перед началом потока необходимо активировать действие по данным.

Примечание.

Вызов display() потокового кадра данных запускает задание потоковой передачи. В большинстве случаев использования структурированной потоковой передачи действие, которое активирует поток, должно записывать данные в приемник. Сведения о подготовке структурированного кода потоковой передачи для рабочей среды.

Выполнение преобразования потоковой передачи

Структурированная потоковая передача поддерживает большинство преобразований, доступных в Azure Databricks и Spark SQL. Вы можете даже загрузить модели MLflow в качестве определяемых пользователем пользователей и сделать прогнозы потоковой передачи в качестве преобразования.

В следующем примере кода выполняется простое преобразование для обогащения приема данных JSON дополнительными сведениями с помощью функций Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

В результате transformed_df содержатся инструкции по загрузке и преобразованию каждой записи по мере поступления в источник данных.

Примечание.

Структурированная потоковая передача обрабатывает источники данных как несвязанные или бесконечные наборы данных. Таким образом, некоторые преобразования не поддерживаются в структурированных рабочих нагрузках потоковой передачи, так как для их сортировки требуется бесконечное количество элементов.

Большинство агрегатов и многих соединений требуют управления сведениями о состоянии с подложками, окнами и режимом вывода. См. раздел "Применить подложки" для управления порогами обработки данных.

Запись в приемник данных

Приемник данных — это цель операции потоковой записи. К общим приемникам, используемым в рабочих нагрузках потоковой передачи Azure Databricks, относятся следующие:

  • Delta Lake
  • Автобусы сообщений и очереди
  • Базы данных "Ключ-значение"

Как и в случае с источниками данных, большинство приемников данных предоставляют ряд параметров для управления записью данных в целевую систему. Во время настройки записи основные параметры могут потребоваться задать в следующих категориях:

  • Режим вывода (добавляется по умолчанию).
  • Расположение проверка точки (требуется для каждого модуля записи).
  • Интервалы триггеров; См. раздел "Настройка интервалов триггера структурированной потоковой передачи".
  • Параметры, указывающие приемник данных или формат (например, тип файла, разделители и схема).
  • Параметры, которые настраивают доступ к целевым системам (например, параметры порта и учетные данные).

Выполнение добавочной записи пакета в Delta Lake

В следующем примере записывается в Delta Lake с помощью указанного пути к файлу и проверка point.

Важно!

Всегда убедитесь, что вы указываете уникальное расположение проверка point для каждого настраиваемого модуля записи потоковой передачи. Точка проверка предоставляет уникальное удостоверение для потока, отслеживая все записи, обработанные и сведения о состоянии, связанные с запросом потоковой передачи.

Параметр availableNow триггера указывает структурированной потоковой передаче обрабатывать все ранее необработанные записи из исходного набора данных, а затем завершить работу, чтобы можно было безопасно выполнить следующий код, не беспокоясь о выходе из потока:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

В этом примере новые записи не поступают в наш источник данных, поэтому повторная выполнение этого кода не получает новых записей.

Предупреждение

Структурированное выполнение потоковой передачи может предотвратить автоматическое завершение работы вычислительных ресурсов. Чтобы избежать непредвиденных затрат, обязательно завершите потоковые запросы.

Подготовка структурированного кода потоковой передачи для рабочей среды

Databricks рекомендует использовать разностные динамические таблицы для большинства структурированных рабочих нагрузок потоковой передачи. Следующие рекомендации предоставляют отправную точку подготовки структурированных рабочих нагрузок потоковой передачи для рабочей среды:

  • Удалите ненужный код из записных книжек, которые возвращают результаты, такие как display и count.
  • Не запускайте структурированные рабочие нагрузки потоковой передачи в интерактивных кластерах; всегда планировать потоки в качестве заданий.
  • Чтобы обеспечить автоматическое восстановление заданий потоковой передачи, настройте задания с бесконечными повторными попытками.
  • Не используйте автоматическое масштабирование для рабочих нагрузок со структурированной потоковой передачей.

Дополнительные рекомендации см . в разделе "Рекомендации по рабочей среде" для структурированной потоковой передачи.

Чтение данных из Delta Lake, преобразование и запись в Delta Lake

Delta Lake имеет обширную поддержку работы со структурированной потоковой передачей как источником, так и приемником. См. потоковую передачу потоковой передачи и записи в разностной таблице.

В следующем примере показан пример синтаксиса для добавочной загрузки всех новых записей из таблицы Delta, объединения их с моментальным снимком другой таблицы Delta и их записи в разностную таблицу:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Необходимо иметь соответствующие разрешения, настроенные для чтения исходных таблиц и записи в целевые таблицы и указанного расположения проверка point. Заполните все параметры, обозначаемые угловыми скобками (<>) с помощью соответствующих значений для источников данных и приемников.

Примечание.

Delta Live Table предоставляет полностью декларативный синтаксис для создания конвейеров Delta Lake и управляет свойствами, такими как триггеры и проверка точки автоматически. См. раздел "Что такое разностные динамические таблицы?".

Чтение данных из Kafka, преобразование и запись в Kafka

Apache Kafka и другие автобусы обмена сообщениями предоставляют некоторые из самых низких задержек, доступных для больших наборов данных. Azure Databricks можно использовать для применения преобразований к данным, полученным из Kafka, а затем записывать данные обратно в Kafka.

Примечание.

Запись данных в облачное хранилище объектов добавляет дополнительные затраты на задержку. Если вы хотите хранить данные из шины обмена сообщениями в Delta Lake, но требуется наименьшая задержка для потоковых рабочих нагрузок, Databricks рекомендует настроить отдельные задания потоковой передачи для приема данных в lakehouse и применить почти в режиме реального времени преобразования для приемников нисходящей шины обмена сообщениями.

В следующем примере кода показан простой шаблон для обогащения данных из Kafka путем объединения данных в разностную таблицу, а затем обратно в Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

У вас должны быть соответствующие разрешения, настроенные для доступа к службе Kafka. Заполните все параметры, обозначаемые угловыми скобками (<>) с помощью соответствующих значений для источников данных и приемников. См. сведения о потоковой обработке с помощью Apache Kafka и Azure Databricks.