Поделиться через


append_flow

Декоратор @dp.append_flow создает потоки добавления данных или обратное наполнение для таблиц конвейера. Функция должна возвращать кадр данных потоковой передачи Apache Spark. Дополнительные сведения о загрузке и обработке данных см. в потоках декларативных конвейеров Lakeflow Spark.

Потоки добавления могут быть направлены на таблицы потоков или приемники.

Синтаксис

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Параметры

Параметр Тип Description
function function Обязательное. Функция, возвращающая кадр данных потоковой передачи Apache Spark из определяемого пользователем запроса.
target str Обязательное. Имя таблицы или приемника, которое является целевым объектом потока добавления.
name str Имя потока. Если этот параметр не указан, по умолчанию используется имя функции.
once bool При необходимости определите поток как одноразовый, например, обратная закачка. Использование once=True меняет поток двумя способами:
  • Возвращаемое значение. streaming-query. в данном случае это должен быть пакетный DataFrame, а не потоковый DataFrame.
  • Поток выполняется один раз по умолчанию. Если конвейер обновляется с полным обновлением, ONCE поток снова запускается для повторного создания данных.
comment str Описание потока.
spark_conf dict Список конфигураций Spark для выполнения этого запроса

Примеры

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))