Получение данных потоковой передачи в Lakehouse с помощью структурированной потоковой передачи Spark

Структурированная потоковая передача — это масштабируемая и отказоустойчивая подсистема обработки потоков, созданная на основе Spark. Spark выполняет операцию потоковой передачи постепенно и непрерывно по мере поступления данных.

Структурированная потоковая передача стала доступна в Spark 2.2. С тех пор это рекомендуемый подход для потоковой передачи данных. Основной принцип структурированного потока заключается в том, чтобы рассматривать поток динамических данных как таблицу, в которой новые данные всегда добавляются непрерывно, как новая строка в таблице. Существует несколько встроенных источников потоковых файлов, таких как CSV, JSON, ORC, Parquet и встроенная поддержка служб обмена сообщениями, таких как Kafka и Центры событий.

Важно!

Microsoft Fabric находится в предварительной версии.

В этой статье содержатся сведения об оптимизации обработки и приема событий с помощью потоковой передачи структуры Spark в рабочих средах с высокой пропускной способностью. Предлагаемые подходы включают:

  • Оптимизация пропускной способности потоковой передачи данных
  • Оптимизация операций записи в разностной таблице и
  • Пакетная обработка событий

Определения заданий Spark и записные книжки Spark

Записные книжки Spark — это отличное средство для проверки идей и проведения экспериментов для получения аналитических сведений из данных или кода. Записные книжки широко используются в подготовке, визуализации, машинном обучении и других сценариях обработки больших данных. Определения заданий Spark — это неинтерактивные задачи, ориентированные на код, выполняемые в кластере Spark в течение длительного времени. Определения заданий Spark обеспечивают надежность и доступность.

Записные книжки Spark отлично подходят для тестирования логики кода и удовлетворения всех бизнес-требований. Однако для поддержания рабочего сценария определения заданий Spark с включенной политикой повторных попыток являются лучшим решением.

Политика повторных попыток для определений заданий Spark

В Microsoft Fabric пользователь может задать политику повтора для заданий определения заданий Spark. Хотя скрипт в задании может быть бесконечным, в инфраструктуре, выполняющей скрипт, может возникнуть проблема, требующая остановки задания. Или задание может быть устранено из-за потребностей в исправлении базовой инфраструктуры. Политика повторных попыток позволяет пользователю задавать правила автоматического перезапуска задания, если оно останавливается из-за каких-либо базовых проблем. Параметры определяют частоту перезапуска задания( вплоть до бесконечных повторных попыток) и задают время между повторными попытками. Таким образом, пользователи могут гарантировать, что их задания определения заданий Spark продолжают выполняться бесконечно, пока пользователь не решит остановить их.

Источники потоковой передачи

Для настройки потоковой передачи с помощью Центров событий требуется базовая конфигурация, которая включает имя пространства имен Центров событий, имя концентратора, имя ключа общего доступа и группу потребителей. Группа потребителей — это представление всего концентратора событий. Это позволяет нескольким потребляющим приложениям иметь отдельное представление потока событий и считывать поток независимо в своем темпе и со смещениями.

Секции являются важной частью возможности обработки большого объема данных. Один процессор имеет ограниченную емкость для обработки событий в секунду, в то время как несколько процессоров могут выполнять более эффективную работу при параллельном выполнении. Секции позволяют параллельно обрабатывать большие объемы событий.

Если используется слишком много секций с низкой скоростью приема, средства чтения секций обрабатывают небольшую часть этих данных, что приводит к неоптимальной обработке. Идеальное количество секций напрямую зависит от требуемой скорости обработки. Если вы хотите масштабировать обработку событий, рассмотрите возможность добавления дополнительных секций. Для секции нет определенного ограничения пропускной способности. Однако совокупная пропускная способность в пространстве имен ограничена количеством единиц пропускной способности. По мере увеличения числа единиц пропускной способности в пространстве имен может потребоваться, чтобы дополнительные секции позволили параллельным модулям чтения достичь максимальной пропускной способности.

Рекомендуется изучить и проверить наилучшее количество секций для сценария пропускной способности. Но часто встречаются сценарии с высокой пропускной способностью с использованием 32 или более секций.

Центры событий Azure Соединитель для Apache Spark (ссылка) рекомендуется для подключения приложения Spark к Центры событий Azure.

Lakehouse в качестве приемника потоковой передачи

Delta Lake — это уровень хранения с открытым кодом, который предоставляет транзакции ACID (атомарность, согласованность, изоляция и устойчивость) поверх решений data lake storage. Delta Lake также поддерживает масштабируемую обработку метаданных, эволюцию схемы, перемещение по времени (управление версиями данных), открытый формат и другие функции.

В Инжиниринг данных Fabric Delta Lake используется для:

  • Простое обновление (вставка и обновление) и удаление данных с помощью Spark SQL.
  • Сжимайте данные, чтобы свести к минимуму время, затрачиваемое на запрос данных.
  • Просмотр состояния таблиц до и после выполнения операций.
  • Получение журнала операций, выполняемых с таблицами.

