這很重要
管線 create_sink API 目前處於 公開預覽狀態。
這個函式將資料寫入事件串流服務,如Apache Kafka、Azure 事件中樞,或寫入「Delta 資料表」,這是從宣告式管線進行的操作。 在使用create_sink() 函式建立接收器之後,您可以在附加流中使用該接收器將數據寫入其中。 append flow 是 create_sink() 函式唯一支援的流程類型。 不支援其他流程類型,例如 create_auto_cdc_flow。
Delta 資料流接收點支援 Unity Catalog 的外部和管理的資料表,以及 Hive 資料庫管理的資料表。 表格名稱必須完全限定。 例如,Unity 目錄資料表必須使用三層識別碼:<catalog>.<schema>.<table>。 Hive 中繼存放區資料表必須使用 <schema>.<table>。
備註
- 執行完整重新整理更新不會移除接收器中的數據。 任何重新處理的資料都會附加到匯集端,而不會改變現有的資料。
-
sinkAPI 不支援期望值。
語法
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
參數
| 參數 | 類型 | Description |
|---|---|---|
name |
str |
必須的。 用來識別接收端並用以參考和管理該接收端的字串。 接收端名稱必須在管線中具有唯一性,這包括所有屬於管線的原始程式碼檔案。 |
format |
str |
必須的。 定義輸出格式的字串,kafka 或 delta。 |
options |
dict |
格式為 {"key": "value"}的接收選項清單,其中索引鍵和值都是字串。 所有由 Kafka 和 Delta 匯聚接收器支援的 Databricks Runtime 選項均被支援。
|
範例
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)