Überwachen von Delta Live Tables-Pipelines

Dieser Artikel beschreibt, wie Sie integrierte Features in Delta Live Tables für die Überwachung und den Einblick für Pipelines verwenden können, einschließlich Datenherkunft, Updateverlauf und Datenqualitätsberichterstattung.

Sie können die meisten Überwachungsdaten manuell über die Benutzeroberfläche für Pipelinedetails überprüfen. Einige Aufgaben sind einfacher zu erledigen, indem die Ereignisprotokoll-Metadaten abfragt werden. Weitere Informationen finden Sie unter Was ist das Delta Live Tables-Ereignisprotokoll?.

Welche Pipelinedetails sind auf der Benutzeroberfläche verfügbar?

Das Pipelinediagramm wird angezeigt, sobald ein Update für eine Pipeline erfolgreich gestartet wurde. Pfeile stellen Abhängigkeiten zwischen Datasets in Ihrer Pipeline dar. Standardmäßig zeigt die Seite mit den Pipelinedetails das neueste Update für die Tabelle an, aber Sie können ältere Updates aus einem Dropdownmenü auswählen.

Zu den angezeigten Details gehören die Pipeline-ID, Quellbibliotheken, Computekosten, die Produktedition und der für die Pipeline konfigurierte Kanal.

Um eine tabellarische Ansicht von Datasets anzuzeigen, klicken Sie auf die Registerkarte Liste. Mit der Ansicht Liste können Sie alle Datasets in Ihrer Pipeline anzeigen, die als Zeile in einer Tabelle dargestellt werden. Dies ist nützlich, wenn der Pipeline-DAG zu groß ist, um in der Graph-Ansicht angezeigt zu werden. Sie können die Anzeige der Datasets in der Tabelle mithilfe mehrerer Filter steuern, z. B. Name, Typ und Status des Datasets. Um zur DAG-Visualisierung zurückzukehren, klicken Sie auf Graph.

Der Ausführen als-Benutzer ist der Pipelinebesitzer, und Pipelineupdates werden mit den Berechtigungen dieses Benutzers ausgeführt. Um den run as-Benutzer zu ändern, klicken Sie auf Berechtigungen, und ändern Sie den Pipelinebesitzer.

Wie können Datasetdetails angezeigt werden?

Wenn Sie im Pipelinediagramm oder in der Dataset-Liste auf ein Dataset klicken, werden Details zum Dataset angezeigt. Details umfassen das Datasetschema, Datenqualitätsmetriken und einen Link zurück zum Quellcode, der das Dataset definiert.

Prüfen des Updateverlaufs

Klicken Sie auf das Dropdownmenü „Updateverlauf“, um den Verlauf und den Status von Pipelineupdates anzuzeigen.

Wählen Sie das Update im Dropdownmenü aus, um den Graph, die Details und die Ereignisse für ein Update anzuzeigen. Klicken Sie auf Aktuelles Update anzeigen, um zum neuesten Update zurückzukehren.

Abrufen von Benachrichtigungen für Pipelineereignisse

Um Echtzeitbenachrichtigungen für Pipelineereignisse zu erhalten, z. B. erfolgreicher Abschluss eines Pipelineupdates oder Fehler eines Pipelineupdates, fügen Sie beim Erstellen oder Bearbeiten einer Pipeline E-Mail-Benachrichtigungen für Pipelineereignisse hinzufügen hinzu.

Was ist das Delta Live Tables-Ereignisprotokoll?

Das Ereignisprotokoll für Delta Live Tables enthält alle Informationen im Zusammenhang mit einer Pipeline, einschließlich Überwachungsprotokollen, Datenqualitätsprüfungen, Pipelinefortschritt und Datenherkunft. Sie können das Ereignisprotokoll verwenden, um den Status Ihrer Datenpipelines nachzuverfolgen, zu verstehen und zu überwachen.

Sie können Ereignisprotokolleinträge auf der Delta Live Tables-Benutzeroberfläche, über die Delta Live Tables-API oder direkt durch Abfragen des Ereignisprotokolls anzeigen. Dieser Artikel konzentriert sich auf das direkte Abfragen des Ereignisprotokolls.

