Использование Delta Lake с данными потоковой передачи

Завершено

Все данные, которые мы изучили до этой точки, были статическими данными в файлах. Однако многие сценарии аналитики данных включают потоковую передачу данных, которые нужно обрабатывать практически в режиме реального времени. Например, может потребоваться записывать показания устройств Интернета вещей и сохранять их в таблице по мере возникновения.

Структурированная потоковая передача Spark

Обычное решение потоковой обработки включает в себя постоянное чтение потока данных из источника, необязательную обработку для выбора определенных полей, статистических значений и значений групп, либо обработку их другим способом, и запись результатов в приемник.

Spark включает встроенную поддержку потоковой передачи данных посредством API Spark Structured Streaming, основанного на безграничном кадре данных, в котором данные потоковой передачи записываются для обработки. Кадр данных Spark Structured Streaming может считывать данные из различных типов источников потоковой передачи, включая сетевые порты, службы брокера сообщений в режиме реального времени, такие как Центры событий Azure или Kafka, или расположения файловой системы.

Совет

Дополнительные сведения см. в руководстве по программированию Spark Structured Streaming в документации Spark.

Потоковая передача с таблицами Delta Lake

Таблицу Delta Lake можно использовать в качестве источника или приемника для структурированной потоковой передачи Spark. Например, можно записать поток данных в режиме реального времени с устройства Интернета вещей, сохраняя его непосредственно в таблицу Delta Lake в качестве приемника. Это позволяет запрашивать таблицу для просмотра последних потоковых данных. Кроме того, можно прочитать разностную таблицу в качестве источника потоковой передачи, что позволяет постоянно сообщать о новых данных по мере их добавления в таблицу.

Использование таблицы Delta Lake в качестве источника потоковой передачи

В следующем примере PySpark для хранения сведений о заказах в Интернете используется таблица Delta Lake. Создается поток, который считывает данные из папки таблицы Delta Lake по мере добавления новых данных.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

Примечание.

При использовании таблицы Delta Lake в качестве источника потоковой передачи в поток можно включить только операции добавления. Изменение данных приведет к ошибке, если не указать параметр ignoreChanges или ignoreDeletes.

После считывания данных из таблицы Delta Lake в кадр данных потоковой передачи можно использовать Spark Structured Streaming API для его обработки. В приведенном выше примере кадр данных просто отображается, но вы можете использовать Spark Structured Streaming для агрегирования данных по темпоральным окнам (например, для подсчета количества заказов, размещенных каждую минуту) и отправки агрегированных результатов в нижестоящий процесс для визуализации почти в реальном времени.

Использование таблицы Delta Lake в качестве приемника потоковой передачи

В следующем примере PySpark поток данных считывается из JSON-файлов в папке. Данные JSON в каждом файле содержат состояние устройства Интернета вещей в формате {"device":"Dev1","status":"ok"}. Новые данные добавляются в поток при каждом добавлении файла в папку. Входной поток — это безграничный кадр данных, который затем записывается в разностный формат в расположение папки для таблицы Delta Lake.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

Примечание.

Параметр checkpointLocation используется для записи файла контрольных точек, который отслеживает состояние обработки потока. Этот файл позволяет восстановиться после сбоя в точке, в которой потоковая обработка прекратилась.

После запуска процесса потоковой передачи можно запросить таблицу Delta Lake, в которую записываются выходные данные потоковой передачи, чтобы просмотреть последние данные. Например, следующий код создает таблицу каталога для папки таблицы Delta Lake и запрашивает ее:

%%sql

CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';

SELECT device, status
FROM DeviceTable;

Чтобы остановить запись потока данных в таблицу Delta Lake, можно использовать метод stop потокового запроса:

delta_stream.stop()

Совет

Дополнительные сведения об использовании таблиц Delta Lake для потоковой передачи данных см. в статье Потоковое чтение данных из таблицы и запись в нее в документации по Delta Lake.