Shuffleabfrage

Die shuffle Abfrage ist eine semantisch erhaltende Transformation, die mit einer Reihe von Operatoren verwendet wird, die die shuffle Strategie unterstützen. Abhängig von den beteiligten Daten kann die Abfrage mit der shuffle Strategie zu einer besseren Leistung führen. Es ist besser, die Shuffle-Abfragestrategie zu verwenden, wenn der shuffle Schlüssel (schlüssel join , summarize schlüssel, make-series schlüssel oder partition schlüssel) eine hohe Kardinalität aufweist und die reguläre Operatorabfrage Abfrage die Abfragelimits erreicht.

Sie können die folgenden Operatoren mit dem Befehl shuffle verwenden:

Um die shuffle Abfragestrategie zu verwenden, fügen Sie den Ausdruck hint.strategy = shuffle oder hinzu hint.shufflekey = <key>. Wenn Sie verwenden hint.strategy=shuffle, werden die Operatordaten durch alle Schlüssel gemischt. Verwenden Sie diesen Ausdruck, wenn der zusammengesetzte Schlüssel eindeutig ist, aber nicht jeder Schlüssel eindeutig genug ist, sodass Sie die Daten mit allen Schlüsseln des shuffled-Operators mischen.

Beim Partitionieren von Daten mit der Shufflestrategie wird die Datenlast auf allen Clusterknoten gemeinsam genutzt. Jeder Knoten verarbeitet eine Partition der Daten. Die Standardanzahl von Partitionen entspricht der Anzahl der Clusterknoten.

Die Partitionsnummer kann mit der Syntax hint.num_partitions = total_partitionsüberschrieben werden, die die Anzahl der Partitionen steuert. Dies ist nützlich, wenn der Cluster über eine kleine Anzahl von Clusterknoten verfügt und die Standardpartitionen klein sind und die Abfrage fehlschlägt oder eine lange Ausführungszeit in Anspruch nimmt.

Hinweis

Die Verwendung vieler Partitionen kann mehr Clusterressourcen beanspruchen und die Leistung beeinträchtigen. Wählen Sie die Partitionsnummer sorgfältig aus, indem Sie mit beginnen hint.strategy = shuffle , und beginnen Sie mit dem schrittweisen Erhöhen der Partitionen.

In einigen Fällen wird ignoriert hint.strategy = shuffle , und die Abfrage wird nicht in der shuffle Strategie ausgeführt. Dies kann passieren, wenn:

  • Der join Operator verfügt über einen anderen shuffle-kompatiblen Operator (join, summarizeoder make-seriespartition) auf der linken oder rechten Seite.
  • Der summarize Operator wird nach einem anderen shuffle-kompatiblen Operator (join, summarizeoder make-seriespartition) in der Abfrage angezeigt.

Syntax

Mit hint.strategy = shuffle

T|DataExpression|joinhint.strategy = shuffle(DataExpression)

T|summarizehint.strategy = shuffleDataExpression

T|Abfrage|Partitionsunterabfragehint.strategy = shuffle()

Mit hint.shufflekey = Schlüssel

T|DataExpression|joinhint.shufflekey = Schlüssel(DataExpression)

T|summarizehint.shufflekey = KeyDataExpression

T|make-serieshint.shufflekey = KeyDataExpression

T|Abfrage|Partitionsschlüssel-Unterabfrage(hint.shufflekey = )

Erfahren Sie mehr über Syntaxkonventionen.

Parameter

Name Typ Erforderlich Beschreibung
T string ✔️ Die tabellarische Quelle, deren Daten vom Operator verarbeitet werden sollen.
DataExpression string Ein impliziter oder expliziter tabellarischer Transformationsausdruck.
Abfrage string Ein Transformationsausdruck, der für die Datensätze von T ausgeführt wird.
key string Verwenden Sie einen Schlüssel, summarize einen Schlüssel, make-series einen Schlüssel oder partition einen join Schlüssel.
Unterabfrage string Ein Transformationsausdruck.

Hinweis

Abhängig von der gewählten Syntax muss entweder DataExpression oder Query angegeben werden.

Beispiele

Verwenden von summarize mit Shuffle

Die shuffle Strategieabfrage mit summarize dem Operator teilt sich die Last auf allen Clusterknoten, wobei jeder Knoten eine Partition der Daten verarbeitet.

StormEvents
| summarize hint.strategy = shuffle count(), avg(InjuriesIndirect) by State
| count 

Ausgabe

Anzahl
67

Verwenden von Join mit Shuffle

StormEvents
| where State has "West"
| where EventType has "Flood"
| join hint.strategy=shuffle 
    (
    StormEvents
    | where EventType has "Hail"
    | project EpisodeId, State, DamageProperty
    )
    on State
| count

Ausgabe

Anzahl
103

Verwenden von Make-Series mit Shuffle

StormEvents
| where State has "North"
| make-series hint.shufflekey = State sum(DamageProperty) default = 0 on StartTime in range(datetime(2007-01-01 00:00:00.0000000), datetime(2007-01-31 23:59:00.0000000), 15d) by State

Ausgabe

