Trasmettere dati come input in Analisi di flusso

Analisi di flusso si integra perfettamente con i flussi dei dati di Azure come input da tre tipi di risorse:

Queste risorse di input possono trovarsi nella stessa sottoscrizione del processo di Analisi di flusso o in una sottoscrizione diversa.

Compressione

Analisi di flusso supporta la compressione per tutte le origini di input. I tipi di compressione supportati sono: Nessuno, Gzip e Deflate. Il supporto per la compressione non è disponibile per i dati di riferimento. Se i dati di input sono compressi, Analisi di flusso gestisce i dati in modo trasparente. Non è necessario specificare il tipo di compressione con la serializzazione Avro.

Creare, modificare o testare gli input

È possibile usare il portale di Azure, Visual Studio e Visual Studio Code per aggiungere e visualizzare o modificare gli input esistenti nel processo di streaming. È anche possibile testare le connessioni di input e testare le query dai dati di esempio di portale di Azure, Visual Studio e Visual Studio Code. Quando si scrive una query, si elencano gli input nella clausola FROM. È possibile ottenere l'elenco degli input disponibili dalla pagina Query del portale. Se si desidera usare più input o JOIN scrivere più SELECT query.

Nota

È consigliabile usare gli strumenti di Analisi di flusso per Visual Studio Code per un'esperienza di sviluppo locale ottimale. Esistono lacune nelle funzionalità note negli strumenti di Analisi di flusso per Visual Studio 2019 (versione 2.6.3000.0) e non verrà migliorata in futuro.

Trasmettere dati da Hub eventi

Hub eventi di Azure è un evento ingestor publish-subscribe altamente scalabile. Un hub eventi può raccogliere milioni di eventi al secondo per consentire di elaborare e analizzare enormi quantità di dati generati dalle applicazioni e dai dispositivi connessi. Insieme, Hub eventi e Analisi di flusso possono offrire una soluzione end-to-end per l'analisi in tempo reale. Hub eventi consente di inserire eventi in Azure in tempo reale e i processi di Analisi di flusso sono in grado di elaborare tali eventi in tempo reale. È ad esempio possibile inviare clic sul Web, letture di sensori o eventi di log online agli hub eventi È quindi possibile creare processi di Analisi di flusso per usare Hub eventi per i dati di input per il filtro, l'aggregazione e la correlazione in tempo reale.

Il timestamp EventEnqueuedUtcTime si riferisce all'arrivo di un evento nell'hub eventi ed è il timestamp predefinito degli eventi provenienti da Hub eventi verso Analisi di flusso. Per elaborare i dati come flusso usando un timestamp nel payload dell'evento, è necessario usare la parola chiave TIMESTAMP BY.

Gruppo di consumer dell'hub eventi

È necessario configurare ogni input dell'hub eventi in modo che abbia un proprio gruppo di consumer. Quando un processo contiene un self-join o ha più input, è possibile che alcuni input vengano letti da più lettori downstream. Questa situazione influisce sul numero di lettori in un singolo gruppo di consumer. Per evitare di superare il limite di cinque lettori imposto da Hub eventi per ogni gruppo di consumer di ciascuna partizione, è consigliabile definire un gruppo di consumer per ogni processo di Analisi di flusso. Esiste anche un limite di 20 gruppi di consumer per un hub eventi di livello Standard. Per altre informazioni, vedere Risolvere i problemi delle connessioni di input.

Creare un input da Hub eventi

La tabella seguente descrive le proprietà disponibili nella pagina Nuovo input del portale di Azure per lo streaming dell'input dei dati da un hub eventi:

