Ausführung von adaptiven Abfragen
Die adaptive Abfrageausführung (Adaptive Query Execution, AQE) ist eine erneute Optimierung von Abfragen, die während der Abfrageausführung erfolgt.
Der Beweggrund für die erneute Optimierung der Laufzeit ist, dass Azure Databricks am Ende eines Shuffle- und Broadcastaustauschs (in AQE als Abfragephase bezeichnet) über die aktuellsten genauen Statistiken verfügt. Daher kann Azure Databricks eine bessere physische Strategie wählen, eine optimale Partitionsgröße und -anzahl nach dem Shuffle auswählen oder Optimierungen durchführen, für die früher Hinweise erforderlich waren, z. B. die Behandlung von Skew-Joins.
Dies kann sehr nützlich sein, wenn die Statistiksammlung nicht aktiviert ist oder Statistiken veraltet sind. Dies ist auch dann nützlich, wenn statisch abgeleitete Statistiken ungenau sind, z. B. in der Mitte einer komplizierten Abfrage oder nach dem Auftreten von Datenschiefe.
Funktionen
AQE ist standardmäßig aktiviert. AQE umfasst vier Hauptfunktionen:
- Dynamische Änderung von Sort-Merge-Join in Broadcast-Hash-Join.
- Dynamische Zusammenführung von Partitionen (Kombinieren kleiner Partitionen zu Partitionen mit angemessener Größe) nach dem Shuffle-Austausch. Sehr kleine Aufgaben haben einen schlechteren E/A-Durchsatz und unterliegen tendenziell einem höheren Planungsaufwand und einem Mehraufwand bei der Aufgabeneinrichtung. Durch die Kombination kleiner Aufgaben können Sie Ressourcen sparen und den Clusterdurchsatz verbessern.
- Dynamische Behandlung von Datenschiefe bei Sort-Merge-Join und Shuffle-Hash-Join durch Aufteilung (und ggf. Replikation) von Aufgaben mit Schiefe in ungefähr gleichgroße Aufgaben.
- Dynamische Erkennung und Weitergabe von leeren Beziehungen.
Anwendung
AQE wird auf alle Abfragen angewendet, für die Folgendes gilt:
- Nicht-Streaming
- Sie enthalten mindestens einen Austausch (in der Regel, wenn ein Join, ein Aggregat oder ein Fenster vorhanden ist), eine Unterabfrage oder beides.
Nicht alle Abfragen, auf die AQE angewendet wird, werden zwangsläufig erneut optimiert. Bei der erneuten Optimierung kann sich ein anderer als der statisch kompilierte Abfrageplan ergeben. Informationen dazu, wie Sie feststellen können, ob der Plan einer Abfrage von AQE geändert wurde, finden Sie im folgenden Abschnitt: Abfragepläne.
Abfragepläne
In diesem Abschnitt wird erläutert, wie Sie Abfragepläne auf unterschiedliche Weise untersuchen können.
Inhalt dieses Abschnitts:
Spark-Benutzeroberfläche
AdaptiveSparkPlan
-Knoten
Abfragen, auf die AQE angewendet wird, enthalten mindestens einen AdaptiveSparkPlan
-Knoten, in der Regel als Stammknoten jeder Hauptabfrage oder Unterabfrage.
Vor oder während der Ausführung der Abfrage wird das isFinalPlan
-Flag des entsprechenden AdaptiveSparkPlan
-Knotens als false
angezeigt. Nach Abschluss der Abfrageausführung ändert sich das isFinalPlan
-Flag in true.
.
Der sich entwickelnde Plan
Das Abfrageplandiagramm wird im Laufe der Ausführung weiterentwickelt und spiegelt den aktuellen Plan wider, der ausgeführt wird. Bereits ausgeführte Knoten (für die Metriken verfügbar sind) werden nicht geändert, aber noch nicht ausgeführte Knoten können sich im Laufe der Zeit aufgrund von erneuten Optimierungen ändern.
Im Folgenden finden Sie ein Beispiel für ein Abfrageplandiagramm:
DataFrame.explain()
AdaptiveSparkPlan
-Knoten
Abfragen, auf die AQE angewendet wird, enthalten mindestens einen AdaptiveSparkPlan
-Knoten, in der Regel als Stammknoten jeder Hauptabfrage oder Unterabfrage. Vor oder während der Ausführung der Abfrage wird das isFinalPlan
-Flag des entsprechenden AdaptiveSparkPlan
-Knotens als false
angezeigt. Nach Abschluss der Abfrageausführung ändert sich das isFinalPlan
-Flag in true
.
Aktueller Plan und Anfangsplan
Unter jedem AdaptiveSparkPlan
-Knoten befinden sich sowohl der Anfangsplan (der Plan vor dem Anwenden von AQE-Optimierungen) als auch der aktuelle oder der endgültige Plan, je nachdem, ob die Ausführung abgeschlossen ist. Der aktuelle Plan wird im Laufe der Ausführung weiterentwickelt.
Laufzeitstatistiken
Jede Shuffle- und Broadcastphase enthält Datenstatistiken.
Vor oder während der Ausführung der Phase sind die Statistiken Schätzungen zur Kompilierzeit, und das Flag isRuntime
ist auf false
festgelegt. Beispiel: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
Nach Abschluss der Phasenausführung werden die Statistiken zur Laufzeit erfasst, und das Flag isRuntime
wird auf true
festgelegt. Beispiel: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
Nachfolgend finden Sie ein Beispiel für DataFrame.explain
:
Vor der Ausführung
Während der Ausführung
Nach der Ausführung
SQL EXPLAIN
AdaptiveSparkPlan
-Knoten
Abfragen, auf die AQE angewendet wird, enthalten mindestens einen AdaptiveSparkPlan-Knoten, in der Regel als Stammknoten jeder Hauptabfrage oder Unterabfrage.
Kein aktueller Plan
Da SQL EXPLAIN
die Abfrage nicht ausführt, entspricht der aktuelle Plan immer dem Anfangsplan und spiegelt nicht wider, was letztendlich von AQE ausgeführt würde.
Nachfolgend finden Sie ein Beispiel für SQL EXPLAIN:
Wirksamkeit
Der Abfrageplan wird geändert, wenn mindestens eine AQE-Optimierung wirksam wird. Die Auswirkung dieser AQE-Optimierungen wird durch den Unterschied zwischen dem aktuellen und dem endgültigen Plan und dem Anfangsplan und bestimmten Planknoten im aktuellen und endgültigen Plan veranschaulicht.
Dynamische Änderung von Sort-Merge-Join in Broadcast-Hash-Join: unterschiedliche physische Join-Knoten zwischen dem aktuellen/endgültigen Plan und dem Anfangsplan
Dynamische Zusammenführung von Partitionen: Knoten
CustomShuffleReader
mit EigenschaftCoalesced
Dynamische Behandlung von Skew-Joins: Knoten
SortMergeJoin
mit FeldisSkew
als „true“.Dynamische Erkennung und Weitergabe von leeren Beziehungen: Ein Teil des Plans (oder der gesamte Plan) wird durch den Knoten „LocalTableScan“ ersetzt, wobei das Beziehungsfeld leer ist.
Konfiguration
Inhalt dieses Abschnitts:
- Aktivieren und Deaktivieren der adaptiven Abfrageausführung
- Aktivieren des automatisch optimierten Mischens
- Dynamische Änderung von Sort-Merge-Join in Broadcast-Hash-Join
- Dynamische Zusammenführung von Partitionen
- Dynamische Behandlung von Skew-Joins
- Dynamische Erkennung und Weitergabe von leeren Beziehungen
Aktivieren und Deaktivieren der adaptiven Abfrageausführung
Eigenschaft |
---|
spark.databricks.optimizer.adaptive.enabled Geben Sie Folgendes ein: Boolean Gibt an, ob die adaptive Abfrageausführung aktiviert oder deaktiviert werden soll. Standardwert: true |
Aktivieren des automatisch optimierten Mischens
Eigenschaft |
---|
spark.sql.shuffle.partitions Geben Sie Folgendes ein: Integer Die Standardanzahl von Partitionen, die beim Mischen von Daten für Verknüpfungen oder Aggregationen verwendet werden sollen. Durch Festlegen des Werts auto wird automatisch optimiertes Mischen aktiviert, was diese Zahl basierend auf dem Abfrageplan und der Größe der Abfrageeingabedaten automatisch bestimmt.Hinweis: Für strukturiertes Streaming kann diese Konfiguration nicht zwischen Abfrageneustarts vom gleichen Prüfpunktstandort geändert werden. Standardwert: 200 |
Dynamische Änderung von Sort-Merge-Join in Broadcast-Hash-Join
Eigenschaft |
---|
spark.databricks.adaptive.autoBroadcastJoinThreshold Geben Sie Folgendes ein: Byte String Der Schwellenwert, der den Wechsel zum Broadcast-Join zur Laufzeit auslöst. Standardwert: 30MB |
Dynamische Zusammenführung von Partitionen
Eigenschaft |
---|
spark.sql.adaptive.coalescePartitions.enabled Geben Sie Folgendes ein: Boolean Gibt an, ob die Zusammenführung von Partitionen aktiviert oder deaktiviert werden soll. Standardwert: true |
spark.sql.adaptive.advisoryPartitionSizeInBytes Geben Sie Folgendes ein: Byte String Die Zielgröße nach der Zusammenführung. Die Größe der zusammengeführten Partitionen entspricht in etwa dieser Zielgröße, liegt aber nicht darüber. Standardwert: 64MB |
spark.sql.adaptive.coalescePartitions.minPartitionSize Geben Sie Folgendes ein: Byte String Die Mindestgröße der Partitionen nach der Zusammenführung. Die Größe der zusammengeführten Partitionen liegt nicht unter dieser Größe. Standardwert: 1MB |
spark.sql.adaptive.coalescePartitions.minPartitionNum Geben Sie Folgendes ein: Integer Die Mindestanzahl von Partitionen nach der Zusammenführung. Nicht empfohlen, weil durch das Festlegen dieser Eigenschaft die folgende Eigenschaft explizit außer Kraft gesetzt wird: spark.sql.adaptive.coalescePartitions.minPartitionSize .Standardwert: 2-fache Anzahl von Clusterkernen |
Dynamische Behandlung von Skew-Joins
Eigenschaft |
---|
spark.sql.adaptive.skewJoin.enabled Geben Sie Folgendes ein: Boolean Gibt an, ob die Behandlung von Skew-Joins aktiviert oder deaktiviert werden soll. Standardwert: true |
spark.sql.adaptive.skewJoin.skewedPartitionFactor Geben Sie Folgendes ein: Integer Ein Faktor, der bei Multiplizierung mit der mittleren Partitionsgröße dazu beiträgt, zu bestimmen, ob bei einer Partition Datenschiefe vorliegt. Standardwert: 5 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Geben Sie Folgendes ein: Byte String Ein Schwellenwert, der dazu beiträgt, zu bestimmen, ob bei einer Partition Datenschiefe vorliegt. Standardwert: 256MB |
Eine Partition gilt als ungleichmäßig, wenn sowohl (partition size > skewedPartitionFactor * median partition size)
als auch (partition size > skewedPartitionThresholdInBytes)
den Wert true
aufweisen.
Dynamische Erkennung und Weitergabe von leeren Beziehungen
Eigenschaft |
---|
spark.databricks.adaptive.emptyRelationPropagation.enabled Geben Sie Folgendes ein: Boolean Gibt an, ob die dynamische Weitergabe von leeren Beziehungen aktiviert oder deaktiviert werden soll. Standardwert: true |
Häufig gestellte Fragen (FAQ)
Inhalt dieses Abschnitts:
- Warum hat AQE eine kleine Join-Tabelle nicht übertragen?
- Sollte ich bei aktivierter AQE trotzdem einen Hinweis auf eine Broadcast-Join-Strategie verwenden?
- Was ist der Unterschied zwischen einem Skew-Join-Hinweis und der AQE-Skew-Join-Optimierung? Was sollte ich verwenden?
- Warum hat AQE meine Join-Reihenfolge nicht automatisch angepasst?
- Warum hat AQE die Datenschiefe nicht erkannt?
Warum hat AQE eine kleine Join-Tabelle nicht übertragen?
Beachten Sie die folgenden Punkte, wenn die Größe der Beziehung, die übertragen werden soll, unter diesem Schwellenwert liegt, aber trotzdem nicht übertragen wird:
- Überprüfen Sie den Join-Typ. Broadcast wird für bestimmte Join-Typen nicht unterstützt, z. B. kann die linke Beziehung eines
LEFT OUTER JOIN
nicht übertragen werden. - Es kann auch sein, dass die Beziehung viele leere Partitionen enthält. In diesem Fall können die meisten Aufgaben mit einem Sort-Merge-Join schnell abgeschlossen werden, oder es kann möglicherweise eine Optimierung mit der Behandlung von Skew-Joins vorgenommen werden. AQE vermeidet die Änderung solcher Sort-Merge-Joins in Broadcast-Hash-Joins, wenn der Prozentsatz der nicht leeren Partitionen niedriger ist als
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
.
Sollte ich bei aktivierter AQE trotzdem einen Hinweis auf eine Broadcast-Join-Strategie verwenden?
Ja. Ein statisch geplanter Broadcast-Join ist in der Regel leistungsfähiger als ein von AQE dynamisch geplanter, da AQE möglicherweise erst nach dem Ausführen von Shuffle für beide Seiten des Joins zum Broadcast-Join wechselt (bis zu diesem Zeitpunkt werden die tatsächlichen Beziehungsgrößen ermittelt). Daher kann die Verwendung eines Broadcast-Hinweises eine gute Wahl darstellen, wenn Sie Ihre Abfrage gut kennen. AQE beachtet Abfragehinweise auf die gleiche Weise wie die statische Optimierung, kann aber auch dynamische Optimierungen anwenden, die von den Hinweisen nicht beeinflusst werden.
Was ist der Unterschied zwischen einem Skew-Join-Hinweis und der AQE-Skew-Join-Optimierung? Was sollte ich verwenden?
Es wird empfohlen, sich auf die AQE-Behandlung von Skew-Joins zu verlassen, statt den Skew-Join-Hinweis zu verwenden, da die AQE-Behandlung von Skew-Joins vollständig automatisch erfolgt und im Allgemeinen eine bessere Leistung bietet als das Gegenstück des Hinweises.
Warum hat AQE meine Join-Reihenfolge nicht automatisch angepasst?
Die dynamische Neuanordnung von Joins ist kein Bestandteil von AQE.
Warum hat AQE die Datenschiefe nicht erkannt?
Es gibt zwei Größenbedingungen, die erfüllt sein müssen, damit AQE erkennt, dass bei einer Partition Datenschiefe vorliegt:
- Die Partitionsgröße liegt über dem
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
(Standardwert: 256 MB). - Die Partitionsgröße liegt über der mittleren Größe aller Partitionen mal dem Faktor für Partitionen mit Datenschiefe
spark.sql.adaptive.skewJoin.skewedPartitionFactor
(Standardwert: 5).
Darüber hinaus ist die Unterstützung für die Behandlung von Datenschiefe bei bestimmten Join-Typen eingeschränkt, z. B. kann bei LEFT OUTER JOIN
nur die Schiefe auf der linken Seite optimiert werden.
Vorversion
Der Begriff „Adaptive Ausführung“ wird seit Spark 1.6 verwendet, aber die neue AQE in Spark 3.0 ist grundlegend anders. In Bezug auf die Funktionalität übernimmt Spark 1.6 nur den Teil „dynamische Zusammenführung von Partitionen“. In Bezug auf die technische Architektur ist die neue AQE ein Framework der dynamischen Planung und Neuplanung von Abfragen auf der Grundlage von Laufzeitstatistiken, das eine Vielzahl von Optimierungen (wie z. B. die in diesem Artikel beschriebenen) unterstützt und erweitert werden kann, um weitere potenzielle Optimierungen zu ermöglichen.