Condividi tramite


Eseguire query sui dati di streaming

È possibile usare Azure Databricks per eseguire query sulle origini dati di streaming usando Structured Streaming. Azure Databricks offre un supporto completo per i carichi di lavoro di streaming in Python e Scala e supporta la maggior parte delle funzionalità di streaming strutturato con SQL.

Gli esempi seguenti illustrano l'uso di un sink di memoria per l'ispezione manuale dei dati di streaming durante lo sviluppo interattivo nei notebook. A causa dei limiti di output delle righe nell'interfaccia utente del notebook, è possibile che non si osservino tutti i dati letti dalle query di streaming. Nei carichi di lavoro di produzione è consigliabile attivare solo le query di streaming scrivendole in una tabella di destinazione o in un sistema esterno.

Nota

Il supporto SQL per le query interattive sui dati di streaming è limitato ai notebook in esecuzione in calcolo a tutti gli scopi. È anche possibile usare SQL quando si dichiarano tabelle di streaming in Databricks SQL o nelle tabelle live Delta. Vedere Caricare dati usando tabelle di streaming in Databricks SQL e Informazioni sulle tabelle live Delta.

Eseguire query sui dati dai sistemi di streaming

Azure Databricks offre lettori di dati di streaming per i sistemi di streaming seguenti:

  • Kafka
  • Cinesi
  • PubSub
  • Pulsar

È necessario specificare i dettagli di configurazione quando si inizializzano le query su questi sistemi, che variano a seconda dell'ambiente configurato e del sistema da cui si sceglie di leggere. Vedere Configurare le origini dati di streaming.

I carichi di lavoro comuni che coinvolgono i sistemi di streaming includono l'inserimento di dati nella lakehouse e l'elaborazione dei flussi per il sink dei dati in sistemi esterni. Per altre informazioni sui carichi di lavoro di streaming, vedere Streaming in Azure Databricks.

Gli esempi seguenti illustrano un flusso interattivo letto da Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Eseguire query su una tabella come flusso letto

Azure Databricks crea tutte le tabelle usando Delta Lake per impostazione predefinita. Quando si esegue una query di streaming su una tabella Delta, la query preleva automaticamente nuovi record quando viene eseguito il commit di una versione della tabella. Per impostazione predefinita, le query di streaming prevedono che le tabelle di origine contengano solo record accodati. Se è necessario usare i dati di streaming che contengono aggiornamenti ed eliminazioni, Databricks consiglia di usare le tabelle Live Delta e APPLY CHANGES INTO. Vedere LE API APPLY CHANGES: Semplificare Change Data Capture con le tabelle Live Delta.

Gli esempi seguenti illustrano l'esecuzione di un flusso interattivo letto da una tabella:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Eseguire query sui dati nell'archiviazione di oggetti cloud con il caricatore automatico

È possibile trasmettere i dati dall'archiviazione di oggetti cloud usando il caricatore automatico, il connettore dati cloud di Azure Databricks. È possibile usare il connettore con i file archiviati nei volumi del catalogo Unity o in altri percorsi di archiviazione di oggetti cloud. Databricks consiglia di usare volumi per gestire l'accesso ai dati nell'archiviazione di oggetti cloud. Vedere Connettersi alle origini dati.

Azure Databricks ottimizza questo connettore per l'inserimento in streaming dei dati nell'archiviazione di oggetti cloud archiviato in formati strutturati, semistrutturati e non strutturati più diffusi. Databricks consiglia di archiviare i dati inseriti in un formato quasi non elaborato per ottimizzare la velocità effettiva e ridurre al minimo la potenziale perdita di dati a causa di record danneggiati o modifiche dello schema.

Per altre raccomandazioni sull'inserimento di dati dall'archiviazione di oggetti cloud, vedere Inserire dati in un lakehouse di Databricks.

Gli esempi seguenti illustrano un flusso interattivo letto da una directory di file JSON in un volume:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')