Condividi tramite


Usare la parallelizzazione delle query in Analisi di flusso di Azure

Questo articolo illustra come sfruttare i vantaggi della parallelizzazione in Analisi di flusso di Azure. Si apprenderà come ridimensionare i processi di Analisi di flusso configurando partizioni di input e ottimizzando la definizione di query.

Come prerequisito, potrebbe essere necessario avere familiarità con la nozione di unità di streaming descritta in Comprendere e regolare le unità di streaming.

Quali sono le parti di un processo di Analisi di flusso?

Una definizione di un processo di analisi dei flussi include almeno un input di flusso, una query e un output di flusso. Gli input sono i punti da cui l'attività legge lo stream di dati. La query viene utilizzata per trasformare il flusso di dati in ingresso, e l'output è la destinazione a cui il processo invia i risultati.

Partizioni negli ingressi e nelle uscite

Il partizionamento consente di suddividere i dati in subset in base a una chiave di partizione. Se l'input ,ad esempio Hub eventi, è partizionato da una chiave, è consigliabile specificare la chiave di partizione quando si aggiunge un input al processo di Analisi di flusso. Il ridimensionamento di un processo di Analisi di flusso sfrutta i vantaggi offerti dall'uso di partizioni nell'input e nell'output. Un processo di Analisi di flusso può usare diverse partizioni e scrivervi in parallelo, aumentando così la velocità effettiva.

Input

Tutti gli input di streaming di Analisi di flusso di Azure possono sfruttare il partizionamento: Event Hub, Hub IoT, Archiviazione Blob, Data Lake Storage Gen2.

Nota

Per il livello di compatibilità 1.2 e versioni successive, la chiave di partizione deve essere impostata come proprietà di input, senza la necessità della parola chiave PARTITION BY nella query. Per il livello di compatibilità 1.1 e versioni successive, la chiave di partizione deve invece essere definita con la parola chiave PARTITION BY nella query.

Output

Quando si usa Analisi di flusso di Azure, è possibile sfruttare il partizionamento negli output:

  • Azure Data Lake Storage
  • Funzioni di Azure
  • Tabella di Azure
  • Archiviazione BLOB (è necessario impostare la chiave di partizione in modo esplicito)
  • Azure Cosmos DB (è necessario impostare la chiave di partizione in modo esplicito)
  • Hub eventi (è necessario impostare la chiave di partizione in modo esplicito)
  • Hub IoT (è necessario impostare la chiave di partizione in modo esplicito)
  • Bus di servizio
  • SQL e Azure Synapse Analytics con partizionamento facoltativo: per ulteriori informazioni, consultare Output verso il database SQL di Azure.

Power BI non supporta il partizionamento. Tuttavia, è comunque possibile partizionare l'input come descritto in questa sezione.

Per altre informazioni sulle partizioni, vedere gli articoli seguenti:

Interrogazione

Affinché un processo sia parallelo, le chiavi di partizione devono essere allineate tra tutti gli input, tutti i passaggi della logica di query e tutti gli output. Il partizionamento della logica di query è determinato dalle chiavi usate per join e aggregazioni (GROUP BY). L'ultimo requisito può essere ignorato se la logica della query non è indicizzata (proiezione, filtri, join di riferimento...).

  • Se un input e un output sono partizionati da WarehouseId, e la query raggruppa per ProductId senza WarehouseId, il processo non è parallelo.
  • Se due input da unire sono partizionati da chiavi di partizione diverse (WarehouseId e ProductId), il processo non è parallelo.
  • Se due o più flussi di dati indipendenti sono contenuti in un singolo processo, ognuno con la propria chiave di partizione, il processo non è parallelo.

Solo quando tutti gli input, gli output e i passaggi di query usano la stessa chiave, il processo è parallelo.

Processi intrinsecamente paralleli

