Загрузка и пошаговая обработка данных с помощью потоков декларативных конвейеров Lakeflow Spark

Данные обрабатываются в конвейерах через потоки. Каждый поток состоит из запроса и, как правило, целевого объекта. Поток обрабатывает запрос либо как пакет, либо поэтапно в виде потока данных в целевой объект. Поток живет в конвейере в декларативных конвейерах Lakeflow Spark.

Как правило, потоки определяются автоматически при создании запроса в конвейере, который обновляет целевой объект, но также можно явно определить дополнительные потоки для более сложной обработки, например добавление к одному целевому объекту из нескольких источников.

Обновления

Поток выполняется каждый раз, когда обновляется его определяющий конвейер. Поток создаст или обновит таблицы с последними доступными данными. В зависимости от типа потока и состояния изменений данных обновление может выполнять добавочное обновление, которое обрабатывает только новые записи или выполняет полное обновление, которое повторно обрабатывает все записи из источника данных.

Создание потока по умолчанию

При создании конвейера обычно определяется таблица или представление вместе с запросом, поддерживающим его. Например, в этом SQL-запросе создается потоковая таблица, называемая customers_silver, путем чтения из таблицы customers_bronze.

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Вы также можете создать ту же таблицу потоковой передачи в Python. В Python вы используете конвейеры, создавая функцию запроса, которая возвращает DataFrame, с декораторами для добавления функциональности Декларативных конвейеров Lakeflow Spark.

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

В этом примере вы создали таблицу потоковой передачи. Вы также можете создавать материализованные представления с аналогичным синтаксисом как в SQL, так и в Python. Дополнительные сведения см. в таблицах потоковой обработки и материализованных представлениях.

В этом примере создается поток по умолчанию вместе с потоковой таблицей. Поток по умолчанию для потоковой таблицы — это поток добавления, который добавляет новые строки с каждым триггером. Это наиболее распространенный способ использования конвейеров: создание потока и целевого объекта на одном шаге. Этот стиль можно использовать для приема данных или преобразования данных.

Потоки добавления также поддерживают обработку, требующую чтения данных из нескольких источников потоковой передачи для обновления одного целевого объекта. Например, вы можете использовать функциональность добавления потока, когда у вас есть существующая потоковая таблица и поток, и вы хотите добавить новый источник данных, который записывает в эту существующую потоковую таблицу.

Использование нескольких потоков для записи в один целевой объект

В предыдущем примере вы создали поток данных и потоковую таблицу за один шаг. Вы также можете создавать потоки для ранее созданной таблицы. В этом примере можно увидеть создание таблицы и потока, связанного с ним, в отдельных шагах. Этот код имеет идентичные результаты, как создание потока по умолчанию, включая использование того же имени для потоковой таблицы и потока.

Питон

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

Создание потока независимо от целевого объекта означает, что можно также создать несколько потоков, которые добавляют данные к одному целевому объекту.

Используйте декоратор @dp.append_flow в интерфейсе Python или условие CREATE FLOW...INSERT INTO в интерфейсе SQL, чтобы создать новый поток данных, например, для формирования целевой таблицы потоковой передачи из нескольких потоковых источников. Используйте поток добавления для задач обработки, таких как:

  • Добавьте источники потоковой передачи, добавляющие данные в существующую потоковую таблицу без полного обновления. Например, у вас может быть таблица, объединяющая региональные данные из каждого региона, в который вы работаете. При развертывании новых регионов можно добавить новые данные региона в таблицу без полного обновления. Пример добавления источников потоковой передачи в существующую таблицу потоковой передачи см. в примере: запись в таблицу потоковой передачи из нескольких разделов Kafka.
  • Добавьте отсутствующие исторические данные (обратное заполнение) в стриминговую таблицу. Вы можете использовать синтаксис INSERT INTO ONCE для создания дополнения обратной загрузки, которая выполняется единожды. Например, у вас есть существующая таблица потоковой передачи, записанная в раздел Apache Kafka. У вас также есть исторические данные, хранящиеся в таблице, которые надо вставить ровно один раз в таблицу потоковых данных, и вы не можете передавать данные, так как обработка включает в себя выполнение сложной агрегации перед вставкой данных. Пример обратного заполнения см. в разделе "Заполнение исторических данных с помощью каналов".
  • Объедините данные из нескольких источников и запишите в одну потоковую таблицу вместо использования оператора UNION в запросе. Использование обработки потока добавления вместо UNION позволяет постепенно обновлять целевую таблицу без выполнения полного обновления. Пример объединения, выполненный таким образом, см. в примере : используйте обработку потока добавления вместо UNION.

Целевой объект для выходных данных записей обработкой потока добавления может быть существующей таблицей или новой таблицей. Для запросов Python используйте функцию create_streaming_table() для создания целевой таблицы.

В следующем примере добавляются два потока для одного целевого объекта, создав объединение двух исходных таблиц:

Питон

from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

Это важно

  • Если необходимо определить ограничения качества данных с ожиданиями, определите ожидания в целевой таблице в рамках create_streaming_table() функции или существующего определения таблицы. Вы не можете определить ожидания в определении @append_flow .
  • Потоки определяются именем потока, и это имя используется для идентификации контрольных точек потоковой передачи. Использование имени потока для идентификации контрольной точки означает следующее:
    • Если существующий поток в конвейере переименован, контрольная точка не переносится, и переименованный поток фактически является совершенно новым потоком.
    • Невозможно повторно использовать имя потока в конвейере, так как существующая контрольная точка не будет соответствовать новому определению потока.

Типы потоков

Потоки по умолчанию для потоковых таблиц и материализованных представлений — это потоки добавления. Вы также можете создавать потоки для чтения данных из источников слежения за изменением данных. В следующей таблице описаны различные типы потоков.

Тип потока Description
Append Потоки добавления — это наиболее распространенный тип потока, где новые записи в источнике записываются в целевой объект с каждым обновлением. Они соответствуют режиму добавления в структурированной потоковой передаче. Вы можете добавить ONCE флаг, указывающий пакетный запрос, данные которого должны вставляться в целевой объект только один раз, если целевой объект не будет полностью обновлен. Любое количество потоков добавления может записываться в определенный целевой объект.
Потоки по умолчанию (созданные с целевой таблицей потоковой передачи или материализованным представлением) будут иметь то же имя, что и целевой объект. Другие целевые объекты не имеют потоков по умолчанию.
Auto CDC (ранее применить изменения) Поток Auto CDC загружает запрос, содержащий данные захвата изменений (CDC). Автоматические потоки CDC могут ориентироваться только на потоковые таблицы, и источник также должен быть потоковым (даже в случае потоков ONCE). Несколько автоматизированных потоков CDC могут быть нацелены на одну потоковую таблицу. Потоковая таблица, выступающая в качестве целевого объекта для автоматического потока CDC, может быть целью только для других автоматических потоков CDC.
Дополнительные сведения о данных CDC см. в api-интерфейсах AUTO CDC: упрощение отслеживания изменений с помощью конвейеров.

Дополнительные сведения

Дополнительные сведения о потоках и их использовании см. в следующих разделах: