Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważne
Interfejs API potoku create_sink jest w publicznej wersji zapoznawczej.
Funkcja create_sink() zapisuje dane w usłudze przesyłania strumieniowego zdarzeń, takiej jak Apache Kafka lub Azure Event Hubs, lub do tabeli Delta z deklaratywnego potoku danych. Po utworzeniu zlewu za pomocą funkcji create_sink() należy użyć go w przepływie dodawania , aby zapisać dane. Przepływ dodawania jest jedynym typem przepływu obsługiwanym przez funkcję create_sink(). Inne typy przepływów, takie jak create_auto_cdc_flow, nie są obsługiwane.
Moduł Delta sink obsługuje tabele zewnętrzne i zarządzane przez Unity Catalog oraz tabele zarządzane w magazynie metadanych Hive. Nazwy tabel muszą być jednoznacznie określone. Na przykład tabele Unity Catalog muszą używać identyfikatora trójpoziomowego: <catalog>.<schema>.<table>. Tabele metadanych Hive muszą używać <schema>.<table>.
Uwaga / Notatka
- Uruchomienie pełnego odświeżenia nie powoduje wyczyszczenia danych z odbiorników. Wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.
- Oczekiwania nie są obsługiwane w interfejsie
sinkAPI.
Składnia
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parametry
| Parameter | Typ | Description |
|---|---|---|
name |
str |
To jest wymagane. Ciąg, który identyfikuje ujście i służy do odwoływania się do niego oraz zarządzania nim. Nazwy ujścia muszą być unikatowe dla potoku, w tym we wszystkich plikach kodu źródłowego, które są częścią potoku. |
format |
str |
To jest wymagane. Ciąg definiujący format danych wyjściowych kafka lub delta. |
options |
dict |
Lista opcji ujścia sformatowana jako {"key": "value"}, gdzie klucz i wartość są ciągami. Obsługiwane są wszystkie opcje środowiska Databricks Runtime, które są obsługiwane przez wyjścia Kafka i Delta.
|
Przykłady
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)