Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Penting
API alur create_sink berada di Pratinjau Umum.
Fungsi create_sink() menulis ke layanan streaming acara seperti Apache Kafka atau Azure Event Hubs atau ke tabel Delta dari alur deklaratif. Setelah membuat sink dengan fungsi create_sink(), Anda menggunakan sink dalam alur lampiran untuk menulis data ke dalam sink. alur tambahan adalah satu-satunya jenis alur yang didukung oleh fungsi create_sink(). Jenis alur lainnya, seperti create_auto_cdc_flow, tidak didukung.
Delta sink mendukung tabel eksternal dan terkelola dalam Unity Catalog serta tabel terkelola dalam metastore Hive. Nama tabel harus memiliki kualifikasi lengkap. Misalnya, tabel Katalog Unity harus menggunakan pengidentifikasi tiga tingkat: <catalog>.<schema>.<table>. Tabel metastore Hive harus menggunakan <schema>.<table>.
Nota
- Menjalankan pembaruan penyegaran penuh tidak menghapus data dari sink. Setiap data yang diolah ulang akan ditambahkan ke sink, dan data yang ada tidak akan diubah.
- Ekspektasi tidak didukung dengan
sinkAPI.
Syntax
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parameter-parameternya
| Pengaturan | Tipe | Description |
|---|---|---|
name |
str |
Dibutuhkan. String yang mengidentifikasi sink dan digunakan untuk mereferensikan dan mengelola sink. Nama sink harus unik untuk alur, termasuk di semua file kode sumber yang merupakan bagian dari alur. |
format |
str |
Dibutuhkan. String yang menentukan format output, baik kafka atau delta. |
options |
dict |
Daftar opsi sink, diformat sebagai {"key": "value"}, di mana kunci dan nilai keduanya adalah string. Semua opsi Databricks Runtime yang didukung oleh sink Kafka dan Delta tersedia.
|
Examples
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)