Aracılığıyla paylaş


create_sink

Önemli

İşlem hattı create_sink API'si Genel Önizleme aşamasındadır.

İşlev Apache create_sink() Kafka veya Azure Event Hubs gibi bir olay akış hizmetine veya bildirim temelli bir işlem hattından Delta tablosuna yazar. create_sink() işleviyle bir alıcı oluşturduktan sonra, alıcıya veri yazmak için ekleme işlemi'yi kullanırsınız. ekleme akışı, create_sink() işleviyle desteklenen tek akış türüdür. create_auto_cdc_flowgibi diğer akış türleri desteklenmez.

Delta sink, Unity Catalog'un dış ve yönetilen tablolarını ve Hive metastore'un yönetilen tablolarını destekler. Tablo adları tam olarak tanımlanmış olmalıdır. Örneğin, Unity Kataloğu tablolarında üç katmanlı tanımlayıcı kullanılmalıdır: <catalog>.<schema>.<table>. Hive meta veri deposu tablolarında <schema>.<table>kullanılmalıdır.

Uyarı

  • tam yenileme güncellemesi çalıştırıldığında havuzlardaki veri temizlenmez. Yeniden işlenmiş veriler havuza eklenir ve mevcut veriler değiştirilmez.
  • API'de sink ile ilgili beklentiler desteklenmez.

Sözdizimi

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Parametreler

Parametre Türü Description
name str Gerekli. Lavaboyu tanımlayan ve lavaboya başvurmak ve yönetmek için kullanılan bir dize. Havuz adları, işlem hattının parçası olan tüm kaynak kodu dosyaları da dahil olmak üzere işlem hattı için benzersiz olmalıdır.
format str Gerekli. çıkış biçimini tanımlayan bir dize( kafka veya delta).
options dict {"key": "value"} olarak biçimlendirilmiş lavabo seçenekleri listesi; burada anahtar ve değer her ikisi de stringdir. Kafka ve Delta havuzları tarafından desteklenen tüm Databricks Runtime seçenekleri desteklenir.

Örnekler

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)