Un processo imbarazzantemente parallelo è lo scenario più scalabile in Analisi di flusso di Azure. Collega una partizione dell'input a un'istanza della query e a una partizione dell'output. Questo parallelismo presenta i requisiti seguenti:

  • Se la logica di query richiede che la stessa chiave venga elaborata dalla stessa istanza di query, è necessario verificare che gli eventi siano diretti alla stessa partizione dell'input. Per Event Hubs o IoT Hub, significa che i dati evento devono avere il PartitionKey impostato. In alternativa, è possibile usare mittenti partizionati. Per l'archiviazione BLOB, il che significa che gli eventi vengono inviati alla stessa cartella di partizione. Un esempio è costituito da un'istanza di query che aggrega i dati per userID, dove l'hub eventi di input viene partizionato usando userID come chiave di partizione. Tuttavia, se la logica di query non richiede che la stessa chiave venga elaborata dalla stessa istanza di query, è possibile ignorare questo requisito. Un esempio di questa logica è offerto da una query semplice select-project-filter.

  • Il passaggio successivo consiste nel partizionare la query. Per i processi con livello di compatibilità 1.2 o superiore (scelta consigliata), la colonna personalizzata può essere specificata come chiave di partizione nelle impostazioni di input e il processo verrà eseguito automaticamente in parallelo. Per i processi con livello di compatibilità 1.0 o 1.1, è necessario usare PARTITION BY PartitionId in tutti i passaggi della query. È possibile eseguire più passaggi, ma tutti devono essere partizionati con la stessa chiave.

  • Per la maggior parte degli output supportati in Analisi di flusso di Azure, è possibile sfruttare il partizionamento. Se si usa un tipo di output che non supporta il partizionamento, il processo non risulterà perfettamente parallelo. Per gli output di Hub eventi, verificare che la Colonna chiave di partizione sia impostata sulla stessa chiave di partizione utilizzata nella query. Per ulteriori informazioni, vedere la sezione di output.

  • Il numero delle partizioni di input deve essere uguale a quello delle partizioni di output. Lo output dell'archiviazione di Blob può supportare le partizioni ed eredita lo schema di partizionamento della query a monte. Quando viene specificata una chiave di partizione per l'archiviazione BLOB, i dati vengono partizionati per partizione di input, pertanto il risultato è ancora completamente parallelo. Ecco alcuni esempi di valori di partizioni che consentono un processo perfettamente parallelo:

    • Otto partizioni di input dell'hub eventi e otto partizioni di output dell'hub eventi
    • Otto partizioni di input dell'hub eventi e output di archiviazione BLOB
    • Otto partizioni di input dell'hub eventi e partizioni di output dell'archiviazione blob basate su un campo personalizzato con cardinalità arbitraria
    • Otto partizioni di input dell'archiviazione BLOB e output dell'archiviazione BLOB
    • Otto partizioni di input dell'archiviazione BLOB e otto partizioni di output dell'Event Hub

Le sezioni seguenti illustrano alcuni esempi di scenari perfettamente paralleli.

