query shuffle

La shuffle query è una trasformazione di conservazione semantica usata con un set di operatori che supportano la shuffle strategia. A seconda dei dati coinvolti, l'esecuzione di query con la shuffle strategia può produrre prestazioni migliori. È preferibile usare la strategia di query shuffle quando la chiave (una chiave, summarize una chiave, make-series una chiave o partition una join chiave) ha una cardinalità elevata e la shuffle query dell'operatore regolare raggiunge i limiti di query.

È possibile usare gli operatori seguenti con il comando shuffle:

Per usare la strategia di query, aggiungere l'espressione shufflehint.strategy = shuffle o hint.shufflekey = <key>. Quando si usa hint.strategy=shuffle, i dati dell'operatore verranno rimischiati da tutte le chiavi. Usare questa espressione quando la chiave composta è univoca, ma ogni chiave non è sufficientemente univoca, quindi si rischierà i dati usando tutte le chiavi dell'operatore casuale.

Quando si partiziona i dati con la strategia di shuffle, il carico dei dati viene condiviso in tutti i nodi del cluster. Ogni nodo elabora una partizione dei dati. Il numero predefinito di partizioni è uguale al numero di nodi del cluster.

Il numero di partizione può essere sottoposto a override usando la sintassi hint.num_partitions = total_partitions, che controlla il numero di partizioni. Ciò è utile quando il cluster ha un numero ridotto di nodi del cluster e il numero di partizioni predefinite sarà ridotto e la query ha esito negativo o richiede molto tempo di esecuzione.

Nota

L'uso di molte partizioni può utilizzare più risorse del cluster e ridurre le prestazioni. Scegliere attentamente il numero di partizione iniziando con l'avvio hint.strategy = shuffle e aumentando gradualmente le partizioni.

In alcuni casi, l'oggetto hint.strategy = shuffle viene ignorato e la query non verrà eseguita nella shuffle strategia. Questa situazione può verificarsi quando:

  • L'operatore join ha un altro shuffleoperatore compatibile (join, o partitionsummarizemake-series ) sul lato sinistro o sul lato destro.
  • L'operatore summarize viene visualizzato dopo un altro shuffleoperatore compatibile (join, summarizeo make-seriespartition) nella query.

Sintassi

Con hint.strategy = shuffle

T|DataExpression|joinhint.strategy = shuffle(DataExpression)

T|summarizehint.strategy = shuffleDataExpression

T|Query|SubQuery della partizione hint.strategy = shuffle()

Con hint.shufflekey = chiave

T|DataExpression|joinhint.shufflekey = Chiave(DataExpression)

T|summarizehint.shufflekey = keyDataExpression

T|make-serieshint.shufflekey = keyDataExpression

T|Query|SubQuerydella chiave( di partizione hint.shufflekey = )

Altre informazioni sulle convenzioni di sintassi.

Parametri

Nome Tipo Obbligatoria Descrizione
T string ✔️ Origine tabulare i cui dati devono essere elaborati dall'operatore.
DataExpression string Espressione di trasformazione tabulare implicita o esplicita.
Query string Espressione di trasformazione eseguita sui record di T.
key string Usare una chiave, summarize una chiave, make-series una chiave o partition una join chiave.
Sottoquery string Espressione di trasformazione.

Nota

DataExpression o Query devono essere specificati a seconda della sintassi scelta.

Esempio

Usare riepilogo con shuffle

La shuffle query di strategia con summarize l'operatore condivide il carico su tutti i nodi del cluster, in cui ogni nodo elabora una partizione dei dati.

StormEvents
| summarize hint.strategy = shuffle count(), avg(InjuriesIndirect) by State
| count 

Output

Conteggio
67

Usare join con shuffle

StormEvents
| where State has "West"
| where EventType has "Flood"
| join hint.strategy=shuffle 
    (
    StormEvents
    | where EventType has "Hail"
    | project EpisodeId, State, DamageProperty
    )
    on State
| count

Output

Conteggio
103

Usare la serie make-series con shuffle

StormEvents
| where State has "North"
| make-series hint.shufflekey = State sum(DamageProperty) default = 0 on StartTime in range(datetime(2007-01-01 00:00:00.0000000), datetime(2007-01-31 23:59:00.0000000), 15d) by State

Output

