Usare la parallelizzazione delle query in Analisi di flusso di Azure

Questo articolo illustra come sfruttare i vantaggi della parallelizzazione in Analisi di flusso di Azure. Si apprenderà come ridimensionare i processi di Analisi di flusso configurando partizioni di input e ottimizzando la definizione di query.

Come prerequisito, potrebbe essere necessario avere familiarità con la nozione di unità di streaming descritta in Comprendere e regolare le unità di streaming.

Quali sono le parti di un processo di Analisi di flusso?

Una definizione del processo di Analisi di flusso include almeno un input, una query e un output di flusso. Gli input sono le origini da cui il processo legge il flusso di dati, la query viene usata per trasformare il flusso di input dei dati e l'output è la destinazione a cui il processo invia i risultati.

Partizioni in input e output

Il partizionamento consente di suddividere i dati in subset in base a una chiave di partizione. Se l'input ,ad esempio Hub eventi, è partizionato da una chiave, è consigliabile specificare la chiave di partizione quando si aggiunge un input al processo di Analisi di flusso. Il ridimensionamento di un processo di Analisi di flusso sfrutta i vantaggi offerti dall'uso di partizioni nell'input e nell'output. Un processo di Analisi di flusso può usare diverse partizioni e scrivervi in parallelo, aumentando così la velocità effettiva.

Input

Tutti gli input di streaming di Analisi di flusso di Azure possono sfruttare il partizionamento: Hub eventi, hub IoT, archiviazione BLOB, Data Lake Archiviazione Gen2.

Nota

Per il livello di compatibilità 1.2 e versioni successive, la chiave di partizione deve essere impostata come proprietà di input, senza la necessità della parola chiave PARTITION BY nella query. Per il livello di compatibilità 1.1 e versioni successive, la chiave di partizione deve invece essere definita con la parola chiave PARTITION BY nella query.

Output

Quando si usa Analisi di flusso di Azure, è possibile sfruttare il partizionamento negli output:

  • Archiviazione di Azure Data Lake
  • Funzioni di Azure
  • Tabella di Azure
  • Archiviazione BLOB (è necessario impostare la chiave di partizione in modo esplicito)
  • Azure Cosmos DB (è necessario impostare la chiave di partizione in modo esplicito)
  • Hub eventi (è necessario impostare la chiave di partizione in modo esplicito)
  • Hub IoT (è necessario impostare la chiave di partizione in modo esplicito)
  • Bus di servizio
  • SQL e Azure Synapse Analytics con partizionamento facoltativo: vedere altre informazioni nella pagina Output da database SQL di Azure.

Power BI non supporta il partizionamento. Tuttavia, è comunque possibile partizionare l'input come descritto in questa sezione.

Per altre informazioni sulle partizioni, vedere gli articoli seguenti:

Query

Affinché un processo sia parallelo, le chiavi di partizione devono essere allineate tra tutti gli input, tutti i passaggi della logica di query e tutti gli output. Il partizionamento della logica di query è determinato dalle chiavi usate per join e aggregazioni (GROUP BY). Questo ultimo requisito può essere ignorato se la logica della query non è chiave (proiezione, filtri, join referenziale... ).

  • Se un input e un output sono partizionati da WarehouseIde i gruppi di query in ProductId base a senza WarehouseId, il processo non è parallelo.
  • Se due input da unire sono partizionati da chiavi di partizione diverse (WarehouseId e ProductId), il processo non è parallelo.
  • Se due o più flussi di dati indipendenti sono contenuti in un singolo processo, ognuno con la propria chiave di partizione, il processo non è parallelo.

Solo quando tutti gli input, gli output e i passaggi di query usano la stessa chiave, il processo è parallelo.

Processi perfettamente paralleli