State sum_DamageProperty StartTime
NORTH DAKOTA [60000,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:100:00.0000000Z","2007-01-30T00:00:00.0000000Z"]
NORTH CAROLINA [20000,0,1000] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:100:00.0000000Z","2007-01-30T00:00:00.0000000Z"]
ATLANTIK, NORDEN [0,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:100:00.0000000Z","2007-01-30T00:00:00.0000000Z"]

Verwenden der Partition mit Shuffle

StormEvents
| partition hint.strategy=shuffle by EpisodeId
(
    top 3 by DamageProperty
    | project EpisodeId, State, DamageProperty
)
| count

Ausgabe

Anzahl
22345

Vergleich von hint.strategy=shuffle und hint.shufflekey=key

Wenn Sie verwenden hint.strategy=shuffle, wird der shuffled-Operator durch alle Tasten gemischt. Im folgenden Beispiel mischt die Abfrage die Daten mit sowohl EventId als auch EpisodeId als Schlüssel:

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
| count

Ausgabe

Anzahl
14

Die folgende Abfrage verwendet hint.shufflekey = key. Die obige Abfrage entspricht dieser Abfrage.

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.shufflekey = EpisodeId hint.shufflekey = EventId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

Ausgabe

Anzahl
14

Mischen der Daten mit mehreren Schlüsseln

In einigen Fällen wird ignoriert hint.strategy=shuffle , und die Abfrage wird nicht in der Shuffle-Strategie ausgeführt. Im folgenden Beispiel hat die Verknüpfung beispielsweise eine Zusammenfassung auf der linken Seite, sodass die Verwendung der hint.strategy=shuffle Shuffle-Strategie nicht auf die Abfrage angewendet wird:

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

Ausgabe

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

Um dieses Problem zu beheben und in einer Shufflestrategie auszuführen, wählen Sie den Schlüssel aus, der für die summarize Vorgänge und join üblich ist. In diesem Fall ist EpisodeIddieser Schlüssel . Verwenden Sie den Hinweishint.shufflekey, um die Shuffle-Taste für für join anzugeben:hint.shufflekey = EpisodeId

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.shufflekey=EpisodeId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

Ausgabe

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

Verwenden von summarize mit Shuffle zur Verbesserung der Leistung

In diesem Beispiel verbessert die Verwendung des Operators mit shuffle der summarize Strategie die Leistung. Die Quelltabelle enthält 150 Millionen Datensätze, und die Kardinalität der Gruppe nach Schlüssel beträgt 10 Millionen, die auf 10 Clusterknoten verteilt ist.

Wenn Operator summarize ohne shuffle Strategie verwendet wird, endet die Abfrage nach 1:08, und die Speicherauslastungsspitze beträgt ~3 GB:

orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

Ausgabe

Anzahl
1086

Bei Verwendung der shuffle Strategie mit summarizeendet die Abfrage nach ca. 7 Sekunden, und der Spitzenwert der Speicherauslastung beträgt 0,43 GB:

orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

Ausgabe

Anzahl
1086

Im folgenden Beispiel wird die Leistung in einem Cluster mit zwei Clusterknoten mit einer Tabelle mit 60 Millionen Datensätzen veranschaulicht, wobei die Kardinalität der Gruppe nach Schlüssel 2 Mio. beträgt.

Das Ausführen der Abfrage ohne hint.num_partitions verwendet nur zwei Partitionen (als Clusterknotennummer), und die folgende Abfrage dauert ca. 1:10 Minuten:

lineitem 
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

Wenn Sie die Partitionsnummer auf 10 festlegen, endet die Abfrage nach 23 Sekunden:

lineitem 
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

Verwenden von Join mit Shuffle zur Verbesserung der Leistung

Das folgende Beispiel zeigt, wie die Verwendung von shuffle Strategie mit dem Operator die join Leistung verbessert.

Die Beispiele wurden in einem Cluster mit 10 Knoten entnommen, in dem die Daten auf alle diese Knoten verteilt sind.

Die linke Quelltabelle der Abfrage enthält 15 Millionen Datensätze, wobei die Kardinalität des join Schlüssels ~14 Mio. beträgt. Die rechte Quelle der Abfrage verfügt über 150 Millionen Datensätze, und die Kardinalität des join Schlüssels beträgt 10 Millionen. Die Abfrage endet nach ca. 28 Sekunden, und der Spitzenwert der Arbeitsspeicherauslastung beträgt 1,43 GB:

customer
| join
    orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

Wenn Sie die Strategie mit einem join Operator verwendenshuffle, endet die Abfrage nach ca. 4 Sekunden, und der Spitzenwert der Speicherauslastung beträgt 0,3 GB:

customer
| join
    hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

In einem anderen Beispiel versuchen wir die gleichen Abfragen für ein größeres Dataset mit den folgenden Bedingungen:

  • Die linke Quelle des join ist 150M, und die Kardinalität des Schlüssels beträgt 148M.
  • Die rechte Quelle des join ist 1,5 B, und die Kardinalität des Schlüssels beträgt ~100M.

Die Abfrage mit nur dem join Operator erreicht grenzwerte und timeout nach 4 Minuten. Wenn Sie jedoch die Strategie mit dem join Operator verwendenshuffle, endet die Abfrage nach ca. 34 Sekunden, und die Speicherauslastungsspitze beträgt 1,23 GB.

Das folgende Beispiel zeigt die Verbesserung für einen Cluster mit zwei Clusterknoten mit einer Tabelle mit 60 Millionen Datensätzen, wobei die Kardinalität des join Schlüssels 2M beträgt. Das Ausführen der Abfrage ohne hint.num_partitions verwendet nur zwei Partitionen (als Clusterknotennummer), und die folgende Abfrage dauert ca. 1:10 Minuten:

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey   part
on $left.l_partkey == $right.p_partkey
| consume

Wenn Sie die Partitionsnummer auf 10 festlegen, endet die Abfrage nach 23 Sekunden:

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey  hint.num_partitions = 10    part
on $left.l_partkey == $right.p_partkey
| consume