共用方式為


create_sink

這很重要

管線 create_sink API 目前處於 公開預覽狀態

這個函式將資料寫入事件串流服務,如Apache Kafka、Azure 事件中樞,或寫入「Delta 資料表」,這是從宣告式管線進行的操作。 在使用create_sink() 函式建立接收器之後,您可以在附加流中使用該接收器將數據寫入其中。 append flow 是 create_sink() 函式唯一支援的流程類型。 不支援其他流程類型,例如 create_auto_cdc_flow

Delta 資料流接收點支援 Unity Catalog 的外部和管理的資料表,以及 Hive 資料庫管理的資料表。 表格名稱必須完全限定。 例如,Unity 目錄資料表必須使用三層識別碼:<catalog>.<schema>.<table>。 Hive 中繼存放區資料表必須使用 <schema>.<table>

備註

  • 執行完整重新整理更新不會移除接收器中的數據。 任何重新處理的資料都會附加到匯集端,而不會改變現有的資料。
  • sink API 不支援期望值。

語法

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

參數

參數 類型 Description
name str 必須的。 用來識別接收端並用以參考和管理該接收端的字串。 接收端名稱必須在管線中具有唯一性,這包括所有屬於管線的原始程式碼檔案。
format str 必須的。 定義輸出格式的字串,kafkadelta
options dict 格式為 {"key": "value"}的接收選項清單,其中索引鍵和值都是字串。 所有由 Kafka 和 Delta 匯聚接收器支援的 Databricks Runtime 選項均被支援。

範例

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)