Sie können auch benutzerdefinierte Aktionen definieren, die ausgeführt werden sollen, wenn Ereignisse mit Ereignishooks protokolliert werden, z. B. das Senden von Warnungen.

Ereignisprotokollschema

In folgender Tabelle wird das Ereignisprotokollschema beschrieben. Einige dieser Felder enthalten JSON-Daten, die geparst werden müssen, um einige Abfragen durchzuführen, wie z. B. das Feld details. Azure Databricks unterstützt den :-Operator zum Parsen von JSON-Feldern. Siehe : (Doppelpunkt)-Operator.

Feld Beschreibung
id Ein eindeutiger Bezeichner für den Ereignisprotokolldatensatz.
sequence Dies ist ein JSON-Dokument, das Metadaten zum Identifizieren und Anordnen von Ereignissen enthält.
origin Ein JSON-Dokument, das Metadaten für den Ursprung des Ereignisses enthält, z. B. den Cloudanbieter, die Cloudanbieterregion, user_id, pipeline_id oder pipeline_type, um anzuzeigen, wo die Pipeline erstellt wurde, entweder DBSQL oder WORKSPACE.
timestamp Dies ist der Zeitpunkt, zu dem das Ereignis aufgezeichnet wurde.
message Dies ist eine Meldung für Benutzer*innen, die das Ereignis beschreibt.
level Der Ereignistyp, z. B. INFO, WARN, ERROR oder METRICS.
error Wenn ein Fehler aufgetreten ist, werden Details zur Beschreibung des Fehlers angezeigt.
details Dies ist ein JSON-Dokument, das strukturierte Details des Ereignisses enthält. Dies ist das primäre Feld, das zum Analysieren von Ereignissen verwendet wird.
event_type Der Ereignistyp.
maturity_level Die Stabilität des Ereignisschemas. Die folgenden Werte sind möglich:

* STABLE: Das Schema ist stabil und ändert sich nicht.
* NULL: Das Schema ist stabil und ändert sich nicht. Der Wert lautet möglicherweise NULL, wenn der Datensatz erstellt wurde, bevor das Feld maturity_level hinzugefügt wurde (Release 2022.37).
* EVOLVING: Das Schema ist nicht stabil und ändert sich möglicherweise.
* DEPRECATED: Das Schema ist veraltet, und die Delta Live Tables-Runtime kann die Erstellung dieses Ereignisses jederzeit beenden.

Abfragen des Ereignisprotokolls

Der Speicherort des Ereignisprotokolls und die Schnittstelle zum Abfragen des Ereignisprotokolls hängen davon ab, ob Ihre Pipeline für die Verwendung des Hive-Metastore oder Unity Catalog konfiguriert ist.

Hive-Metastore

Wenn Ihre Pipeline Tabellen im Hive-Metastore veröffentlicht, wird das Ereignisprotokoll in /system/events unter dem Speicherort storage gespeichert. Wenn Sie ihre storage-Pipelineeinstellung beispielsweise als /Users/username/data konfiguriert haben, wird das Ereignisprotokoll im Pfad /Users/username/data/system/events im DBFS gespeichert.

Wenn Sie die Einstellung storage nicht konfiguriert haben, ist /pipelines/<pipeline-id>/system/events der Standardspeicherort für Ereignisprotokolle im DBFS. Wenn die ID Ihrer Pipeline beispielsweise 91de5e48-35ed-11ec-8d3d-0242ac130003 lautet, ist /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events der Speicherort.

Sie können eine Ansicht erstellen, um das Abfragen des Ereignisprotokolls zu vereinfachen. Das folgende Beispiel erstellt eine temporäre Ansicht namens event_log_raw. Diese Ansicht wird in den Beispielereignisprotokollabfragen in diesem Artikel verwendet:

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

Ersetzen Sie <event-log-path> durch den Ereignisprotokollspeicherort.

Jede Instanz einer Pipelineausführung wird als Update bezeichnet. Sie möchten häufig Informationen für das neueste Update extrahieren. Führen Sie die folgende Abfrage aus, um den Bezeichner für das neueste Update zu suchen und in der temporären latest_update_id-Ansicht zu speichern. Diese Ansicht wird in den Beispielereignisprotokollabfragen in diesem Artikel verwendet:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Sie können das Ereignisprotokoll in einem Azure Databricks-Notebook oder im SQL-Editor abfragen. Verwenden Sie ein Notebook oder den SQL-Editor, um die Beispielereignisprotokollabfragen auszuführen.

