Partilhar via


create_sink

Importante

A API de pipeline create_sink encontra-se em Visualização Pública.

A função create_sink() escreve num serviço de streaming de eventos, como o Apache Kafka ou Azure Event Hubs, ou numa tabela Delta a partir de um pipeline declarativo. Depois de criar um poço com a função create_sink(), use o poço em um fluxo de adição para gravar dados no poço. O fluxo de anexação é o único tipo de fluxo suportado com a função create_sink(). Não há suporte para outros tipos de fluxo, como create_auto_cdc_flow, .

O Delta sink suporta tabelas externas e geridas do Unity Catalog e tabelas geridas pelo metastore do Hive. Os nomes das tabelas devem ser totalmente qualificados. Por exemplo, as tabelas do Catálogo Unity devem usar um identificador de três camadas: <catalog>.<schema>.<table>. As tabelas de metastore do Hive devem usar <schema>.<table>.

Observação

  • A execução de uma atualização completa não remove dados dos destinos de dados. Todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.
  • As expectativas não são suportadas com a sink API.

Sintaxe

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Parâmetros

Parâmetro Tipo Description
name str Required. Uma cadeia de caracteres que identifica o coletor e é usada para fazer referência e gerenciar o coletor. Os nomes dos sinks devem ser únicos para o pipeline, incluindo todos os arquivos de código-fonte que fazem parte do pipeline.
format str Required. Uma cadeia de caracteres que define o formato de saída, kafka ou delta.
options dict Uma lista de opções de coletor, formatada como {"key": "value"}, onde a chave e o valor são ambas as cadeias de caracteres. Todas as opções de tempo de execução do Databricks suportadas pelos coletores Kafka e Delta são suportadas.

Examples

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)