Un processo perfettamente parallelo è lo scenario più scalabile che può presentarsi in Analisi di flusso di Azure. Connette una partizione dell'input inviato a un'istanza della query a una partizione dell'output. Questo parallelismo presenta i requisiti seguenti:

  • Se la logica di query richiede che la stessa chiave venga elaborata dalla stessa istanza di query, è necessario verificare che gli eventi siano diretti alla stessa partizione dell'input. Per Hub eventi o hub IoT, significa che i dati dell'evento devono avere il valore PartitionKey impostato. In alternativa, è possibile usare mittenti partizionati. Per l'archiviazione BLOB, questo significa che gli eventi vengono inviati alla stessa cartella di partizione. Un esempio è costituito da un'istanza di query che aggrega i dati per userID, dove l'hub eventi di input viene partizionato usando userID come chiave di partizione. Tuttavia, se la logica di query non richiede che la stessa chiave venga elaborata dalla stessa istanza di query, è possibile ignorare questo requisito. Un esempio di questa logica è offerto da una query semplice select-project-filter.

  • Il passaggio successivo consiste nel partizionare la query. Per i processi con livello di compatibilità 1.2 o superiore (scelta consigliata), la colonna personalizzata può essere specificata come chiave di partizione nelle impostazioni di input e il processo verrà eseguito automaticamente in parallelo. Per i processi con livello di compatibilità 1.0 o 1.1, è necessario usare PARTITION BY PartitionId in tutti i passaggi della query. È possibile eseguire più passaggi, ma tutti devono essere partizionati con la stessa chiave.

  • Per la maggior parte degli output supportati in Analisi di flusso di Azure, è possibile sfruttare il partizionamento. Se si usa un tipo di output che non supporta il partizionamento, il processo non risulterà perfettamente parallelo. Per l'output di Hub eventi, verificare che la colonna Chiave di partizione sia impostata sulla stessa chiave di partizione usata nella query. Per altre informazioni, vedere la sezione output.

  • Il numero delle partizioni di input deve essere uguale a quello delle partizioni di output. L'output dell'archiviazione BLOB può supportare le partizioni ed eredita lo schema di partizionamento della query a monte. Quando viene specificata una chiave di partizione per l'archiviazione BLOB, i dati vengono partizionati per partizione di input, pertanto il risultato è ancora completamente parallelo. Ecco alcuni esempi di valori di partizioni che consentono un processo perfettamente parallelo:

    • Otto partizioni di input dell'hub eventi e otto partizioni di output dell'hub eventi
    • Otto partizioni di input dell'hub eventi e output dell'archiviazione BLOB
    • Otto partizioni di input dell'hub eventi e output dell'archiviazione BLOB partizionati da un campo personalizzato con cardinalità arbitraria
    • Otto partizioni di input dell'archiviazione BLOB e output dell'archiviazione BLOB
    • Otto partizioni di input dell'archiviazione BLOB e otto partizioni di output dell'hub eventi

Le sezioni seguenti illustrano alcuni esempi di scenari perfettamente paralleli.

