Поделиться через


create_sink

Это важно

API конвейера create_sink находится в общедоступной предварительной версии.

Функция create_sink() записывается в службу потоковой передачи событий, например Apache Kafka или Центры событий Azure, или в таблицу Delta из декларативного конвейера. После создания приемника с функцией create_sink(), вы используете приемник в потоке добавления для записи данных в приемник. Единственный тип потока, поддерживаемый функцией create_sink(), — это поток добавления. Другие типы потоков, например create_auto_cdc_flow, не поддерживаются.

Delta sink поддерживает внешние и управляемые таблицы в каталоге Unity и управляемые таблицы в мета-хранилище Hive. Имена таблиц должны быть полностью квалифицированы. Например, таблицы каталога Unity должны использовать трехуровневый идентификатор: <catalog>.<schema>.<table>. Таблицы хранилища метаданных Hive должны использовать <schema>.<table>.

Замечание

  • Запуск полного обновления не очищает данные из хранилищ. Все повторно обработанные данные будут добавлены в приемник, и существующие данные не будут изменены.
  • Ожидания с API sink не поддерживаются.

Синтаксис

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Параметры

Параметр Тип Description
name str Обязательное. Строка, идентифицирующая приемник и используемая для ссылки на него и управления им. Имена приемников должны быть уникальными для конвейера, включая все файлы исходного кода, которые являются частью конвейера.
format str Обязательное. Строка, определяющая выходной формат либо kafka, либо delta.
options dict Список параметров приемника, отформатированный как {"key": "value"}, где ключ и значение являются обеими строками. Поддерживаются все параметры среды выполнения Databricks, поддерживаемые приемниками Kafka и Delta.

Примеры

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)