Поделиться через


Постепенное загрузка и обработка данных с помощью потоков разностных динамических таблиц

В этой статье объясняется, какие потоки и как можно использовать потоки в конвейерах Delta Live Table для добавочного обработки данных из источника в целевую потоковую таблицу. В разностных динамических таблицах потоки определяются двумя способами:

  1. Поток определяется автоматически при создании запроса, обновляющего потоковую таблицу.
  2. Разностные динамические таблицы также предоставляют функции для явного определения потоков для более сложной обработки, например добавления в таблицу потоковой передачи из нескольких источников потоковой передачи.

В этой статье рассматриваются неявные потоки, созданные при определении запроса для обновления таблицы потоковой передачи, а затем содержатся сведения о синтаксисе для определения более сложных потоков.

Что такое поток?

В разностных динамических таблицах поток — это потоковый запрос, обрабатывающий исходные данные постепенно для обновления целевой потоковой таблицы. Большинство наборов данных Delta Live Tables, создаваемых в конвейере, определяют поток как часть запроса и не требуют явного определения потока. Например, вы создаете таблицу потоковой передачи в Delta Live Table в одной команде DDL вместо использования отдельных инструкций таблицы и потока для создания таблицы потоковой передачи:

Примечание.

Этот CREATE FLOW пример предоставляется только для иллюстрирующих целей и включает в себя ключевое слово, которые не являются допустимыми синтаксисом разностных динамических таблиц.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Помимо потока по умолчанию, определенного запросом, интерфейсы Python и SQL Delta Live Tables предоставляют функциональные возможности потока добавления. Поток добавления поддерживает обработку, требующую чтения данных из нескольких источников потоковой передачи для обновления одной потоковой таблицы. Например, можно использовать функции потока добавления, если у вас есть существующая потоковая таблица и поток и требуется добавить новый источник потоковой передачи, который записывается в эту существующую потоковую таблицу.

Использование потока добавления для записи в потоковую таблицу из нескольких исходных потоков

Примечание.

Чтобы использовать обработку потока добавления, конвейер должен быть настроен для использования канала предварительной версии.

@append_flow Используйте декоратор в интерфейсе Python или CREATE FLOW предложение в интерфейсе SQL для записи в таблицу потоковой передачи из нескольких источников потоковой передачи. Используйте поток добавления для задач обработки, таких как:

  • Добавьте источники потоковой передачи, добавляющие данные в существующую потоковую таблицу без полного обновления. Например, у вас может быть таблица, объединяющая региональные данные из каждого региона, в который вы работаете. При развертывании новых регионов можно добавить новые данные региона в таблицу без полного обновления. См . пример. Запись в таблицу потоковой передачи из нескольких разделов Kafka.
  • Обновите таблицу потоковой передачи, добавив отсутствующие исторические данные (обратная заполнение). Например, у вас есть существующая таблица потоковой передачи, записанная в раздел Apache Kafka. У вас также есть исторические данные, хранящиеся в таблице, которая требуется вставить ровно один раз в таблицу потоковой передачи, и вы не можете передавать данные, так как обработка включает в себя выполнение сложной агрегирования перед вставкой данных. См . пример: выполнение однократного резервного заполнения данных.
  • Объединение данных из нескольких источников и запись в одну потоковую таблицу вместо использования UNION предложения в запросе. Использование обработки потока добавления вместо UNION того, чтобы обновлять целевую таблицу постепенно без выполнения полного обновления. См . пример. Использование обработки потока добавления вместо UNION.

Целевой объект для выходных данных записей обработкой потока добавления может быть существующей таблицей или новой таблицей. Для запросов Python используйте функцию create_streaming_table() для создания целевой таблицы.

Внимание

  • Если необходимо определить ограничения качества данных с ожиданиями, определите ожидания в целевой таблице в рамках create_streaming_table() функции или существующего определения таблицы. Вы не можете определить ожидания в определении @append_flow .
  • Потоки определяются именем потока, и это имя используется для идентификации проверка точек потоковой передачи. Использование имени потока для идентификации точки проверка означает следующее:
    • Если существующий поток в конвейере переименован, точка проверка не переносится, а переименованный поток фактически является совершенно новым потоком.
    • Невозможно повторно использовать имя потока в конвейере, так как существующая проверка point не будет соответствовать новому определению потока.

Ниже приведен синтаксис для @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Пример. Запись в потоковую таблицу из нескольких разделов Kafka

В следующих примерах создается таблица потоковой передачи с именем kafka_target и запись в нее из двух разделов Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Дополнительные сведения о табличном значении функции, используемой в запросах SQL, см. в read_kafka() read_kafkaсправочнике по языку SQL.

Пример. Запуск одноразовой обратной заполнения данных

В следующих примерах выполняется запрос для добавления исторических данных в таблицу потоковой передачи:

Примечание.

Чтобы обеспечить правильное однократное заполнение, когда запрос обратной заполнения является частью конвейера, который выполняется на запланированной основе или непрерывно, удалите запрос после запуска конвейера один раз. Чтобы добавить новые данные, если он поступает в каталог обратной заполнения, оставьте запрос на месте.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Пример. Использование обработки потока добавления вместо UNION

Вместо использования запроса с UNION предложением можно использовать запросы потока добавления для объединения нескольких источников и записи в одну потоковую таблицу. Использование запросов потока добавления вместо UNION того, чтобы добавлять в таблицу потоковой передачи из нескольких источников без полного обновления.

В следующем примере Python содержится запрос, который объединяет несколько источников данных с предложением UNION :

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

В следующих примерах запрос заменится запросом UNION на запросы потока добавления:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );