Получение потоковых данных в Lakehouse с помощью структурированной потоковой передачи Spark

Структурированная потоковая передача — это масштабируемый и отказоустойчивый обработчик обработки потоков, построенный на Spark. Spark выполняет операцию потоковой передачи постепенно и непрерывно по мере поступления данных.

Структурированная потоковая передача стала доступна в Spark 2.2. С тех пор рекомендуется использовать для потоковой передачи данных. Основной принцип структурированного потока заключается в том, чтобы рассматривать поток динамических данных как таблицу, в которой новые данные всегда постоянно добавляются, например новую строку в таблице. Существует несколько встроенных источников потоковых файлов, таких как CSV, JSON, ORC, Parquet и встроенная поддержка служб обмена сообщениями, таких как Kafka и Центры событий.

В этой статье содержатся сведения о том, как оптимизировать обработку и прием событий с помощью потоковой передачи структуры Spark в рабочих средах с высокой пропускной способностью. Ниже приведены предлагаемые подходы.

  • Оптимизация пропускной способности потоковой передачи данных
  • Оптимизация операций записи в разностной таблице и
  • Пакетная обработка событий

Определения заданий Spark и записные книжки Spark

Записные книжки Spark — это отличное средство проверки идей и экспериментов для получения аналитических сведений о данных или коде. Записные книжки широко используются в процессе подготовки данных, визуализации, машинного обучения и других сценариев больших данных. Определения заданий Spark — это неинтерактивные задачи, ориентированные на код, выполняемые в кластере Spark в течение длительного времени. Определения заданий Spark обеспечивают надежность и доступность.

Записные книжки Spark являются отличным источником для тестирования логики кода и решения всех бизнес-требований. Однако для обеспечения его работы в рабочем сценарии определения заданий Spark с включенной политикой повторных попыток являются лучшим решением.

Политика повторных попыток для определений заданий Spark

В Microsoft Fabric пользователь может задать политику повторных попыток для заданий определения заданий Spark. Хотя сценарий в задании может быть бесконечным, инфраструктура, на которой выполняется скрипт, может возникнуть проблема, требующая остановки задания. Или задание может быть устранено из-за потребностей в исправлении базовой инфраструктуры. Политика повторных попыток позволяет пользователю задавать правила автоматического перезапуска задания, если он останавливается из-за каких-либо базовых проблем. Параметры указывают частоту перезапуска задания до бесконечных повторных попыток и настройки времени между повторными попытками. Таким образом, пользователи могут гарантировать, что задания определения заданий Spark продолжают выполняться бесконечно, пока пользователь не решит остановить их.

Источники потоковой передачи

Для настройки потоковой передачи с помощью Центров событий требуется базовая конфигурация, которая включает имя пространства имен Центров событий, имя концентратора, имя общего ключа доступа и группу потребителей. Группа потребителей — это представление всего концентратора событий. Это позволяет нескольким потребляемыми приложениями иметь отдельное представление потока событий и читать поток независимо по своему темпу и с их смещениями.

Секции являются важной частью возможности обработки большого объема данных. Один процессор имеет ограниченную емкость для обработки событий в секунду, в то время как несколько процессоров могут выполнять лучшее задание при параллельном выполнении. Секции позволяют параллельно обрабатывать большие объемы событий.

Если слишком много секций используется с низкой скоростью приема, средства чтения секций работают с крошечной частью этих данных, что приводит к неоптимальной обработке. Идеальное количество секций напрямую зависит от требуемой скорости обработки. Если вы хотите масштабировать обработку событий, рассмотрите возможность добавления дополнительных разделов. В секции нет определенного ограничения пропускной способности. Однако агрегированная пропускная способность в пространстве имен ограничена количеством единиц пропускной способности. При увеличении количества единиц пропускной способности в пространстве имен может потребоваться дополнительное секции, чтобы одновременные читатели могли достичь максимальной пропускной способности.

Рекомендация заключается в изучении и тестировании оптимального количества секций для сценария пропускной способности. Но в сценариях с высокой пропускной способностью используется 32 или более секций.

Центры событий Azure Подключение or для Apache Spark (azure-event-hubs-spark) рекомендуется подключить приложение Spark к Центры событий Azure.

Lakehouse в качестве приемника потоковой передачи

Delta Lake — это слой хранения с открытым исходным кодом, который предоставляет транзакции ACID (атомарность, согласованность, изоляция и устойчивость) на основе решений хранилища озера данных. Delta Lake также поддерживает масштабируемую обработку метаданных, эволюцию схемы, перемещение по времени (управление версиями данных), открытый формат и другие функции.

В Инжиниринг данных Fabric Delta Lake используется для следующих способов:

  • Простой upsert (insert/update) и удаление данных с помощью Spark SQL.
  • Сжимайте данные, чтобы свести к минимуму время, затраченное на запрос данных.
  • Просмотр состояния таблиц до и после выполнения операций.
  • Получение журнала операций, выполняемых в таблицах.

Delta добавляется в качестве одного из возможных форматов приемников выходных данных, используемых в writeStream. Дополнительные сведения о существующих приемниках выходных данных см . в руководстве по программированию структурированной потоковой передачи Spark.

В следующем примере показано, как можно передавать данные в Delta Lake.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Сведения о коде, фрагментованном в примере:

  • Format() — это инструкция, определяющая выходной формат данных.
  • outputMode() определяет, каким образом записываются новые строки потоковой передачи (т. е. добавление, перезапись).
  • toTable() сохраняет потоковые данные в таблицу Delta, созданную с помощью значения, переданного в качестве параметра.

Оптимизация операций записи Delta

Секционирование данных является важной частью создания надежного решения потоковой передачи: секционирование улучшает способ упорядочения данных, а также повышает пропускную способность. Файлы легко фрагментируются после операций Delta, что приводит к слишком большому объему небольших файлов. И слишком большие файлы также являются проблемой из-за длительного времени записи их на диске. Проблема с секционированием данных обеспечивает правильный баланс, который приводит к оптимальному размеру файлов. Spark поддерживает секционирование в памяти и на диске. Правильно секционированные данные могут обеспечить лучшую производительность при сохранении данных в Delta Lake и запросах данных из Delta Lake.

  • При секционирования данных на диске можно выбрать способ секционирования данных на основе столбцов с помощью partitionBy(). partitionBy() — это функция, используемая для секционирования большой семантической модели на небольшие файлы на основе одного или нескольких столбцов, предоставленных при записи на диск. Секционирование — это способ повышения производительности запросов при работе с большой семантической моделью. Избегайте выбора столбца, который создает слишком малые или слишком большие секции. Определите секцию на основе набора столбцов с хорошей карта inality и разделите данные на файлы оптимального размера.
  • Секционирование данных в памяти можно сделать с помощью преобразований repartition() или coalesce(), распределения данных на нескольких рабочих узлах и создания нескольких задач, которые могут параллельно считывать и обрабатывать данные параллельно с помощью основы отказоустойчивого распределенного набора данных (RDD). Она позволяет разделить семантику модели на логические секции, которые можно вычислить на разных узлах кластера.
    • Repartition() используется для увеличения или уменьшения количества секций в памяти. Перераспрешиление переопределяет все данные по сети и балансирует их между всеми секциями.
    • объединение () используется только для эффективного уменьшения количества секций. Это оптимизированная версия repartition(), где перемещение данных по всем секциям ниже с помощью объединения().

Объединение обоих подходов секционирования — это хорошее решение в сценарии с высокой пропускной способностью. Repartition() создает определенное количество секций в памяти, а partitionBy() записывает файлы на диск для каждого раздела памяти и столбца секционирования. В следующем примере показано использование обоих стратегий секционирования в одном задании Spark: данные сначала разделены на 48 секций в памяти (если у нас всего 48 ядер ЦП), а затем секционированы на диске на основе двух существующих столбцов полезных данных.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Оптимизированная запись

Другой вариант оптимизации операций записи в Delta Lake — использование оптимизированной записи. Оптимизированная запись — это необязательная функция, которая улучшает способ записи данных в таблицу Delta. Spark объединяет или разбивает секции перед записью данных, повышая пропускную способность записываемых на диск данных. Однако он приводит к полному перетасовке, поэтому для некоторых рабочих нагрузок это может привести к снижению производительности. Задания, использующие объединение() и /или repartition() для секционирования данных на диске, можно рефакторинговать, чтобы начать использовать оптимизированную запись.

Следующий код является примером использования оптимизированной записи. Обратите внимание, что partitionBy() по-прежнему используется.

spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

События пакетной обработки

Чтобы свести к минимуму количество операций для улучшения времени приема данных в Delta Lake, пакетные события — это практическая альтернатива.

Триггеры определяют частоту выполнения потокового запроса (триггера) и выдачу новых данных, при настройке их определяет интервал времени периодической обработки для микробатов, накапливая данные и пакетные события в несколько сохраняющихся операций, а не записывая на диск все время.

В следующем примере показан потоковый запрос, в котором события периодически обрабатываются интервалами в одну минуту.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

Преимущество объединения пакетной обработки событий в операциях записи таблиц Delta заключается в том, что он создает большие разностные файлы с большим количеством данных в них, избегая небольших файлов. Необходимо проанализировать объем данных, которые принимаются, и найти оптимальное время обработки, чтобы оптимизировать размер файлов Parquet, созданных библиотекой Delta.

Наблюдение

В Spark 3.1 и более поздних версиях есть встроенный структурированный пользовательский интерфейс потоковой передачи, содержащий следующие метрики потоковой передачи:

  • Частота входных данных
  • Скорость обработки
  • Входные строки
  • Batch Duration
  • Длительность операции