Bagikan melalui


create_sink

Penting

API alur create_sink berada di Pratinjau Umum.

Fungsi create_sink() menulis ke layanan streaming acara seperti Apache Kafka atau Azure Event Hubs atau ke tabel Delta dari alur deklaratif. Setelah membuat sink dengan fungsi create_sink(), Anda menggunakan sink dalam alur lampiran untuk menulis data ke dalam sink. alur tambahan adalah satu-satunya jenis alur yang didukung oleh fungsi create_sink(). Jenis alur lainnya, seperti create_auto_cdc_flow, tidak didukung.

Delta sink mendukung tabel eksternal dan terkelola dalam Unity Catalog serta tabel terkelola dalam metastore Hive. Nama tabel harus memiliki kualifikasi lengkap. Misalnya, tabel Katalog Unity harus menggunakan pengidentifikasi tiga tingkat: <catalog>.<schema>.<table>. Tabel metastore Hive harus menggunakan <schema>.<table>.

Nota

  • Menjalankan pembaruan penyegaran penuh tidak menghapus data dari sink. Setiap data yang diolah ulang akan ditambahkan ke sink, dan data yang ada tidak akan diubah.
  • Ekspektasi tidak didukung dengan sink API.

Syntax

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Parameter-parameternya

Pengaturan Tipe Description
name str Dibutuhkan. String yang mengidentifikasi sink dan digunakan untuk mereferensikan dan mengelola sink. Nama sink harus unik untuk alur, termasuk di semua file kode sumber yang merupakan bagian dari alur.
format str Dibutuhkan. String yang menentukan format output, baik kafka atau delta.
options dict Daftar opsi sink, diformat sebagai {"key": "value"}, di mana kunci dan nilai keduanya adalah string. Semua opsi Databricks Runtime yang didukung oleh sink Kafka dan Delta tersedia.

Examples

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)