Freigeben über


Optimieren der zustandsbehafteten Verarbeitung in Delta Live Tables mit Grenzwerten

Um die im Zustand gespeicherten Daten effektiv zu verwalten, verwenden Sie beim Ausführen der zustandsbehafteten Datenverarbeitung in Delta Live Tables Grenzwerte, einschließlich Aggregationen, Verknüpfungen und der Aufhebung der Duplizierung. In diesem Artikel wird beschrieben, wie Sie Grenzwerte in Ihren Delta Live Tables-Abfragen verwenden. Zudem enthält er Beispiele für die empfohlenen Vorgänge.

Hinweis

Um zu gewährleisten, dass Abfragen, die Aggregationen ausführen, inkrementell verarbeitet und nicht bei jedem Update vollständig neu berechnet werden, müssen Sie Grenzwerte verwenden.

Was ist ein Wasserzeichen?

Bei der Verarbeitung von Datenströmen ist ein Grenzwert ein Apache Spark-Feature, das beim Ausführen von zustandsbehafteten Vorgängen wie Aggregationen einen zeitbasierten Schwellenwert für die Verarbeitung von Daten definieren kann. Die eingehenden Daten werden verarbeitet, bis der Schwellenwert erreicht ist. Zu diesem Zeitpunkt wird das durch den Schwellenwert definierte Zeitfenster geschlossen. Grenzwerte können verwendet werden, um Probleme während der Abfrageverarbeitung zu vermeiden. Das geschieht hauptsächlich bei der Verarbeitung größerer Datasets oder bei Verarbeitungen mit langer Ausführungsdauer. Diese Probleme können eine hohe Wartezeit bei der Erstellung von Ergebnissen und sogar Fehler durch nicht genügend Arbeitsspeicher aufgrund der Datenmenge umfassen, die während der Verarbeitung im Zustand gespeichert sind. Da Streamingdaten inhärent ungeordnet sind, unterstützen Grenzwerte ebenfalls die ordnungsgemäße Berechnung von Vorgängen wie Aggregationen von Zeitfenstern.

Weitere Informationen zur Verwendung von Grenzwerten in der Verarbeitung von Datenströmen finden Sie unter Grenzwerte in Apache Spark Structured Streaming und Anwenden von Grenzwerten zum Steuern von Schwellenwerten für die Datenverarbeitung.

Wie werden Grenzwerte definiert?

Ein Grenzwert wird durch das Angeben eines Felds für den Zeitstempel und eines Werts definiert, der den Schwellenwert für den Zeitraum angibt, in dem verspätete Daten eingehen dürfen. Daten werden als verspätet erachtet, wenn sie nach dem definierten Schwellenwert für den Zeitraum eingehen. Beispiel: Wenn der Schwellenwert auf zehn Minuten festgelegt wird, werden Datensätze, die nach diesem Zeitraum eingehen, möglicherweise verworfen.

Da Datensätze, die nach dem definierten Schwellenwert eingehen, möglicherweise verworfen werden, ist es wichtig, einen Schwellenwert auszuwählen, der Ihre Anforderungen an die Wartezeit im Vergleich zur Fehlerfreiheit erfüllt. Die Auswahl eines geringeren Schwellenwerts führt dazu, dass Datensätze früher ausgegeben werden. Das bedeutet jedoch auch, dass verspätete Datensätze mit einer höheren Wahrscheinlichkeit verworfen werden. Durch einen höheren Schwellenwert entsteht eine längere Wartezeit. Das kann allerdings zu einer höheren Fehlerfreiheit der Daten führen. Aufgrund der größeren Zustandsgröße kann ein höherer Schwellenwert außerdem zusätzliche Computerressourcen erfordern. Da der Schwellenwert von Ihren Anforderungen an Daten und die Verarbeitung abhängt, ist es wichtig, die Verarbeitung auf Ihrer Seite zu testen und zu überwachen, um einen optimalen Schwellenwert zu ermitteln.

In Python verwenden Sie die withWatermark()-Funktion, um einen Grenzwert zu definieren. Verwenden Sie in SQL die WATERMARK-Klausel, um einen Grenzwert zu definieren:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Verwenden von Grenzwerten mit Datenstromverknüpfungen

Bei Datenstromverknüpfungen müssen Sie einen Grenzwert auf beiden Seiten der Verknüpfung und der Klausel für einen Zeitintervall definieren. Da jede Verknüpfungsquelle eine unvollständige Ansicht der Daten enthält, ist die Klausel für den Zeitintervall erforderlich, um der Streamingengine mitzuteilen, wann weitere Abgleiche nicht möglich sind. Die Klausel für den Zeitintervall muss dieselben Felder verwenden, die zum Definieren der Grenzwerte verwendet werden.

Da für jeden Datenstrom gelegentlich unterschiedliche Schwellenwerte für Grenzwerte erforderlich sein können, müssen die Datenströme nicht dieselben Schwellenwerte aufweisen. Um fehlende Daten zu vermeiden, verwaltet die Streamingengine einen globalen Grenzwert basierend auf dem langsamsten Datenstrom.

