Condividi tramite


Uso dei sink nelle pipeline

Importante

L'API sink è disponibile in anteprima pubblica.

Questa pagina descrive le Lakeflow Spark Declarative Pipelines sink API e come usarle con i flussi per scrivere record trasformati da una pipeline in un archivio di dati esterno. I sink di dati esterni includono tabelle gestite ed esterne di Unity Catalog e servizi di streaming di eventi, ad esempio Apache Kafka o Hub eventi di Azure. È anche possibile utilizzare destinazioni di dati per scrivere su origini dati personalizzate scrivendo codice Python per queste ultime.

Annotazioni

Che cosa sono i sink?

I sink sono destinazioni per i flussi in una pipeline. Per impostazione predefinita, i flussi della pipeline generano dati in una tabella di streaming o in una destinazione di visualizzazione materializzata. Si tratta di entrambe le tabelle Delta gestite di Azure Databricks. I sink sono un'alternativa utilizzata per scrivere dati trasformati verso destinazioni quali servizi di streaming di eventi come Apache Kafka o Azure Event Hubs, e tabelle esterne gestite da Unity Catalog. Usando i sink, sono ora disponibili altre opzioni per rendere persistente l'output della pipeline.

Quando è consigliabile usare i sink?

Databricks consiglia di usare i sink se necessario:

  • Creare un caso d'uso operativo come il rilevamento delle frodi, l'analisi in tempo reale e le raccomandazioni dei clienti. I casi d'uso operativi in genere leggono i dati da un bus di messaggi, ad esempio un argomento Apache Kafka, e quindi elaborano i dati con bassa latenza e scrivono nuovamente i record elaborati in un bus di messaggi. Questo approccio consente di ottenere una latenza inferiore senza scrivere o leggere dall'archiviazione cloud.
  • Scrivere i dati trasformati dai propri flussi verso le tabelle gestite da un'istanza esterna di Delta, incluse le tabelle gestite ed esterne del Unity Catalog.
  • Eseguire l'estrazione-trasformazione-caricamento inverso (ETL) verso sink di destinazione esterni a Databricks, come ad esempio i topic di Apache Kafka. Questo approccio consente di supportare in modo efficace i casi d'uso in cui i dati devono essere letti o usati all'esterno delle tabelle di Unity Catalog o di altre risorse di archiviazione gestite da Databricks.
  • È necessario scrivere in un formato di dati non supportato direttamente da Azure Databricks. Le origini dati personalizzate python consentono di creare un sink che scrive in qualsiasi origine dati usando codice Python personalizzato. Vedere Origini dati personalizzate di PySpark.

Come si usano i lavelli?

Man mano che i dati degli eventi vengono inseriti da un'origine di streaming nella pipeline, elabori e perfezioni questi dati nelle trasformazioni nella pipeline. Si usa quindi l'elaborazione del flusso di accodamento per trasmettere i record di dati trasformati in un sink. Puoi creare questo sink utilizzando la funzione create_sink(). Per altri dettagli sulla funzione create_sink, consultare la documentazione API sink.

Se si dispone di una pipeline che crea o elabora i dati degli eventi di streaming e prepara i record di dati per la scrittura, è possibile usare un sink.

L'implementazione di un sink è costituita da due passaggi:

  1. Creare il sink.
  2. Usare un flusso di accodamento per scrivere i record preparati nel sink.

Creare un sink

Databricks supporta diversi tipi di sink di destinazione nei quali si scrivono i record elaborati dai dati del flusso:

  • Tabelle Delta di destinazione (incluse le tabelle di Unity Catalog gestite ed esterne)
  • Sink di Apache Kafka
  • Sink di Hub eventi di Azure
  • Sink personalizzati scritti in Python, utilizzando fonti dati personalizzate in Python

Di seguito sono riportati esempi di configurazioni per i sink Delta, Kafka e Event Hubs di Azure e origini dati personalizzate Python.

Lavandini Delta

Per creare un sink Delta in base al percorso del file:

dp.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

Per creare un sink Delta in base al nome della tabella usando un catalogo completamente qualificato e un percorso dello schema qualificato:

