Поделиться через


добавить_поток

Декоратор @dlt.append_flow создает потоки добавления или дозаполнения для таблиц декларативных конвейеров Lakeflow. Функция должна возвращать кадр данных потоковой передачи Apache Spark. См. Загрузка и обработка данных поэтапно с потоками декларативных конвейеров Lakeflow.

Потоки добавления могут быть направлены на таблицы потоковой обработки или приемники.

Синтаксис

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

Параметры

Параметр Тип Описание
функция function Обязательное. Функция, возвращающая кадр данных потоковой передачи Apache Spark из определяемого пользователем запроса.
target str Обязательное. Имя таблицы или приемника, которое является целевым объектом потока добавления.
name str Имя потока. Если этот параметр не указан, по умолчанию используется имя функции.
comment str Описание потока.
spark_conf dict Список конфигураций Spark для выполнения этого запроса

Примеры

import dlt

# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Create a Kafka sink
dlt.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dlt.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))