Delen via


create_sink

Belangrijk

De pijplijn-API create_sink bevindt zich in openbare preview.

De create_sink() functie schrijft naar een gebeurtenisstreamingservice zoals Apache Kafka of Azure Event Hubs of naar een Delta-tabel vanuit een declaratieve pijplijn. Nadat u een sink hebt gemaakt met de functie create_sink(), gebruikt u de sink in een toevoegstroom om gegevens naar de sink te schrijven. toevoegstroom is het enige stroomtype dat wordt ondersteund met de functie create_sink(). Andere stroomtypen, zoals create_auto_cdc_flow, worden niet ondersteund.

De Delta-sink ondersteunt externe en beheerde tabellen van Unity Catalog en beheerde Hive-metastore-tabellen. Tabelnamen moeten volledig gespecificeerd zijn. Unity Catalog-tabellen moeten bijvoorbeeld een id met drie lagen gebruiken: <catalog>.<schema>.<table>. Hive-metastore-tabellen moeten <schema>.<table>gebruiken.

Opmerking

  • Bij het uitvoeren van een volledige vernieuwingsupdate, worden geen gegevens uit de sinks gewist. Eventuele opnieuw verwerkte gegevens worden toegevoegd aan de sink en bestaande gegevens worden niet gewijzigd.
  • Verwachtingen worden niet ondersteund met de sink API.

Syntaxis

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Parameterwaarden

Kenmerk Typologie Description
name str Verplicht. Een reeks die de sink identificeert en gebruikt wordt om naar de sink te verwijzen en deze te beheren. Sinknamen moeten uniek zijn voor de pijplijn, inclusief alle broncodebestanden die deel uitmaken van de pijplijn.
format str Verplicht. Een tekenreeks die de uitvoerindeling definieert, kafka of delta.
options dict Een lijst met mogelijkheden voor sinks, opgemaakt als {"key": "value"}, waarbij beide, de sleutel en de waarde, tekenreeksen zijn. Alle Databricks Runtime-opties die door de Kafka- en Delta-sinks worden ondersteund, zijn beschikbaar.

Voorbeelden

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)