Unity Catalog

Wenn Ihre Pipeline Tabellen in Unity Catalog veröffentlicht, müssen Sie die event_logTabellenwertfunktion (Table Valued Function, TVF) verwenden, um das Ereignisprotokoll für die Pipeline abzurufen. Sie rufen das Ereignisprotokoll für eine Pipeline ab, indem Sie die Pipeline-ID oder einen Tabellennamen an die TVF übergeben. So rufen Sie beispielsweise die Ereignisprotokolldatenschätze für die Pipeline mit der ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b ab:

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

So rufen Sie die Ereignisprotokolldatensätze für die Pipeline ab, welche die Tabelle my_catalog.my_schema.table1 erstellt oder besitzt:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

Um die TVF aufzurufen, müssen Sie einen freigegebenen Cluster oder ein SQL-Warehouse verwenden. Sie können beispielsweise ein Notebook verwenden, das an einen freigegebenen Cluster angefügt ist, oder den SQL-Editor verwenden, der mit einem SQL-Warehouse verbunden ist.

Um das Abfragen von Ereignissen für eine Pipeline zu vereinfachen, kann der Besitzer der Pipeline eine Ansicht über die event_log-TVF erstellen. Das folgende Beispiel erstellt eine Ansicht über dem Ereignisprotokoll für eine Pipeline. Diese Ansicht wird in den Beispielereignisprotokollabfragen in diesem Artikel verwendet.

Hinweis

Die event_log-TVF kann nur vom Pipelinebesitzer aufgerufen werden, und eine über die event_log-TVF erstellte Ansicht kann nur vom Pipelinebesitzer abgefragt werden. Die Ansicht kann nicht für andere Benutzer freigegeben werden.

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

Ersetzen Sie <pipeline-ID> durch den eindeutigen Bezeichner für die Delta Live Tables-Pipeline. Sie finden die ID im Bereich Pipelinedetails auf der Benutzeroberfläche von Delta Live Tables.

Jede Instanz einer Pipelineausführung wird als Update bezeichnet. Sie möchten häufig Informationen für das neueste Update extrahieren. Führen Sie die folgende Abfrage aus, um den Bezeichner für das neueste Update zu suchen und in der temporären latest_update_id-Ansicht zu speichern. Diese Ansicht wird in den Beispielereignisprotokollabfragen in diesem Artikel verwendet:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Abfragen von Herkunftsinformationen aus dem Ereignisprotokoll

Ereignisse, die Informationen zur Datenherkunft enthalten, weisen den Ereignistyp flow_definition auf. Das details:flow_definition-Objekt enthält das output_dataset und das input_datasets, welche jede Beziehung im Diagramm definieren.

Sie können die folgende Abfrage verwenden, um die Eingabe- und Ausgabedatasets zu extrahieren, um Herkunftsinformationen anzuzeigen:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
1 customers null
2 sales_orders_raw null
3 sales_orders_cleaned ["customers", "sales_orders_raw"]
4 sales_order_in_la ["sales_orders_cleaned"]

Abfragen der Datenqualität aus dem Ereignisprotokoll

Wenn Sie Erwartungen für Datasets in Ihrer Pipeline definieren, werden die Datenqualitätsmetriken im details:flow_progress.data_quality.expectations-Objekt gespeichert. Ereignisse, die Informationen zur Datenqualität enthalten, weisen den Ereignistyp flow_progress auf. Im folgenden Beispiel werden die Datenqualitätsmetriken für das letzte Pipelineupdate abgefragt:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
1 sales_orders_cleaned valid_order_number 4083 0

Überwachen des Datenbacklogs durch Abfragen des Ereignisprotokolls

Delta Live Tables verfolgt, wie viele Daten im Backlog im details:flow_progress.metrics.backlog_bytes-Objekt vorhanden sind. Ereignisse, die Backlogmetriken enthalten, haben den Ereignistyp flow_progress. Im folgenden Beispiel werden die Backlog-Metriken für das letzte Pipelineupdate abgefragt:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Hinweis

