Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
Das Pipelineereignisprotokoll enthält alle Informationen zu einer Pipeline, einschließlich Überwachungsprotokollen, Datenqualitätsprüfungen, Pipelinefortschritt und Datenlinie. Sie können das Ereignisprotokoll verwenden, um den Status Ihrer Datenpipelines nachzuverfolgen, zu verstehen und zu überwachen.
Sie können Ereignisprotokolleinträge in der Pipelineüberwachungs-Benutzeroberfläche, der Pipelines-REST-API oder durch direkte Abfrage des Ereignisprotokolls anzeigen. Dieser Abschnitt konzentriert sich auf das direkte Abfragen des Ereignisprotokolls.
Sie können auch benutzerdefinierte Aktionen definieren, die ausgeführt werden sollen, wenn Ereignisse mit Ereignishooks protokolliert werden, z. B. das Senden von Warnungen.
Von Bedeutung
Löschen Sie nicht das Ereignisprotokoll, den übergeordneten Katalog oder das Schema, in dem es veröffentlicht wird. Das Löschen des Ereignisprotokolls kann dazu führen, dass die Pipeline während zukünftiger Ausführung nicht aktualisiert werden kann.
Ausführliche Informationen zum Ereignisprotokollschema finden Sie unter Pipeline-Ereignisprotokollschema.
Abfragen des Ereignisprotokolls
Hinweis
In diesem Abschnitt werden das Standardverhalten und die Syntax für das Arbeiten mit Ereignisprotokollen für Pipelines beschrieben, die mit Unity Catalog und dem Standardveröffentlichungsmodus konfiguriert sind.
- Verhalten für Unity Catalog-Pipelines, die den Legacy-Veröffentlichungsmodus verwenden, finden Sie unter Arbeiten mit dem Ereignisprotokoll für die Legacy-Publishing-Modus-Pipelines in Unity Catalog.
- Verhalten und Syntax für Hive-Metastore-Pipelines finden Sie unter "Arbeiten mit dem Ereignisprotokoll für Hive-Metastore-Pipelines".
Standardmäßig schreibt eine Pipeline das Ereignisprotokoll in eine ausgeblendete Delta-Tabelle im Standardkatalog und dem Standardschema, die für die Pipeline konfiguriert sind. Die Tabelle kann zwar ausgeblendet werden, aber dennoch von allen ausreichend privilegierten Benutzern abgefragt werden. Standardmäßig kann nur der Besitzer der Pipeline die Ereignisprotokolltabelle abfragen.
Verwenden Sie die Pipeline-ID, um das Ereignisprotokoll als Besitzer abzufragen:
SELECT * FROM event_log(<pipelineId>);
Standardmäßig ist der Name für das ausgeblendete Ereignisprotokoll so event_log_{pipeline_id}formatiert, dass die Pipeline-ID die vom System zugewiesene UUID mit Bindestrichen ist, die durch Unterstriche ersetzt werden.
Sie können das Ereignisprotokoll veröffentlichen, indem Sie die erweiterten Einstellungen für Ihre Pipeline bearbeiten. Ausführliche Informationen finden Sie unter Pipelineeinstellung für das Ereignisprotokoll. Wenn Sie ein Ereignisprotokoll veröffentlichen, geben Sie den Namen für das Ereignisprotokoll an, und geben Sie optional einen Katalog und ein Schema an, wie im folgenden Beispiel gezeigt:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
Der Ereignisprotokollspeicherort dient auch als Schemaspeicherort für alle Auto Loader-Abfragen in der Pipeline. Databricks empfiehlt das Erstellen einer Ansicht über die Ereignisprotokolltabelle vor dem Ändern der Berechtigungen, da einige Berechnungseinstellungen Benutzern möglicherweise den Zugriff auf Schemametadaten ermöglichen können, wenn die Ereignisprotokolltabelle direkt freigegeben wird. Die folgende Beispielsyntax erstellt eine Ansicht in einer Ereignisprotokolltabelle und wird in den beispielen Ereignisprotokollabfragen verwendet, die in diesem Artikel enthalten sind. Ersetzen Sie <catalog_name>.<schema_name>.<event_log_table_name> durch den vollständig qualifizierten Tabellennamen Ihres Pipeline-Ereignisprotokolls. Wenn Sie das Ereignisprotokoll veröffentlicht haben, verwenden Sie den bei der Veröffentlichung angegebenen Namen. Verwenden Sie andernfalls event_log(<pipelineId>), wobei die pipelineId die ID der Pipeline ist, die Sie abfragen möchten.
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
Im Unity-Katalog unterstützen Ansichten Streamingabfragen. Im folgenden Beispiel wird strukturiertes Streaming verwendet, um eine Ansicht abzufragen, die über einer Ereignisprotokolltabelle definiert ist:
df = spark.readStream.table("event_log_raw")
Grundlegende Abfragebeispiele
Die folgenden Beispiele zeigen, wie Sie das Ereignisprotokoll abfragen, um allgemeine Informationen zu Pipelines abzurufen und allgemeine Szenarien zu debuggen.
Pipeline-Updates überwachen, indem frühere Updates abgefragt werden
Im folgenden Beispiel werden die Updates (oder Ausführung) Ihrer Pipeline mit der Update-ID, dem Status, der Startzeit, der Abschlusszeit und der Dauer abgerufen. Dadurch erhalten Sie einen Überblick über die Durchläufe der Pipeline.
Es wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
Debuggen von Problemen bei der inkrementellen Aktualisierung materialisierter Sichten
In diesem Beispiel werden alle Flüsse aus der letzten Aktualisierung einer Pipeline abgerufen. Es wird angezeigt, ob sie inkrementell aktualisiert wurden oder nicht, sowie andere relevante Planungsinformationen, die für das Debuggen hilfreich sind, warum eine inkrementelle Aktualisierung nicht geschieht.
Es wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
Abfragen der Kosten für ein Pipeline-Update
In diesem Beispiel wird gezeigt, wie die DBU-Verwendung für eine Pipeline und der Benutzer für eine bestimmte Pipelineausführung abgefragt werden.
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
Erweiterte Abfragen
Die folgenden Beispiele zeigen, wie Sie das Ereignisprotokoll abfragen, um weniger häufige oder komplexere Szenarien zu behandeln.
Abfragemetriken für alle Flüsse in einer Pipeline
In diesem Beispiel wird gezeigt, wie Sie detaillierte Informationen zu jedem Fluss in einer Pipeline abfragen. Es zeigt den Flow-Name, die Aktualisierungsdauer, Datenqualitätsmetriken und Informationen zu den verarbeiteten Zeilen an (Ausgabezeilen, gelöschte, hochgeladen und verworfene Datensätze).
Es wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
Abfrage von Datenqualitäts- oder Erwartungsmetriken
Wenn Sie die Erwartungen an Datensätze in Ihrer Pipeline definieren, werden die Metriken für die Anzahl der übergebenen Datensätze und fehlgeschlagenen Erwartungen im details:flow_progress.data_quality.expectations Objekt gespeichert. Die Metrik für die Anzahl der verworfenen Datensätze wird im details:flow_progress.data_quality Objekt gespeichert. Ereignisse, die Informationen zur Datenqualität enthalten, weisen den Ereignistyp flow_progress auf.
Datenqualitätsmetriken sind für einige Datensätze möglicherweise nicht verfügbar. Sehen Sie sich die Erwartungenseinschränkungen an.
Die folgenden Datenqualitätsmetriken sind verfügbar:
| Metric | Description |
|---|---|
dropped_records |
Die Anzahl der Einträge, die abgelehnt wurden, weil sie eine oder mehrere Erwartungen nicht erfüllt haben. |
passed_records |
Die Anzahl der Datensätze, die die Erwartungskriterien bestanden haben. |
failed_records |
Die Anzahl der Datensätze, bei denen die Erwartungskriterien nicht erfüllt wurden. |
Im folgenden Beispiel werden die Datenqualitätsmetriken für die letzte Pipeline-Aktualisierung abgefragt. Dabei wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
Abfragen von Herkunftsinformationen
Ereignisse, die Informationen zur Datenherkunft enthalten, weisen den Ereignistyp flow_definition auf. Das details:flow_definition-Objekt enthält das output_dataset und das input_datasets, welche jede Beziehung im Diagramm definieren.
Verwenden Sie die folgende Abfrage, um die Eingabe- und Ausgabedatensätze zu extrahieren, um Linieninformationen anzuzeigen. Dabei wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Überwachen des Clouddatei-Ingestionsprozesses mit Auto Loader
Pipelines generieren Ereignisse, wenn das automatische Laden Dateien verarbeitet. Bei Auto Loader-Ereignissen ist der event_typeoperation_progress und der details:operation_progress:type ist entweder AUTO_LOADER_LISTING oder AUTO_LOADER_BACKFILL. Das details:operation_progress Objekt enthält auch status, duration_ms, auto_loader_details:source_path und auto_loader_details:num_files_listed Felder.
Im folgenden Beispiel werden Auto Loader-Ereignisse für das letzte Update abgefragt. Dabei wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
Überwachen des Datenbacklogs zur Optimierung der Streamingdauer
Jede Pipeline verfolgt, wie viele Daten im Backlog im details:flow_progress.metrics.backlog_bytes Objekt vorhanden sind. Ereignisse, die Backlogmetriken enthalten, haben den Ereignistyp flow_progress. Im folgenden Beispiel werden Backlogmetriken für die letzte Pipelineaktualisierung abgefragt. Dabei wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
Hinweis
Die Backlogmetriken sind je nach Datenquellentyp und Databricks-Runtime-Version der Pipeline möglicherweise nicht verfügbar.
Überwachen von automatischen Skalierungsereignissen zur Optimierung der klassischen Berechnung
Bei Pipelines, die klassische Compute verwenden (d. h. keine serverlose Berechnung verwenden), erfasst das Ereignisprotokoll Cluster-Größenänderungen, wenn die erweiterte automatische Skalierung in Ihren Pipelines aktiviert ist. Ereignisse, die Informationen zur Datenqualität enthalten, weisen den Ereignistyp autoscale auf. Die Größe der Anforderungsinformationen des Clusters wird im details:autoscale-Objekt gespeichert.
Im folgenden Beispiel werden die Größenänderungsanforderungen für erweiterte Cluster mit automatischer Skalierung für das letzte Pipelineupdate abgefragt. Dabei wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Überwachung der Ressourcenauslastung für klassisches Compute
cluster_resources Ereignisse stellen Metriken für die Anzahl der Aufgabenslots im Cluster bereit, wie viel diese Aufgabenslots genutzt werden, und wie viele Aufgaben darauf warten, geplant zu werden.
Wenn die erweiterte automatische Skalierung aktiviert ist, enthalten cluster_resources-Ereignisse auch Metriken für den Algorithmus für die automatische Skalierung, einschließlich latest_requested_num_executors und optimal_num_executors. Die Ereignisse zeigen auch den Status des Algorithmus als unterschiedliche Zustände an wie z. B. CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS und BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION.
Diese Informationen können in Verbindung mit den Ereignissen zur automatischen Skalierung angezeigt werden, um ein Gesamtbild der erweiterten automatischen Skalierung zu erhalten.
Im folgenden Beispiel werden der Verlauf der Größe der Aufgabenwarteschlange, der Auslastungsverlauf, der Verlauf der Executor-Anzahl und weitere Kennzahlen und Zustände der automatischen Skalierung beim letzten Update der Pipeline abgefragt. Dabei wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
Überwachung von Pipeline-Streaming-Metriken
Sie können Metriken zum Fortschreiten des Streams in einer Pipeline anzeigen. Fragen Sie nach stream_progress Ereignissen ab, um Ereignisse abzurufen, die den streamingQueryListener-Metriken sehr ähnlich sind, die von Structured Streaming erstellt wurden, mit den folgenden Ausnahmen:
- Die folgenden Metriken sind in
StreamingQueryListener, aber nicht instream_progress:numInputRows,inputRowsPerSecondundprocessedRowsPerSecond. - Für Kafka- und Kinesis-Datenströme können die Felder
startOffset,endOffsetundlatestOffsetzu groß sein und werden daher abgeschnitten. Für jedes dieser Felder wird ein zusätzliches...TruncatedFeld,startOffsetTruncated,endOffsetTruncated, undlatestOffsetTruncated, mit einem booleschen Wert hinzugefügt, um anzugeben, ob die Daten abgeschnitten werden.
Zum Abfragen von stream_progress Ereignissen können Sie eine Abfrage wie die folgenden verwenden:
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
Hier ist ein Beispiel für ein Ereignis in JSON:
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}
Dieses Beispiel zeigt nicht gekürzte Datensätze in einer Kafka-Quelle, wobei die ...Truncated Felder auf false:
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}
Überwachen von Pipelines
Sie können Ereignisprotokolleinträge und andere Azure Databricks-Überwachungsprotokolle verwenden, um ein vollständiges Bild davon zu erhalten, wie Daten in einer Pipeline aktualisiert werden.
Lakeflow Spark Declarative Pipelines verwendet die Anmeldeinformationen des Pipelinebesitzers, um Updates auszuführen. Sie können die verwendeten Anmeldeinformationen ändern, indem Sie den Pipelinebesitzer aktualisieren. Das Überwachungsprotokoll zeichnet den Benutzer für Aktionen in der Pipeline auf, einschließlich Pipelineerstellung, Bearbeitungen zur Konfiguration und Auslösen von Updates.
Eine Referenz zu Unity Catalog-Überwachungsereignissen finden Sie unter Unity Catalog-Ereignisse.
Abfragen von Benutzeraktionen im Ereignisprotokoll
Sie können das Ereignisprotokoll verwenden, um Ereignisse wie Benutzeraktionen zu überwachen. Ereignisse, die Informationen zu Benutzeraktionen enthalten, haben den Ereignistyp user_action.
Informationen zur Aktion werden im user_action-Objekt im details-Feld gespeichert. Verwenden Sie die folgende Abfrage, um ein Überwachungsprotokoll von Benutzerereignissen zu erstellen. Dabei wird davon ausgegangen, dass Sie die event_log_raw Ansicht für die pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfragen" beschrieben.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | START |
user@company.com |
| 2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
| 2021-05-27T00:35:51.971+0000 | START |
user@company.com |
Runtimeinformationen
Sie können Laufzeitinformationen für ein Pipelineupdate anzeigen, z. B. die Databricks-Runtime-Version für das Update. Dabei wird davon ausgegangen, dass Sie die Ansicht für die event_log_raw pipeline erstellt haben, an der Sie interessiert sind, wie im Ereignisprotokoll "Abfrage" beschrieben.
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
|---|
| 11.0 |