Proprietà Descrizione
Alias di input Nome descrittivo che viene usato nella query del processo per fare riferimento a questo input.
Abbonamento Scegliere la sottoscrizione di Azure in cui è presente la risorsa hub eventi.
Spazio dei nomi dell'hub eventi Lo spazio dei nomi di Hub eventi è un contenitore per hub eventi. Quando si crea un hub eventi, viene creato anche lo spazio dei nomi .
Nome hub eventi Nome dell'hub eventi da usare come input.
Gruppo di consumer dell'hub eventi (consigliata) È consigliabile usare un gruppo di consumer distinto per ogni processo di Analisi di flusso. Questa stringa identifica il gruppo di consumer da usare per l'inserimento di dati dall'hub eventi. Se non viene specificato alcun gruppo di consumer, il processo di Analisi di flusso usa il $Default gruppo di consumer.
Modalità di autenticazione Specificare il tipo di autenticazione da usare per connettersi all'hub eventi. È possibile usare un stringa di connessione o un'identità gestita per l'autenticazione con l'hub eventi. Per l'opzione identità gestita, è possibile creare un'identità gestita assegnata dal sistema al processo di Analisi di flusso o un'identità gestita assegnata dall'utente per l'autenticazione con l'hub eventi. Quando si usa un'identità gestita, l'identità gestita deve essere un membro dei ruoli di Hub eventi di Azure Ricevitore dati o Hub eventi di Azure Proprietario dati.
Nome criteri hub eventi Criteri di accesso condiviso che forniscono l'accesso a Hub eventi. Tutti i criteri di accesso condiviso dispongono di un nome e di autorizzazioni impostati, nonché di chiavi di accesso. Questa opzione viene popolata automaticamente, a meno che non si selezioni l'opzione per fornire manualmente le impostazioni di Hub eventi.
Chiave di partizione Si tratta di un campo facoltativo disponibile solo se il processo è configurato per l'uso del livello di compatibilità 1.2 o superiore. Se l'input è partizionato da una proprietà, è possibile aggiungere il nome di questa proprietà qui. Viene usato per migliorare le prestazioni della query se include una PARTITION BY clausola o GROUP BY in questa proprietà. Se questo processo usa il livello di compatibilità 1.2 o versione successiva, il valore predefinito di questo campo è PartitionId.
Formato di serializzazione eventi Formato di serializzazione (JSON, CSV, Avro, Parquet o Altro (Protobuf, XML, proprietario...), del flusso di dati in ingresso. Assicurarsi che il formato JSON sia allineato alla specifica e non includa uno 0 iniziale per i numeri decimali.
Encoding L'unico formato di codifica attualmente supportato è UTF-8.
Tipo di compressione eventi Tipo di compressione usato per leggere il flusso di dati in ingresso, ad esempio Nessuno (impostazione predefinita), Gzip o Deflate.
Registro schemi (anteprima) È possibile selezionare il Registro di sistema dello schema con schemi per i dati degli eventi ricevuti dall'hub eventi.

Quando i dati provengono da un input del flusso di Hub eventi, è possibile accedere ai campi di metadati seguenti nella query di Analisi di flusso:

Proprietà Descrizione
EventProcessedUtcTime Data e ora in cui Analisi di flusso elabora l'evento.
EventEnqueuedUtcTime Data e ora in cui Hub eventi riceve gli eventi.
PartitionId ID di partizione in base zero per l'adattatore di input.

Ad esempio, tramite questi campi, è possibile scrivere una query simile alla seguente:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Nota

Quando si usano Hub eventi come endpoint per hub IoT route, è possibile accedere ai metadati hub IoT usando la funzione GetMetadataPropertyValue.

Dati del flusso dall'hub IoT

Hub IoT di Azure è un servizio di inserimento di eventi di pubblicazione-sottoscrizione altamente scalabile ottimizzato per scenari IoT.

Il timestamp predefinito degli eventi provenienti da un hub IoT in Analisi di flusso è il timestamp in cui l'evento è giunto nell'hub IoT, ovvero EventEnqueuedUtcTime. Per elaborare i dati come flusso usando un timestamp nel payload dell'evento, è necessario usare la parola chiave TIMESTAMP BY.

Gruppo di consumer dell'hub IoT

È necessario configurare uno specifico gruppo di consumer per ogni input dell'hub IoT di Analisi di flusso. Quando un processo contiene un self-join o ha più input, è possibile che qualche input venga letto da più lettori downstream. Questa situazione influisce sul numero di lettori in un singolo gruppo di consumer. Per evitare di superare il limite di cinque lettori imposto da Hub IoT di Azure per ogni gruppo di consumer di ciascuna partizione, è consigliabile definire un gruppo di consumer per ogni processo di Analisi di flusso.

Configurare un hub IoT come input del flusso dei dati

La tabella seguente contiene la descrizione delle proprietà disponibili nella pagina Nuovo input del portale di Azure quando si configura un hub IoT come input del flusso.

Proprietà Descrizione
Alias di input Nome descrittivo che viene usato nella query del processo per fare riferimento a questo input.
Abbonamento Scegliere la sottoscrizione in cui esiste la risorsa dell'hub IoT.
Hub IoT Nome dell'hub IoT da usare come input.
Gruppo di consumer È consigliabile usare un gruppo di consumer diverso per ogni processo di Analisi di flusso. Il gruppo di consumer viene usato per inserire dati dall'hub IoT. Analisi di flusso usa il gruppo di consumer $Default se non diversamente specificato.
Nome criteri di accesso condiviso Criteri di accesso condiviso che consentono di accedere all'hub IoT. Tutti i criteri di accesso condiviso dispongono di un nome e di autorizzazioni impostati, nonché di chiavi di accesso.
Chiave criteri di accesso condiviso Chiave di accesso condiviso usata per autorizzare l'accesso all'hub IoT. Il valore di questa opzione viene inserito automaticamente, a meno che non si selezioni l'opzione per specificare le impostazioni dell'hub IoT manualmente.
Endpoint Endpoint per l'hub IoT.
Chiave di partizione Si tratta di un campo facoltativo disponibile solo se il processo è configurato per l'uso del livello di compatibilità 1.2 o superiore. Se l'input è partizionato da una proprietà, è possibile aggiungere il nome di questa proprietà qui. Viene usato per migliorare le prestazioni della query se include una clausola PARTITION BY o GROUP BY in questa proprietà. Se questo processo usa il livello di compatibilità 1.2 o versione successiva, per impostazione predefinita questo campo è "PartitionId".
Formato di serializzazione eventi Formato di serializzazione (JSON, CSV, Avro, Parquet o Altro (Protobuf, XML, proprietario...), del flusso di dati in ingresso. Assicurarsi che il formato JSON sia allineato alla specifica e non includa uno 0 iniziale per i numeri decimali.
Encoding L'unico formato di codifica attualmente supportato è UTF-8.
Tipo di compressione eventi Tipo di compressione usato per leggere il flusso di dati in ingresso, ad esempio Nessuno (impostazione predefinita), Gzip o Deflate.

Quando si usano i dati di flusso provenienti da un hub IoT, è possibile accedere ai campi di metadati seguenti nella query di Analisi di flusso:

Proprietà Descrizione
EventProcessedUtcTime Data e ora di elaborazione dell'evento.
EventEnqueuedUtcTime Data e ora in cui il hub IoT riceve l'evento.
PartitionId ID di partizione in base zero per l'adattatore di input.
IoTHub.MessageId ID usato per correlare la comunicazione bidirezionale nell'hub IoT.
IoTHub.CorrelationId ID usato nelle risposte dei messaggi e nei feedback nell'hub IoT.
IoTHub.ConnectionDeviceId ID di autenticazione usato per inviare questo messaggio. Questo valore viene stampato sui messaggi associati al servizio dal hub IoT.
IoTHub.ConnectionDeviceGenerationId ID di generazione del dispositivo autenticato che è stato usato per inviare questo messaggio. Questo valore viene riportato nei messaggi servicebound dall'hub IoT.
IoTHub.EnqueuedTime Ora in cui il hub IoT riceve il messaggio.

Trasmettere dati da Archiviazione BLOB o Data Lake Storage Gen2

Per scenari con grandi quantità di dati non strutturati da archiviare nel cloud, Archiviazione BLOB di Azure o Azure Data Lake Archiviazione Gen2 offre una soluzione conveniente e scalabile. I dati nell'archiviazione BLOB o in Azure Data Lake Archiviazione Gen2 sono considerati dati inattivi. Tuttavia, questi dati possono essere elaborati come flusso di dati da Analisi di flusso.

L'elaborazione dei log è uno scenario comunemente usato per l'uso di tali input con Analisi di flusso. In questo scenario, i file di dati di telemetria vengono acquisiti da un sistema e devono essere analizzati ed elaborati per estrarre dati significativi.

Il timestamp predefinito di un archivio BLOB o di un evento di Azure Data Lake Archiviazione Gen2 in Analisi di flusso è il timestamp dell'ultima modifica, ovvero BlobLastModifiedUtcTime. Se un BLOB viene caricato in un account di archiviazione alle 13:00 e il processo di Analisi di flusso di Azure viene avviato usando l'opzione Ora alle 13:01, non verrà prelevata man mano che il tempo modificato non rientra nel periodo di esecuzione del processo.

Se un BLOB viene caricato in un contenitore dell'account di archiviazione alle 13:00 e il processo di Analisi di flusso di Azure viene avviato con Ora personalizzata alle 13:00 o prima, il BLOB verrà prelevato quando l'ora modificata rientra nel periodo di esecuzione del processo.

Se un processo di Analisi di flusso di Azure viene avviato con Now alle 13:00 e un BLOB viene caricato nel contenitore dell'account di archiviazione alle 13:01, Analisi di flusso di Azure preleva il BLOB. Il timestamp assegnato a ogni BLOB è basato solo su BlobLastModifiedTime. La cartella in cui si trova il BLOB non ha alcuna relazione con il timestamp assegnato. Ad esempio, se è presente un BLOB 2019/10-01/00/b1.txt con un BlobLastModifiedTime di 2019-11-11, il timestamp assegnato a questo BLOB è 2019-11-11.

Per elaborare i dati come flusso usando un timestamp nel payload dell'evento, è necessario usare la parola chiave TIMESTAMP BY. Un processo di Analisi di flusso esegue il pull dei dati dall'archiviazione BLOB di Azure o dall'input di Azure Data Lake Archiviazione Gen2 ogni secondo se il file BLOB è disponibile. Se il file BLOB non è disponibile, è presente un backoff esponenziale con un ritardo di tempo massimo di 90 secondi.

Nota

Analisi di flusso di Azure non supporta l'aggiunta di contenuto a un file di BLOB esistente. Analisi di flusso di Azure visualizza ogni file una sola volta e tutte le modifiche apportate al file dopo che il processo ha letto i dati non vengono elaborate. La procedura consigliata consiste nel caricare simultaneamente tutti i dati per un file di BLOB e quindi aggiungere gli altri eventi più nuovi in un nuovo file di BLOB diverso.

Negli scenari in cui molti BLOB vengono aggiunti continuamente e Analisi di flusso elabora i BLOB man mano che vengono aggiunti, è possibile che alcuni BLOB vengano ignorati in rari casi a causa della granularità di BlobLastModifiedTime. È possibile attenuare questo caso caricando i BLOB a distanza di almeno due secondi. Se questa opzione non è fattibile, è possibile usare Hub eventi per trasmettere grandi volumi di eventi.

Configurare l'archiviazione BLOB come input del flusso

La tabella seguente contiene la descrizione delle proprietà disponibili nella pagina Nuovo input del portale di Azure quando si configura l'archiviazione BLOB come input del flusso.

Proprietà Descrizione
Alias di input Nome descrittivo che viene usato nella query del processo per fare riferimento a questo input.
Abbonamento Scegliere la sottoscrizione in cui esiste la risorsa di archiviazione.
Account di archiviazione Nome dell'account di archiviazione in cui si trovano i file BLOB.
Chiave dell'account di archiviazione Chiave privata associata all'account di archiviazione. Questa opzione viene popolata automaticamente in, a meno che non si selezioni l'opzione per fornire manualmente le impostazioni.
Contenitore I contenitori forniscono un raggruppamento logico per i BLOB. È possibile scegliere il contenitore Usa esistente o Crea nuovo per creare un nuovo contenitore.
Modalità di autenticazione Specificare il tipo di autenticazione da usare per connettersi all'account di archiviazione. È possibile usare un stringa di connessione o un'identità gestita per l'autenticazione con l'account di archiviazione. Per l'opzione identità gestita, è possibile creare un'identità gestita assegnata dal sistema al processo di Analisi di flusso o un'identità gestita assegnata dall'utente per l'autenticazione con l'account di archiviazione. Quando si usa un'identità gestita, l'identità gestita deve essere membro di un ruolo appropriato nell'account di archiviazione.
Modello percorso (facoltativa) Percorso del file usato per trovare gli oggetti BLOB nel contenitore specificato. Se si desidera leggere i BLOB dalla radice del contenitore, non impostare un modello di percorso. All'interno del percorso è possibile specificare una o più istanze delle tre variabili seguenti: {date}, {time} o {partition}

Esempio 1: cluster1/logs/{date}/{time}/{partition}

Esempio 2: cluster1/logs/{date}

Il * carattere non è un valore consentito per il prefisso del percorso. Sono consentiti solo Caratteri BLOB di Azure validi. Non includere nomi di contenitori o nomi di file.
Formato data (facoltativa) Formato della data in base al quale vengono organizzati i file, se si usa la variabile date nel percorso. Esempio: YYYY/MM/DD

Quando l'input del BLOB ha {date} o {time} nel percorso, le cartelle vengono esaminate in ordine di tempo crescente.
Formato ora (facoltativa) Formato dell'ora in base al quale vengono organizzati i file, se si usa la variabile time nel percorso. Al momento, l'unico valore supportato è HH per le ore.
Chiave di partizione Si tratta di un campo facoltativo disponibile solo se il processo è configurato per l'uso del livello di compatibilità 1.2 o superiore. Se l'input è partizionato da una proprietà, è possibile aggiungere il nome di questa proprietà qui. Viene usato per migliorare le prestazioni della query se include una clausola PARTITION BY o GROUP BY in questa proprietà. Se questo processo usa il livello di compatibilità 1.2 o versione successiva, per impostazione predefinita questo campo è "PartitionId".
Conteggio delle partizioni di input Questo campo è presente solo quando {partition} è presente nel modello di percorso. Il valore di questa proprietà è un numero intero >=1. Ovunque venga visualizzato {partition} in pathPattern, verrà usato un numero compreso tra 0 e il valore di questo campo -1.
Formato di serializzazione eventi Formato di serializzazione (JSON, CSV, Avro, Parquet o Altro (Protobuf, XML, proprietario...), del flusso di dati in ingresso. Assicurarsi che il formato JSON sia allineato alla specifica e non includa uno 0 iniziale per i numeri decimali.
Encoding Per CSV e JSON, l'unico formato di codifica attualmente supportato è UTF-8.
Compressione Tipo di compressione usato per leggere il flusso di dati in ingresso, ad esempio Nessuno (impostazione predefinita), Gzip o Deflate.

Quando i dati provengono da un'origine di archiviazione BLOB, è possibile accedere ai campi di metadati seguenti nella query di Analisi di flusso:

Proprietà Descrizione
BlobName Nome del BLOB di input da cui proviene l'evento.
EventProcessedUtcTime Data e ora in cui Analisi di flusso elabora l'evento.
BlobLastModifiedUtcTime Data e ora dell'ultima modifica del BLOB.
PartitionId ID di partizione in base zero per l'adattatore di input.

Ad esempio, tramite questi campi, è possibile scrivere una query simile alla seguente:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Trasmettere dati da Apache Kafka

Analisi di flusso di Azure consente di connettersi direttamente ai cluster Apache Kafka per inserire dati. La soluzione è un codice ridotto e completamente gestita dal team di Analisi di flusso di Azure presso Microsoft, consentendogli di soddisfare gli standard di conformità aziendali. L'input Kafka è compatibile con le versioni precedenti e supporta tutte le versioni con la versione più recente del client a partire dalla versione 0.10. Gli utenti possono connettersi ai cluster Kafka all'interno di una rete virtuale e di cluster Kafka con un endpoint pubblico, a seconda delle configurazioni. La configurazione si basa sulle convenzioni di configurazione Kafka esistenti. I tipi di compressione supportati sono Nessuno, Gzip, Snappy, LZ4 e Zstd.

Per altre informazioni, vedere Trasmettere dati da Kafka ad Analisi di flusso di Azure (anteprima).

Passaggi successivi