Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Belangrijk
De pijplijn-API create_sink bevindt zich in openbare preview.
De create_sink() functie schrijft naar een gebeurtenisstreamingservice zoals Apache Kafka of Azure Event Hubs of naar een Delta-tabel vanuit een declaratieve pijplijn. Nadat u een sink hebt gemaakt met de functie create_sink(), gebruikt u de sink in een toevoegstroom om gegevens naar de sink te schrijven. toevoegstroom is het enige stroomtype dat wordt ondersteund met de functie create_sink(). Andere stroomtypen, zoals create_auto_cdc_flow, worden niet ondersteund.
De Delta-sink ondersteunt externe en beheerde tabellen van Unity Catalog en beheerde Hive-metastore-tabellen. Tabelnamen moeten volledig gespecificeerd zijn. Unity Catalog-tabellen moeten bijvoorbeeld een id met drie lagen gebruiken: <catalog>.<schema>.<table>. Hive-metastore-tabellen moeten <schema>.<table>gebruiken.
Opmerking
- Bij het uitvoeren van een volledige vernieuwingsupdate, worden geen gegevens uit de sinks gewist. Eventuele opnieuw verwerkte gegevens worden toegevoegd aan de sink en bestaande gegevens worden niet gewijzigd.
- Verwachtingen worden niet ondersteund met de
sinkAPI.
Syntaxis
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parameterwaarden
| Kenmerk | Typologie | Description |
|---|---|---|
name |
str |
Verplicht. Een reeks die de sink identificeert en gebruikt wordt om naar de sink te verwijzen en deze te beheren. Sinknamen moeten uniek zijn voor de pijplijn, inclusief alle broncodebestanden die deel uitmaken van de pijplijn. |
format |
str |
Verplicht. Een tekenreeks die de uitvoerindeling definieert, kafka of delta. |
options |
dict |
Een lijst met mogelijkheden voor sinks, opgemaakt als {"key": "value"}, waarbij beide, de sleutel en de waarde, tekenreeksen zijn. Alle Databricks Runtime-opties die door de Kafka- en Delta-sinks worden ondersteund, zijn beschikbaar.
|
Voorbeelden
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)