State sum_DamageProperty StartTime
NORTH DAKOTA [60000,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.000000Z"]
CAROLINA DEL NORD [20000,0,1000] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.000000Z"]
ATLANTICO NORD [0,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.000000Z"]

Usare la partizione con shuffle

StormEvents
| partition hint.strategy=shuffle by EpisodeId
(
    top 3 by DamageProperty
    | project EpisodeId, State, DamageProperty
)
| count

Output

Conteggio
22345

Confrontare hint.strategy=shuffle e hint.shufflekey=key

Quando si usa hint.strategy=shuffle, l'operatore shuffled verrà sviato da tutte le chiavi. Nell'esempio seguente la query esegue lo shuffing dei dati usando sia EpisodeIdEventId che come chiavi:

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
| count

Output

Conteggio
14

Nella query seguente viene utilizzata la parola chiave hint.shufflekey = key. La query precedente equivale a questa query.

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.shufflekey = EpisodeId hint.shufflekey = EventId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

Output

Conteggio
14

Shuffle the data with multiple keys

In alcuni casi, l'oggetto hint.strategy=shuffle verrà ignorato e la query non verrà eseguita in una strategia casuale. Nell'esempio seguente, ad esempio, il join presenta un riepilogo sul lato sinistro, quindi l'uso hint.strategy=shuffle di non applicherà la strategia casuale alla query:

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

Output

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

Per risolvere questo problema ed eseguire una strategia casuale, scegliere la chiave comune per le summarize operazioni e join . In questo caso, questa chiave è EpisodeId. Usare l'hint hint.shufflekey per specificare la chiave casuale in join su :hint.shufflekey = EpisodeId

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.shufflekey=EpisodeId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

Output

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

Usare summarize con shuffle per migliorare le prestazioni

In questo esempio, l'uso dell'operatore summarize con shuffle la strategia migliora le prestazioni. La tabella di origine ha 150M record e la cardinalità del gruppo per chiave è 10M, che viene distribuita su 10 nodi del cluster.

L'uso dell'operatore summarize senza shuffle strategia, la query termina dopo le 1:08 e il picco di utilizzo della memoria è di circa 3 GB:

orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

Output

Conteggio
1086

Quando si usa shuffle la strategia con summarize, la query termina dopo circa 7 secondi e il picco di utilizzo della memoria è di 0,43 GB:

orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

Output

Conteggio
1086

L'esempio seguente illustra le prestazioni in un cluster con due nodi del cluster, con una tabella con record 60M, in cui la cardinalità del gruppo per chiave è 2M.

L'esecuzione della query senza hint.num_partitions userà solo due partizioni (come numero di nodi del cluster) e la query seguente richiederà circa 1:10 minuti:

lineitem 
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

Se si imposta il numero di partizioni su 10, la query terminerà dopo 23 secondi:

lineitem 
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

Usare join con shuffle per migliorare le prestazioni

Nell'esempio seguente viene illustrato come l'uso shuffle della strategia con l'operatore join migliora le prestazioni.

Gli esempi sono stati campionati in un cluster con 10 nodi in cui i dati vengono distribuiti in tutti questi nodi.

La tabella di origine sul lato sinistro della query contiene 15M record in cui la cardinalità della join chiave è ~14M. L'origine sul lato destro della query ha 150M record e la cardinalità della join chiave è 10M. La query termina dopo circa 28 secondi e il picco di utilizzo della memoria è di 1,43 GB:

customer
| join
    orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

Quando si usa shuffle la strategia con un join operatore, la query termina dopo circa 4 secondi e il picco di utilizzo della memoria è di 0,3 GB:

customer
| join
    hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

In un altro esempio si provano le stesse query su un set di dati più grande con le condizioni seguenti:

  • L'origine join sul lato sinistro di è 150M e la cardinalità della chiave è 148M.
  • L'origine join sul lato destro di è 1,5B e la cardinalità della chiave è ~100M.

La query con solo l'operatore join raggiunge i limiti e il timeout dopo 4 minuti. Tuttavia, quando si usa shuffle la strategia con l'operatore join , la query termina dopo circa 34 secondi e il picco di utilizzo della memoria è di 1,23 GB.

L'esempio seguente illustra il miglioramento in un cluster con due nodi del cluster, con una tabella di 60M record, in cui la cardinalità della join chiave è 2M. L'esecuzione della query senza hint.num_partitions userà solo due partizioni (come numero di nodi del cluster) e la query seguente richiederà circa 1:10 minuti:

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey   part
on $left.l_partkey == $right.p_partkey
| consume

Quando si imposta il numero di partizioni su 10, la query terminerà dopo 23 secondi:

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey  hint.num_partitions = 10    part
on $left.l_partkey == $right.p_partkey
| consume