Query semplice

  • Input: un hub eventi con otto partizioni
  • Output: Un hub eventi con otto partizioni ("Colonna chiave di partizione" deve essere impostata per l'uso PartitionId)

Query:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Questa query è un filtro semplice. Non è pertanto necessario preoccuparsi del partizionamento dell'input inviato all'hub eventi. Si noti che i processi con livello di compatibilità precedente a 1.2 devono includere la clausola PARTITION BY PartitionId, in modo da soddisfare il requisito 2 descritto in precedenza. A livello di output, è necessario configurare l'output dell'hub eventi nel processo in modo che la chiave di partizione sia impostata su PartitionId. È infine necessario verificare che il numero delle partizioni di input sia uguale a quello delle partizioni di output.

Query con chiave di raggruppamento

  • Input: Hub eventi con otto partizioni
  • Output: Archiviazione Blob

Query:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Questa query include una chiave di raggruppamento. Di conseguenza, gli eventi raggruppati devono essere inviati alla stessa partizione di Hub eventi. Poiché in questo esempio viene eseguito il raggruppamento in base a TollBoothID, è necessario assicurarsi che TollBoothID venga usato come chiave di partizione quando gli eventi vengono inviati a Hub eventi. In Analisi di flusso di Azure è quindi possibile usare PARTITION BY PartitionId per ereditare da questo schema di partizione e abilitare la parallelizzazione completa. Poiché l'output è costituito dall'archiviazione BLOB, non occorre preoccuparsi di configurare un valore di chiave di partizione, come definito dal requisito 4.

Esempio di scenari che non sono* imbarazzantemente paralleli

Nella sezione precedente l'articolo ha illustrato alcuni scenari imbarazzanti paralleli. In questa sezione vengono illustrati gli scenari che non soddisfano tutti i requisiti da considerarsi intrinsecamente paralleli.

Numero di partizioni non corrispondente

  • Input: un hub eventi con otto partizioni
  • Output: un hub eventi con 32 partizioni

Se il numero delle partizioni di input non corrisponde a quello delle partizioni di output, la topologia non è perfettamente parallela, indipendentemente dalla query. Tuttavia, è comunque possibile ottenere un certo livello di parallelizzazione.

Query utilizzando output non partizionati

  • Input: un hub eventi con otto partizioni
  • Output: Power BI

L'output Power BI attualmente non supporta il partizionamento. Pertanto, questo scenario non è imbarazzantemente parallelo.

Query a più passaggi con valori diversi per PARTITION BY

  • Input: Hub eventi con otto partizioni
  • Output: Hub eventi con otto partizioni
  • Livello di compatibilità: 1.0 o 1.1

Query:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Come è possibile osservare, il secondo passaggio usa TollBoothId come chiave di partizionamento. Questo passaggio non è lo stesso del primo passaggio e pertanto è necessario eseguire un rimescolamento.

Query a più passaggi con valori diversi per PARTITION BY

  • Input: Hub eventi con otto partizioni ("Colonna chiave di partizione" non impostata, valore predefinito su "PartitionId")
  • Output: hub eventi con otto partizioni ("La colonna chiave di partizione" deve essere impostata per usare "TollBoothId")
  • Livello di compatibilità - 1.2 o versione successiva

Query:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Per impostazione predefinita, il livello di compatibilità 1.2 o versione successiva consente l'esecuzione di query in parallelo. Ad esempio, la query della sezione precedente verrà partizionata a condizione che la colonna "TollBoothId" sia impostata come chiave di partizione di input. La clausola PARTITION BY PartitionId non è obbligatoria.

Calcolare il numero massimo di unità di streaming di un task

Il numero totale di unità di streaming che possono essere usate da un processo di Analisi dei flussi dipende dal numero di passaggi nella query definita per il processo e dal numero di partizioni per ogni passaggio.

Passaggi in una query

Una query può includere uno o più passaggi. Ogni passaggio è una sottoquery definita mediante la parola chiave WITH. Anche la query esterna alla parola chiave WITH (una sola query) viene contata come passaggio. Ad esempio, l'istruzione SELECT nella query seguente:

Query:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Questa query ha due passaggi.

Nota

Questa query viene illustrata in dettaglio più avanti nell'articolo.

Partizionamento di un passaggio

Il partizionamento di un passaggio richiede le condizioni seguenti:

  • L'origine di input deve essere partizionata.
  • L'istruzione SELECT della query deve leggere da un'origine di input partizionata.
  • La query all'interno della fase deve includere la parola chiave PARTITION BY.

Quando una query è partizionata, gli eventi di input vengono elaborati e aggregati in gruppi separati di partizioni e per ogni gruppo vengono generati eventi di output. Se si desidera un'aggregazione combinata, è necessario creare un secondo passaggio non partizionato per effettuare l'aggregazione.

Calcolare il massimo di unità di streaming per un'attività

Tutti i passaggi non partizionati insieme possono aumentare fino a un'unità di streaming (SU V2s) per un processo di Analisi di flusso. È anche possibile aggiungere un SU V2 per ogni partizione in un passaggio partizionato. È possibile visualizzare alcuni esempi nella tabella seguente.

Interrogazione Numero massimo di SU per il job
  • La query contiene un unico passaggio.
  • Il passaggio non è partizionato.
1 SU V2
  • Il flusso di dati di input è suddiviso in 16 partizioni.
  • La query contiene un unico passaggio.
  • La fase è partizionata.
16 SU V2 (1 * 16 partizioni)
  • La query contiene due passaggi.
  • Nessuno dei passaggi è partizionato.
1 SU V2
  • Il flusso di dati di input è suddiviso in 3 partizioni.
  • La query contiene due passaggi. Il passaggio di input è partizionato, mentre il secondo passaggio non lo è.
  • L'istruzione SELECT legge dall'input partizionato.
4 SU V2s (3 per i passaggi partizionati + 1 per i passaggi non partizionati

Esempi di ridimensionamento

La query seguente calcola il numero di automobili che passano per una stazione di pedaggio con tre caselli in un intervallo di tre minuti. Questa query può essere ridimensionata fino a un SU V2.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Per utilizzare un numero maggiore di unità di streaming (SUs) per la query, è necessario che sia il flusso di dati di input che la query siano partizionati. Poiché la partizione del flusso di dati è impostata su 3, la query modificata seguente può essere ridimensionata fino a 3 SU V2:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Quando una query è partizionata, gli eventi di input vengono elaborati e aggregati in gruppi di partizioni separati Vengono generati anche eventi di output per ciascuno dei gruppi. Il partizionamento può causare risultati imprevisti quando il campo GROUP BY non è la chiave di partizione nel flusso di dati di input. Ad esempio, il campo TollBoothId nella query precedente non è la chiave di partizione di Input1. I dati provenienti dal casello 1 possono pertanto essere distribuiti in più partizioni.

Le singole partizioni di Input1 verranno elaborate separatamente da Analisi di flusso. Verranno pertanto creati più record del conteggio relativo al passaggio di automobili dallo stesso casello nella stessa finestra a cascata. Se la chiave di partizione di input non può essere modificata, questo problema può essere risolto aggiungendo un passaggio non partizionato per aggregare i valori tra le partizioni, come nell'esempio seguente:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Questa query può essere scalata a 4 SU V2.

Nota

Se si uniscono due flussi, verificare che tali flussi siano partizionati in base alla chiave di partizione della colonna usata per le unioni. Controllare inoltre che in entrambi i flussi sia presente lo stesso numero di partizioni.

Ottenimento di una velocità effettiva più elevata su larga scala

Anche se necessario, un processo trivialmente parallelo non è sufficiente per mantenere un throughput più elevato su larga scala. Ogni sistema di archiviazione e il corrispondente output di Analisi di flusso presenta variazioni su come ottenere la migliore velocità effettiva di scrittura possibile. Come per qualsiasi scenario su larga scala, è possibile risolvere alcuni problemi usando le configurazioni corrette. Questa sezione illustra le configurazioni per alcuni output comuni e fornisce esempi per mantenere i tassi di inserimento di 1.000, 5.000 e 10.000 eventi al secondo.

Le osservazioni seguenti usano un processo di Analisi di flusso con query pass-through senza stato, una funzione definita dall'utente JavaScript di base che scrive in Hub eventi, Azure SQL o Azure Cosmos DB.

Hub eventi

Velocità di inserimento (eventi al secondo) Unità di streaming Risorse di output
1 K 1/3 2 TU
5.000 1 6 TU
10 K 2 10 TU

La soluzione Event Hubs si ridimensiona in modo lineare in termini di unità di streaming (SU) e di throughput, rendendola il metodo più efficiente e performante per analizzare e trasmettere i dati da Stream Analytics. I job possono essere ridimensionati fino a 66 SU V2, che corrisponde approssimativamente all'elaborazione fino a 400 MB/s o a 38 trilioni di eventi al giorno.

Azure SQL

Velocità di inserimento (eventi al secondo) Unità di streaming Risorse di output
1 K 2/3 S3
5.000 3 P4
10 K 6 P6

Azure SQL supporta la scrittura in parallelo, denominata " Eredita schema di partizione", ma non è abilitata per impostazione predefinita. Tuttavia, l'abilitazione dell'ereditarietà del partizionamento, insieme a una query completamente parallela, potrebbe non essere sufficiente per ottenere velocità effettiva più elevate. Le velocità effettiva di scrittura SQL dipendono in modo significativo dalla configurazione del database e dallo schema della tabella. L'articolo relativo alle prestazioni output di SQL offre maggiori dettagli sui parametri che possono ottimizzare la velocità effettiva di scrittura. Come indicato nell'articolo Output di Analisi di flusso di Azure per database SQL di Azure, questa soluzione non può scalare linearmente come una pipeline completamente parallelizzata oltre le 8 partizioni e potrebbe essere necessario un repartizionamento prima di generare l'output SQL (consultare INTO). Gli SKU Premium sono necessari per sostenere velocità di I/O elevate, oltre al sovraccarico dovuto ai backup del log effettuati ogni pochi minuti.

Azure Cosmos DB

Velocità di inserimento (eventi al secondo) Unità di streaming Risorse di output
1 K 2/3 20 K RU
5.000 4 60 K RU
10 K 8 120 K UR

L'output di Azure Cosmos DB nell'analisi di flusso è stato aggiornato per utilizzare l'integrazione nativa sotto il livello di compatibilità 1.2. Il livello di compatibilità 1.2 consente una velocità effettiva significativamente superiore e riduce il consumo di UR rispetto al livello 1.1, che rappresenta il livello di compatibilità predefinito per i nuovi processi. La soluzione usa i contenitori di Azure Cosmos DB partizionati in /deviceId e il resto della soluzione è configurato in modo identico.

Tutti gli esempi di Streaming at Scale di Azure utilizzano Event Hubs come input, alimentato da client di test che simulano il carico. Ogni evento di input è un documento JSON di 1 KB, che converte facilmente le velocità di inserimento configurate in velocità effettiva (1 MB/s, 5 MB/s e 10 MB/s). Gli eventi simulano un dispositivo IoT che invia i dati JSON seguenti (in formato abbreviato) per un massimo di 1.000 dispositivi:

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Nota

Le configurazioni sono soggette a modifiche a causa dei vari componenti usati nella soluzione. Per una stima più accurata, personalizzare gli esempi per adattarli allo scenario.

Individuazione dei colli di bottiglia

Usare il riquadro Metriche nel processo di Analisi di flusso di Azure per identificare i colli di bottiglia nella pipeline. Vedere Eventi di input/output per la velocità effettiva e "Ritardo limite" o Eventi con backlog per verificare se il processo è in grado di mantenere la frequenza di input. Per le metriche di Event Hub, cercare Richieste con limitazione e regolare le unità di soglia di conseguenza. Per le metriche di Azure Cosmos DB, vedere Max consumed RU/s per intervallo di chiave di partizione in Throughput per assicurarsi che gli intervalli di chiavi di partizione vengano usati in modo uniforme. Per il database SQL di Azure, monitorare I/O LOG e CPU.

Come ottenere assistenza

Per maggiore supporto, provare la Pagina delle domande di Domande e risposte Microsoft per Analisi di flusso di Azure.

Passaggi successivi