Delta добавляется в качестве одного из возможных форматов приемников выходных данных, используемых в writeStream. Дополнительные сведения о существующих приемниках вывода можно найти здесь.

В следующем примере показано, как можно выполнять потоковую передачу данных в Delta Lake.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", " Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Сведения о фрагменте кода в примере:

  • format() — это инструкция, определяющая выходной формат данных.
  • outputMode() определяет, каким образом записываются новые строки в потоковой передаче (то есть добавление, перезапись).
  • toTable() сохраняет потоковые данные в разностную таблицу, созданную с использованием значения, переданного в качестве параметра.

Оптимизация операций записи Delta

Секционирование данных является важной частью создания надежного решения потоковой передачи: секционирование улучшает способ организации данных, а также повышает пропускную способность. Файлы легко фрагментируются после разностных операций, что приводит к слишком большому объему небольших файлов. Слишком большие файлы также являются проблемой из-за длительного времени их записи на диск. Проблема с секционированием данных заключается в том, чтобы найти правильный баланс, который приводит к оптимальным размерам файлов. Spark поддерживает секционирование в памяти и на диске. Правильно секционированные данные могут обеспечить наилучшую производительность при сохранении данных в Delta Lake и запросе данных из Delta Lake.

  • При секционирование данных на диске можно выбрать способ секционирования данных на основе столбцов с помощью partitionBy(). partitionBy() — это функция, используемая для секционирования большого набора данных на файлы меньшего размера на основе одного или нескольких столбцов, предоставленных при записи на диск. Секционирование — это способ повышения производительности запросов при работе с большим набором данных. Избегайте выбора столбца, который создает слишком маленькие или слишком большие секции. Определите секцию на основе набора столбцов с хорошей кратностью и разделите данные на файлы оптимального размера.
  • Секционирование данных в памяти можно выполнить с помощью преобразований repartition() или coalesce(), распределяя данные на нескольких рабочих узлах и создавая несколько задач, которые могут считывать и обрабатывать данные параллельно, используя основы устойчивого распределенного набора данных (RDD). Он позволяет разделить набор данных на логические секции, которые можно вычислить на разных узлах кластера.
    • Repartition() используется для увеличения или уменьшения количества секций в памяти. Повторное секционирование перетасовывает все данные по сети и распределяет их по всем секциям.
    • функция coalesce() используется только для эффективного уменьшения числа секций. Это оптимизированная версия repartition(), где перемещение данных по всем секциям ниже с помощью coalesce().

Сочетание обоих подходов к секционированиям является хорошим решением в сценарии с высокой пропускной способностью. repartition() создает определенное количество секций в памяти, а partitionBy() записывает файлы на диск для каждого раздела памяти и столбца секционирования. В следующем примере показано использование обеих стратегий секционирования в одном задании Spark: данные сначала разбиваются на 48 секций в памяти (при условии, что у нас всего 48 ядер ЦП), а затем секционируются на диске в двух существующих столбцах полезных данных.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", " Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Оптимизированная запись

Другой вариант оптимизации операций записи в Delta Lake — использование оптимизированной записи. Оптимизированная запись — это необязательная функция, которая улучшает способ записи данных в разностную таблицу. Spark объединяет или разделяет секции перед записью данных, максимизируя пропускную способность данных, записываемых на диск. Однако это приводит к полному перетасовке, поэтому для некоторых рабочих нагрузок это может привести к снижению производительности. Задания, использующие coalesce() и (или ) repartition() для секционирования данных на диске, можно выполнить рефакторинг, чтобы начать использовать оптимизированную запись.

Следующий код является примером использования оптимизированной записи. Обратите внимание, что partitionBy() по-прежнему используется.

spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", " Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

События пакетной обработки

Чтобы свести к минимуму количество операций для сокращения времени, затрачиваемого на прием данных в Delta Lake, можно использовать события пакетной обработки.

Триггеры определяют частоту выполнения (активации) запроса потоковой передачи и выдачи новых данных. Их настройка определяет интервал времени периодической обработки для микропакетов, накопление данных и пакетирование событий в несколько сохраняемых операций, а не запись на диск постоянно.

В следующем примере показан запрос потоковой передачи, в котором события периодически обрабатываются с интервалом в одну минуту.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", " Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

Преимущество объединения пакетной обработки событий в операциях записи разностных таблиц заключается в том, что она создает большие разностные файлы с большим количеством данных в них, избегая небольших файлов. Необходимо проанализировать объем принятых данных и найти наилучшее время обработки, чтобы оптимизировать размер файлов Parquet, созданных библиотекой Delta.

Мониторинг

Spark 3.1 и более поздних версий имеют встроенный структурированный пользовательский интерфейс потоковой передачи (ссылка), содержащий следующие метрики потоковой передачи:

  • Скорость ввода
  • Скорость обработки
  • Входные строки
  • Batch Duration
  • Длительность операции

Дальнейшие действия