Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
È possibile usare Azure Databricks per eseguire query sulle origini dati di streaming usando Structured Streaming. Azure Databricks offre un supporto completo per i carichi di lavoro di streaming in Python e Scala e supporta la maggior parte delle funzionalità di streaming strutturato con SQL.
Gli esempi seguenti illustrano l'uso di un accumulatore di memoria per l'ispezione manuale dei dati di streaming durante lo sviluppo interattivo nei notebook. A causa dei limiti di output delle righe nell'interfaccia utente del notebook, è possibile che non si osservino tutti i dati letti dalle query di streaming. Nei carichi di lavoro di produzione è consigliabile attivare solo le query di streaming scrivendole in una tabella di destinazione o in un sistema esterno.
Nota
Il supporto SQL per le query interattive sui dati in streaming è limitato ai notebook che operano su risorse di calcolo generali. È anche possibile usare SQL quando si dichiarano tabelle di streaming in Databricks SQL o Nelle pipeline dichiarative di Lakeflow Spark. Vedere Tabelle di streaming e pipeline dichiarative di Lakeflow Spark.
Interrogare i dati dai sistemi di streaming
Azure Databricks offre lettori di dati di streaming per i sistemi di streaming seguenti:
- Kafka
- Kinesi
- PubSub
- Pulsar
È necessario specificare i dettagli di configurazione quando si inizializzano le query su questi sistemi, che variano a seconda dell'ambiente configurato e del sistema da cui si sceglie di leggere. Consulta Connettori Standard in Lakeflow Connect.
I carichi di lavoro comuni che coinvolgono i sistemi di streaming includono l'inserimento di dati nella lakehouse e l'elaborazione dei flussi per trasferire i dati in sistemi esterni. Per altre informazioni sui carichi di lavoro di streaming, vedere Concetti relativi a Structured Streaming.
Gli esempi seguenti illustrano una lettura interattiva in streaming da Kafka.
Pitone
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
Interrogare una tabella in modalità di lettura in streaming
Azure Databricks crea tutte le tabelle usando Delta Lake per impostazione predefinita. Quando si esegue una query di streaming su una tabella Delta, la query preleva automaticamente nuovi record quando viene eseguito il commit di una versione della tabella. Per impostazione predefinita, le query di streaming prevedono che le tabelle di origine contengano solo record accodati. Se è necessario usare i dati di streaming che contengono aggiornamenti ed eliminazioni, Databricks consiglia di usare le pipeline dichiarative di Lakeflow Spark e AUTO CDC ... INTO. Consulta le API AUTO CDC: semplificare la cattura dei dati modificati con le pipeline.
Gli esempi seguenti illustrano l'esecuzione di un flusso interattivo letto da una tabella:
Pitone
display(spark.readStream.table("table_name"))
SQL
SELECT * FROM STREAM table_name
Eseguire query sui dati nell'archiviazione di oggetti cloud con il caricatore automatico
È possibile trasmettere i dati dall'archiviazione di oggetti cloud usando il caricatore automatico, il connettore dati cloud di Azure Databricks. È possibile usare il connettore con i file archiviati nei volumi del catalogo Unity o in altri percorsi di archiviazione di oggetti cloud. Databricks consiglia di usare volumi per gestire l'accesso ai dati nell'archiviazione di oggetti cloud. Vedere Connettersi a origini dati e servizi esterni.
Azure Databricks ottimizza questo connettore per l'inserimento in streaming dei dati nell'archiviazione di oggetti cloud archiviato in formati strutturati, semistrutturati e non strutturati più diffusi. Databricks consiglia di archiviare i dati inseriti in un formato quasi non elaborato per ottimizzare la velocità effettiva e ridurre al minimo la potenziale perdita di dati a causa di record danneggiati o modifiche dello schema.
Per altre raccomandazioni sull'inserimento di dati dall'archiviazione di oggetti cloud, vedere Connettori Standard in Lakeflow Connect.
Gli esempi seguenti illustrano un flusso interattivo letto da una directory di file JSON in un volume:
Pitone
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SQL
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')