Die Backlogmetriken sind je nach Datenquellentyp und Databricks-Runtime-Version der Pipeline möglicherweise nicht verfügbar.

Überwachen von Ereignissen mit erweiterter automatischer Skalierung aus dem Ereignisprotokoll

Das Ereignisprotokoll erfasst die Größe des Clusters, wenn die erweiterte automatische Skalierung in Ihren Pipelines aktiviert ist. Ereignisse, die Informationen zur Datenqualität enthalten, weisen den Ereignistyp autoscale auf. Die Größe der Anforderungsinformationen des Clusters wird im details:autoscale-Objekt gespeichert. Im folgenden Beispiel wird die Größenänderungsanforderungen des erweiterten Clusters für das letzte Pipeline-Update abfragen:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Überwachen der Nutzung der Computeressourcen

cluster_resources-Ereignisse liefern Metriken zur Anzahl der Aufgabenslots im Cluster, zur Auslastung dieser Aufgabenslots und zur Anzahl von Aufgaben, die auf die Planung warten.

Wenn die erweiterte automatische Skalierung aktiviert ist, enthalten cluster_resources-Ereignisse auch Metriken für den Algorithmus für die automatische Skalierung, einschließlich latest_requested_num_executors und optimal_num_executors. Die Ereignisse zeigen auch den Status des Algorithmus als unterschiedliche Zustände an wie z. B. CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS und BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Diese Informationen können in Verbindung mit den Ereignissen zur automatischen Skalierung angezeigt werden, um ein Gesamtbild der erweiterten automatischen Skalierung zu erhalten.

Im folgenden Beispiel wird der Größenverlauf der Vorgangswarteschlange für die letzte Pipelineaktualisierung abfragt:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Im folgenden Beispiel wird der Nutzungsverlauf für die letzte Pipelineaktualisierung abfragt:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Im folgenden Beispiel wird der Verlauf der Executoranzahl abgefragt, begleitet von Metriken, die nur für Pipelines für die erweiterte automatische Skalierung verfügbar sind, einschließlich der Anzahl der vom Algorithmus in der neuesten Anforderung angeforderten Executors, der optimalen Anzahl von Executors, die vom Algorithmus basierend auf den neuesten Metriken empfohlen wird, und dem Zustand des Algorithmus für die automatische Skalierung:

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Überwachen der Delta Live Tables-Pipelines

Sie können Delta Live Tables-Ereignisprotokolldatensätze und andere Azure Databricks-Überwachungsprotokolle verwenden, um ein vollständiges Bild davon zu erhalten, wie Daten in Delta Live Tables aktualisiert werden.

Delta Live Tables verwendet die Anmeldeinformationen des Pipelinebesitzers, um Updates auszuführen. Sie können die verwendeten Anmeldeinformationen ändern, indem Sie den Pipelinebesitzer aktualisieren. Delta Live Tables zeichnet den Benutzer für Aktionen in der Pipeline auf, einschließlich Pipelineerstellung, Konfigurationsänderungen und Auslösen von Updates.

Eine Referenz zu Unity Catalog-Überwachungsereignissen finden Sie unter Unity Catalog-Ereignisse.

Abfragen von Benutzeraktionen im Ereignisprotokoll

Sie können das Ereignisprotokoll verwenden, um Ereignisse wie Benutzeraktionen zu überwachen. Ereignisse, die Informationen zu Benutzeraktionen enthalten, haben den Ereignistyp user_action.

Informationen zur Aktion werden im user_action-Objekt im details-Feld gespeichert. Verwenden Sie die folgende Abfrage, um ein Überwachungsprotokoll von Benutzerereignissen zu erstellen. Informationen zum Erstellen der in dieser Abfrage verwendeten event_log_raw-Ansicht finden Sie unter Abfragen des Ereignisprotokolls.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
1 2021-05-20T19:36:03.517+0000 START user@company.com
2 2021-05-20T19:35:59.913+0000 CREATE user@company.com
3 2021-05-27T00:35:51.971+0000 START user@company.com

Runtimeinformationen

Sie können Laufzeitinformationen für ein Pipelineupdate anzeigen, z. B. die Databricks-Runtime-Version für das Update:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
1 11,0