Im folgenden Beispiel wird ein Datenstrom von Anzeigenansichten und ein Datenstrom von benutzerseitigen Anzeigenklicks verknüpft. In diesem Beispiel muss ein Klick innerhalb von drei Minuten nach der Ansicht erfolgen. Nach Ablauf des dreiminütigen Zeitintervalls werden Zeilen aus dem Zustand gelöscht, die nicht mehr abgeglichen werden können.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Durchführen von Aggregationen im Fenstermodus mit Grenzwerten

Ein häufiger zustandsbehafteter Vorgang bei Streamingdaten ist die Aggregation im Fenstermodus. Aggregationen im Fenstermodus ähneln gruppierten Aggregationen. Sie unterscheiden sich jedoch insofern, als Aggregatwerte für die Menge der Zeilen zurückgegeben werden, die Teil des definierten Fensters sind.

Ein Fenster kann als bestimmte Länge definiert werden, und ein Aggregationsvorgang kann für alle Zeilen ausgeführt werden, die Teil dieses Fensters sind. Spark Streaming unterstützt drei Fenstertypen:

  • Rollierende (feste) Fenster: Eine Reihe nicht überlappender und zusammenhängender Zeitintervalle mit einer festen Größe. Ein Eingabedatensatz gehört lediglich zu einem einzelnen Fenster.
  • Gleitende Fenster: Ähnlich wie rollierende Fenster verfügen gleitende Fenster über eine feste Größe. Die Fenster können jedoch überlappen, und ein Datensatz kann in mehreren Fenstern auftreten.

Wenn Daten nach dem Ende des Fensters zuzüglich der Länge des Grenzwerts eingehen, werden keine neuen Daten für das Fenster akzeptiert, das Ergebnis der Aggregation wird ausgegeben, und der Zustand für das Fenster wird gelöscht.

Im folgenden Beispiel wird alle fünf Minuten mithilfe eines festen Fensters die Summe der Ansichten berechnet. In diesem Beispiel verwendet die Auswahlklausel den Alias impressions_window. Anschließend wird das Fenster selbst als Teil der GROUP BY-Klausel definiert. Das Fenster muss auf derselben Zeitstempelspalte wie der Grenzwert basieren – in diesem Beispiel auf der clickTimestamp-Spalte.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Ein ähnliches Python-Beispiel, in dem der Gewinn stündlich über feste Fenster berechnet wird:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    dlt.read_stream("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Aufheben des Duplizierens von Streamingdatensätzen

Das strukturierte Streaming bietet Exactly-Once-Verarbeitungsgarantien, hebt jedoch die Duplizierung von Datensätzen aus Datenquellen nicht automatisch auf. Beispiel: Da viele Nachrichtenwarteschlangen At-Least-Once-Garantien aufweisen, ist beim Lesen aus einer dieser Nachrichtenwarteschlangen mit doppelten Datensätzen zu rechnen. Sie können die dropDuplicatesWithinWatermark()-Funktion verwenden, um die Duplizierung der Datensätze für jedes angegebene Feld aufzuheben und Duplikate aus einem Datenstrom zu entfernen, selbst wenn sich einige Felder wie die der Ereignis- oder Eingangszeit unterscheiden. Sie müssen einen Grenzwert angeben, der die dropDuplicatesWithinWatermark()-Funktion verwenden soll. Alle doppelten Daten, die innerhalb des durch den Grenzwert angegebenen Zeitraums eingehen, werden gelöscht.

Sortierte Daten sind wichtig, da unsortierte Daten dazu führen, dass der Grenzwert nicht richtig fortgeführt wird. Wenn daraufhin ältere Daten eingehen, werden diese als verspätet erachtet und verworfen. Verwenden Sie die withEventTimeOrder-Option, um die Anfangsmomentaufnahme in der Reihenfolge basierend auf dem im Grenzwert angegebenen Zeitstempel zu verarbeiten. Die withEventTimeOrder-Option kann im Code, der das Dataset definiert, oder mithilfe von spark.databricks.delta.withEventTimeOrder.enabled in den Pipelineeinstellungen deklariert werden. Beispiel:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Hinweis

Die withEventTimeOrder-Option wird ausschließlich mit Python unterstützt.

Im folgenden Beispiel werden Daten verarbeitet, die nach clickTimestamp sortiert sind. Außerdem werden Datensätze gelöscht, die im Abstand von fünf Sekunden eingehen und doppelte Spalten für userId und clickAdId enthalten.

clicksDedupDf = (
  spark.readStream
    .option("withEventTimeOrder", "true")
    .table(rawClicks)
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimieren der Pipelinekonfiguration für zustandsbehaftete Verarbeitung

Um Produktionsprobleme und übermäßige Wartezeiten zu vermeiden, empfiehlt Databricks die Aktivierung der RocksDB-basierten Zustandsverwaltung für Ihre zustandsbehaftete Datenstromverarbeitung. Diese Empfehlung gilt insbesondere dann, wenn Ihre Verarbeitung eine große Menge an Zwischenzuständen erfordert.

Serverlose Pipelines verwalten automatisch Zustandsspeicherkonfigurationen.

Sie können die RocksDB-basierte Zustandsverwaltung aktivieren, indem Sie die folgende Konfiguration festlegen, bevor Sie eine Pipeline bereitstellen:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Weitere Informationen zum RocksDB-Zustandsspeicher, einschließlich Konfigurationsempfehlungen für RocksDB, finden Sie unter Konfigurieren des RocksDB-Zustandsspeichers in Azure Databricks.