dp.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "catalog_name.schema_name.table_name" }
)

Sink Kafka e Hub eventi di Azure

Questo codice funziona sia per i sink apache Kafka che per Hub eventi di Azure.

credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"

dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "databricks.serviceCredential": credential_name,
    "kafka.bootstrap.servers": bootstrap_servers,
    "topic": topic_name
  }
)

credential_name è un riferimento a una credenziale del servizio Catalogo Unity. Per altre informazioni, vedere Usare le credenziali del servizio Catalogo Unity per connettersi ai servizi cloud esterni.

Origini dati personalizzate Python

Supponendo di avere un'origine dati personalizzata Python registrata come my_custom_datasource, il codice seguente può scrivere in tale origine dati.

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.

# Create LDP sink using my_custom_datasource
dp.create_sink(
    name="custom_sink",
    format="my_custom_datasource",
    options={
        <options-needed-for-custom-datasource>
    }
)

# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
    return read_stream("my_source_data")

Per informazioni dettagliate sulla creazione di origini dati personalizzate in Python, vedere Origini dati personalizzate pySpark.

Per maggiori dettagli sull'uso della funzione create_sink, fare riferimento alla documentazione dell'API sink .

Dopo aver creato il sink, è possibile iniziare a trasmettere i record elaborati verso il sink.

Scrivere in un sink con un flusso di aggiunta

Dopo aver creato il sink, il passaggio successivo consiste nello scrivere record elaborati specificandolo come destinazione per i record prodotti da un flusso di aggiunta. Per fare ciò, specificare il sink come valore di target nella decorazione append_flow.

  • Per le tabelle gestite ed esterne di Unity Catalog, usare il formato delta e specificare il percorso o il nome della tabella nelle opzioni. La pipeline deve essere configurata per l'uso di Unity Catalog.
  • Per gli argomenti di Apache Kafka, usare il formato kafka e specificare il nome dell'argomento, le informazioni di connessione e le informazioni di autenticazione nelle opzioni. Queste sono le stesse opzioni supportate da un Sink di Kafka in Spark Structured Streaming. Vedere Configurare il writer di streaming strutturato Kafka.
  • Per Hub eventi di Azure, usare il formato kafka e specificare il nome, le informazioni di connessione e le informazioni di autenticazione di Hub eventi nelle opzioni. Queste sono le stesse opzioni supportate in un sink di Event Hubs Spark Structured Streaming che utilizza l'interfaccia Kafka. Vedere Autenticazione entità servizio con Microsoft Entra ID e Azure Event Hub.

Di seguito sono riportati esempi di come configurare i flussi per la scrittura in sink Delta, Kafka e Hub eventi di Azure con record elaborati dalla pipeline.

Lavandino delta

@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
  return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

Sink Kafka e Hub eventi di Azure

@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

Il parametro value è obbligatorio per un sink di Azure Event Hubs. Altri parametri, ad esempio key, partition, headerse topic sono facoltativi.

Per ulteriori dettagli sul decoratore append_flow, consultare Uso di più flussi per scrivere in un'unica destinazione.

Limitazioni

  • È supportata solo l'API Python. SQL non è supportato.

  • Sono supportate solo le query di streaming. Le query batch non sono supportate.

  • È possibile usare solo append_flow per scrivere nei sink. Altri flussi, ad esempio create_auto_cdc_flow, non sono supportati e non è possibile usare un sink in una definizione del set di dati della pipeline. Ad esempio, il codice seguente non è supportato:

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • Per i Delta sink, il nome della tabella deve essere completamente qualificato. In particolare, per le tabelle esterne gestite dal catalogo Unity, il nome della tabella deve essere nel formato <catalog>.<schema>.<table>. Per il metastore Hive, deve essere nel formato <schema>.<table>.

  • L'esecuzione di un aggiornamento completo non elimina i dati dei risultati calcolati in precedenza nei sink. Ciò significa che tutti i dati elaborati vengono aggiunti al sink e i dati esistenti non vengono modificati.

  • Le aspettative della pipeline non sono supportate.

Risorse