Query semplice

  • Input: un hub eventi con otto partizioni
  • Output: un hub eventi con otto partizioni ("Colonna chiave di partizione" deve essere impostato per l'uso PartitionId)

Query:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Questa query è un filtro semplice. Non è pertanto necessario preoccuparsi del partizionamento dell'input inviato all'hub eventi. Si noti che i processi con livello di compatibilità precedente a 1.2 devono includere la clausola PARTITION BY PartitionId, in modo da soddisfare il requisito 2 descritto in precedenza. A livello di output, è necessario configurare l'output dell'hub eventi nel processo in modo che la chiave di partizione sia impostata su PartitionId. È infine necessario verificare che il numero delle partizioni di input sia uguale a quello delle partizioni di output.

Query con chiave di raggruppamento

  • Input: Hub eventi con otto partizioni
  • Output: archiviazione BLOB

Query:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Questa query include una chiave di raggruppamento. Di conseguenza, gli eventi raggruppati devono essere inviati alla stessa partizione di Hub eventi. Poiché in questo esempio viene eseguito il raggruppamento di TollBoothID, è necessario assicurarsi che TollBoothID venga usato come chiave di partizione quando gli eventi vengono inviati a Hub eventi. In Analisi di flusso di Azure è quindi possibile usare PARTITION BY PartitionId per ereditare da questo schema di partizione e abilitare la parallelizzazione completa. Poiché l'output è costituito dall'archiviazione BLOB, non occorre preoccuparsi di configurare un valore di chiave di partizione, come definito dal requisito 4.

Esempio di scenari che non sono* imbarazzantemente paralleli

Nella sezione precedente l'articolo ha illustrato alcuni scenari imbarazzanti paralleli. In questa sezione vengono illustrati gli scenari che non soddisfano tutti i requisiti per essere imbarazzanti paralleli.

Numero di partizioni non corrispondente

  • Input: un hub eventi con otto partizioni
  • Output: un hub eventi con 32 partizioni

Se il numero delle partizioni di input non corrisponde a quello delle partizioni di output, la topologia non è perfettamente parallela, indipendentemente dalla query. Tuttavia, è comunque possibile ottenere un certo livello di parallelizzazione.

Query con output non partizionati

  • Input: un hub eventi con otto partizioni
  • Output: Power BI

L'output Power BI attualmente non supporta il partizionamento. Pertanto, questo scenario non è imbarazzantemente parallelo.

Query a più passaggi con valori diversi per PARTITION BY

  • Input: Hub eventi con otto partizioni
  • Output: Hub eventi con otto partizioni
  • Livello di compatibilità: 1.0 o 1.1

Query:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Come è possibile osservare, il secondo passaggio usa TollBoothId come chiave di partizionamento, a differenza del primo passaggio. Questo passaggio non è lo stesso del primo passaggio e pertanto è necessario eseguire una sequenza casuale.

Query a più passaggi con valori diversi per PARTITION BY

  • Input: Hub eventi con otto partizioni ("Colonna chiave di partizione" non impostata, valore predefinito su "PartitionId")
  • Output: l'hub eventi con otto partizioni ("Colonna chiave di partizione" deve essere impostato per usare "TollBoothId")
  • Livello di compatibilità - 1.2 o versione successiva

Query:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Per impostazione predefinita, il livello di compatibilità 1.2 o versione successiva consente l'esecuzione di query in parallelo. Ad esempio, la query della sezione precedente verrà partizionata a condizione che la colonna "TollBoothId" sia impostata come chiave di partizione di input. La clausola PARTITION BY PartitionId non è obbligatoria.

Calcolare il numero massimo di unità di streaming di un processo

Il numero totale di unità di streaming che possono essere usate da un processo di Analisi dei flussi dipende dal numero di passaggi nella query definita per il processo e dal numero di partizioni per ogni passaggio.

Passaggi in una query

Una query può includere uno o più passaggi. Ogni passaggio è una sottoquery definita mediante la parola chiave WITH. Anche la query esterna alla parola chiave WITH (una sola query) viene contata come passaggio. Ad esempio, l'istruzione SELECT nella query seguente:

Query:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Questa query include due passaggi.

Nota

Questa query viene illustrata in dettaglio più avanti nell'articolo.

Partizionamento di un passaggio

Il partizionamento di un passaggio richiede le condizioni seguenti:

  • L'origine di input deve essere partizionata.
  • L'istruzione SELECT della query deve leggere da un'origine di input partizionata.
  • La query all'interno del passaggio deve includere la parola chiave PARTITION BY.

Quando una query è partizionata, gli eventi di input vengono elaborati e aggregati in gruppi separati di partizioni e per ogni gruppo vengono generati eventi di output. Se si desidera un'aggregazione combinata, è necessario creare un secondo passaggio non partizionato da aggregare.

Calcolare il numero massimo di unità di streaming per un processo

Tutti i passaggi non partizionati insieme possono aumentare fino a un'unità di streaming (SU V2s) per un processo di Analisi di flusso. È anche possibile aggiungere un su V2 per ogni partizione in un passaggio partizionato. È possibile visualizzare alcuni esempi nella tabella seguente.

Query Numero massimo di unità di streaming per il processo
  • La query contiene un unico passaggio.
  • Il passaggio non è partizionato.
1 SU V2
  • Il flusso di dati di input è suddiviso in 16 partizioni.
  • La query contiene un unico passaggio.
  • Il passaggio è partizionato.
16 SU V2 (1 * 16 partizioni)
  • La query contiene due passaggi.
  • Nessuno dei passaggi è partizionato.
1 SU V2
  • Il flusso di dati di input è suddiviso in 3 partizioni.
  • La query contiene due passaggi. Il passaggio di input è partizionato e il secondo passaggio non è.
  • L'istruzione SELECT legge dall'input partizionato.
4 SU V2s (3 per i passaggi partizionati + 1 per i passaggi non partizionati

Esempi di ridimensionamento

La query seguente calcola il numero di automobili che passano per una stazione di pedaggio con tre caselli in un intervallo di tre minuti. Questa query può essere ridimensionata fino a una su V2.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Per usare più unità di streaming per la query, è necessario che il flusso di dati di input e la query siano entrambi partizionati. Poiché la partizione del flusso di dati è impostata su 3, la query modificata seguente può essere ridimensionata fino a 3 UNITÀ di streaming V2:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Quando una query è partizionata, gli eventi di input vengono elaborati e aggregati in gruppi di partizioni separati e vengono anche generati eventi di output per ognuno dei gruppi. Il partizionamento può causare risultati imprevisti quando il campo GROUP BY non è la chiave di partizione nel flusso di dati di input. Ad esempio, il campo TollBoothId nella query precedente non è la chiave di partizione di Input1. I dati provenienti dal casello 1 possono pertanto essere distribuiti in più partizioni.

Le singole partizioni di Input1 verranno elaborate separatamente da Analisi di flusso. Verranno pertanto creati più record del conteggio relativo al passaggio di automobili dallo stesso casello nella stessa finestra a cascata. Se la chiave di partizione di input non può essere modificata, questo problema può essere risolto aggiungendo un passaggio non partizionato per aggregare i valori tra le partizioni, come nell'esempio seguente:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Questa query può essere ridimensionata a 4 unità di streaming V2.

Nota

Se si uniscono due flussi, verificare che tali flussi siano partizionati in base alla chiave di partizione della colonna usata per le unioni. Controllare inoltre che in entrambi i flussi sia presente lo stesso numero di partizioni.

Ottenimento di una velocità effettiva più elevata su larga scala

Anche se necessario, un processo perfettamente parallelo non è sufficiente per sostenere una velocità effettiva più elevata su larga scala. Ogni sistema di archiviazione e il corrispondente output di Analisi di flusso presenta variazioni su come ottenere la migliore velocità effettiva di scrittura possibile. Come per qualsiasi scenario su larga scala, è possibile risolvere alcuni problemi usando le configurazioni corrette. Questa sezione illustra le configurazioni per alcuni output comuni e fornisce esempi per sostenere la frequenza di inserimento di 1 K, 5 K e 10 K eventi al secondo.

Le osservazioni seguenti usano un processo di Analisi di flusso con query pass-through senza stato, una funzione definita dall'utente JavaScript di base che scrive in Hub eventi, Azure SQL o Azure Cosmos DB.

Hub eventi di

Velocità di inserimento (eventi al secondo) Unità di streaming Risorse di output
1 K 1/3 2 TU
5 K 1 6 TU
10 K 2 10 TU

La soluzione Hub eventi si ridimensiona in modo lineare in termini di unità di streaming (SU) e velocità effettiva, rendendola il modo più efficiente ed efficiente per analizzare e trasmettere i dati da Analisi di flusso. I processi possono essere ridimensionati fino a 66 unità di streaming V2, che si traduce approssimativamente nell'elaborazione fino a 400 MB/s o a 38 trilioni di eventi al giorno.

Azure SQL

Velocità di inserimento (eventi al secondo) Unità di streaming Risorse di output
1 K 2/3 S3
5 K 3 P4
10 K 6 P6

Azure SQL supporta la scrittura in parallelo, denominata " Eredita schema di partizione", ma non è abilitata per impostazione predefinita. Tuttavia, l'abilitazione dell'ereditarietà del partizionamento, insieme a una query completamente parallela, potrebbe non essere sufficiente per ottenere velocità effettiva più elevate. Le velocità effettiva di scrittura SQL dipendono in modo significativo dalla configurazione del database e dallo schema della tabella. L'articolo relativo alle prestazioni output di SQL offre maggiori dettagli sui parametri che possono ottimizzare la velocità effettiva di scrittura. Come indicato nell'articolo Output di Analisi di flusso di Azure per database SQL di Azure, questa soluzione non viene ridimensionata in modo lineare come pipeline completamente parallela oltre 8 partizioni e potrebbe essere necessario ripartizionare prima dell'output SQL (vedere INTO). Gli SKU Premium sono necessari per sostenere velocità di I/O elevate, oltre al sovraccarico dovuto ai backup del log effettuati ogni pochi minuti.

Azure Cosmos DB

Velocità di inserimento (eventi al secondo) Unità di streaming Risorse di output
1 K 2/3 20 K UR
5 K 4 60 K UR
10 K 8 120 K UR

L'output di Azure Cosmos DB da Analisi di flusso è stato aggiornato per usare l'integrazione nativa con il livello di compatibilità 1.2. Il livello di compatibilità 1.2 consente una velocità effettiva significativamente superiore e riduce il consumo di UR rispetto al livello 1.1, che rappresenta il livello di compatibilità predefinito per i nuovi processi. La soluzione usa i contenitori di Azure Cosmos DB partizionati in /deviceId e il resto della soluzione è configurato in modo identico.

Tutti gli esempi di streaming su larga scala di Azure usano Hub eventi come input alimentato da client di test simulati dal carico. Ogni evento di input è un documento JSON di 1 KB, che converte facilmente le velocità di inserimento configurate in velocità effettiva (1 MB/s, 5 MB/s e 10 MB/s). Gli eventi simulano un dispositivo IoT che invia i dati JSON seguenti (in formato abbreviato) per un massimo di 1.000 dispositivi:

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Nota

Le configurazioni sono soggette a modifiche a causa dei vari componenti usati nella soluzione. Per una stima più accurata, personalizzare gli esempi per adattarli allo scenario.

Individuazione dei colli di bottiglia

Usare il riquadro Metriche nel processo di Analisi di flusso di Azure per identificare i colli di bottiglia nella pipeline. Vedere Eventi di input/output per la velocità effettiva e "Ritardo limite" o Eventi con backlog per verificare se il processo è in grado di mantenere la frequenza di input. Per le metriche di Hub eventi, cercare Richieste limitate e regolare le unità di soglia di conseguenza. Per le metriche di Azure Cosmos DB, vedere Max consumed UR/s per ogni intervallo di chiavi di partizione in Velocità effettiva per assicurarsi che gli intervalli di chiavi di partizione vengano usati in modo uniforme. Per il database SQL di Azure, monitorare I/O LOG e CPU.

Come ottenere assistenza

Per maggiore supporto, provare la Pagina delle domande di Domande e risposte Microsoft per Analisi di flusso di Azure.

Passaggi successivi