Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
Это важно
API конвейера create_sink находится в общедоступной предварительной версии.
Функция create_sink() записывается в службу потоковой передачи событий, например Apache Kafka или Центры событий Azure, или в таблицу Delta из декларативного конвейера. После создания приемника с функцией create_sink(), вы используете приемник в потоке добавления для записи данных в приемник. Единственный тип потока, поддерживаемый функцией create_sink(), — это поток добавления. Другие типы потоков, например create_auto_cdc_flow, не поддерживаются.
Delta sink поддерживает внешние и управляемые таблицы в каталоге Unity и управляемые таблицы в мета-хранилище Hive. Имена таблиц должны быть полностью квалифицированы. Например, таблицы каталога Unity должны использовать трехуровневый идентификатор: <catalog>.<schema>.<table>. Таблицы хранилища метаданных Hive должны использовать <schema>.<table>.
Замечание
- Запуск полного обновления не очищает данные из хранилищ. Все повторно обработанные данные будут добавлены в приемник, и существующие данные не будут изменены.
- Ожидания с API
sinkне поддерживаются.
Синтаксис
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Параметры
| Параметр | Тип | Description |
|---|---|---|
name |
str |
Обязательное. Строка, идентифицирующая приемник и используемая для ссылки на него и управления им. Имена приемников должны быть уникальными для конвейера, включая все файлы исходного кода, которые являются частью конвейера. |
format |
str |
Обязательное. Строка, определяющая выходной формат либо kafka, либо delta. |
options |
dict |
Список параметров приемника, отформатированный как {"key": "value"}, где ключ и значение являются обеими строками. Поддерживаются все параметры среды выполнения Databricks, поддерживаемые приемниками Kafka и Delta.
|
Примеры
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)