Not
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Händelseloggen för pipelinen innehåller all information som rör en pipeline, inklusive granskningsloggar, datakvalitetskontroller, pipelineförlopp och data härkomst. Du kan använda händelseloggen för att spåra, förstå och övervaka tillståndet för dina datapipelines.
Du kan visa händelseloggposter i användargränssnittet för pipelineövervakning, REST API för pipelines eller genom att fråga händelseloggen direkt. Det här avsnittet fokuserar på att fråga händelseloggen direkt.
Du kan också definiera anpassade åtgärder som ska köras när händelser loggas, till exempel att skicka aviseringar, med händelsekrokar.
Viktigt!
Ta inte bort händelseloggen eller den överordnade katalogen eller schemat där händelseloggen publiceras. Om du tar bort händelseloggen kan det leda till att pipelinen inte uppdateras under framtida körningar.
För fullständig information om händelseloggschemat, se Pipeline event log schema.
Sök i händelseloggen
Anmärkning
I det här avsnittet beskrivs standardbeteendet och syntaxen för att arbeta med händelseloggar för pipelines som konfigurerats med Unity Catalog och standardpubliceringsläget.
- Information om Unity Catalog-pipelines som använder äldre publiceringsläge finns i Så här arbetar du med händelselogg för Unity Catalog-pipelines i äldre publiceringsläge.
- Information om beteendet och syntaxen för Hive-metabutikspipelines finns i Arbeta med händelseloggen för Hive-metabutikspipelines.
Som standard skriver en pipeline händelseloggen till en dold Delta-tabell i standardkatalogen och schemat som konfigurerats för pipelinen. Även om tabellen är dold kan den fortfarande efterfrågas av alla tillräckligt privilegierade användare. Som standard kan endast ägaren av pipelinen köra frågor mot händelseloggtabellen.
Om du vill köra frågor mot händelseloggen som ägare använder du pipeline-ID:t:
SELECT * FROM event_log(<pipelineId>);
Som standard formateras namnet på den dolda händelseloggen som event_log_{pipeline_id}, där pipeline-ID:t är det systemtilldelade UUID:t med bindestreck ersatta med understreck.
Du kan publicera händelseloggen genom att redigera avancerade inställningar för din pipeline. Mer information finns i Pipelineinställning för händelselogg. När du publicerar en händelselogg anger du namnet på händelseloggen och kan också ange en katalog och ett schema, som i följande exempel:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
Platsen för händelseloggen fungerar också som schemaplats för alla frågor för automatisk inläsning i pipelinen. Databricks rekommenderar att du skapar en vy över händelseloggtabellen innan du ändrar behörigheterna, eftersom vissa beräkningsinställningar kan ge användarna åtkomst till schemametadata om händelseloggtabellen delas direkt. Följande exempelsyntax skapar en vy i en händelseloggtabell och används i exempelhändelseloggfrågorna som ingår i den här artikeln. Ersätt <catalog_name>.<schema_name>.<event_log_table_name> med det fullständigt kvalificerade tabellnamnet för din pipelinehändelselogg. Om du har publicerat händelseloggen använder du det namn som angavs vid publiceringen. Annars använder du event_log(<pipelineId>) där pipelineId är ID för den pipeline som du vill köra frågor mot.
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
Inom Unity Catalog stöder vyer strömmande frågor. I följande exempel används Structured Streaming för att fråga en vy som definierats ovanpå en händelseloggtabell:
df = spark.readStream.table("event_log_raw")
Grundläggande frågeexempel
I följande exempel visas hur du frågar efter händelseloggen för att få allmän information om pipelines och för att felsöka vanliga scenarier.
Övervaka pipelineuppdateringar genom att fråga efter tidigare uppdateringar
I följande exempel visas uppdateringarna (eller körningarna) för din pipeline, vilket visar uppdaterings-ID, status, starttid, slutförandetid och varaktighet. Detta ger dig en översikt över körningar för pipelinen.
Förutsätter att du har skapat event_log_raw vyn för pipelinen som du är intresserad av, som beskrivs i Fråga händelseloggen.
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
Felsöka problem med inkrementell uppdatering av materialiserad vy
Det här exemplet frågar alla flöden från den senaste uppdateringen av en pipeline. Den visar om de har uppdaterats stegvis eller inte, samt annan relevant planeringsinformation som är användbar för felsökning av varför en inkrementell uppdatering inte sker.
Förutsätter att du har skapat event_log_raw vyn för pipelinen som du är intresserad av, som beskrivs i Fråga händelseloggen.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
Fråga efter kostnaden för en pipelineuppdatering
Det här exemplet visar hur du frågar DBU-användningen för en pipeline samt användaren för en viss pipelinekörning.
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
Avancerade frågor
I följande exempel visas hur du kör frågor mot händelseloggen för att hantera mindre vanliga eller mer avancerade scenarier.
Fråga efter mått för alla flöden i en pipeline
Det här exemplet visar hur du frågar efter detaljerad information om varje flöde i en pipeline. Den visar flödesnamn, uppdateringstid, datakvalitetsmått och information om de bearbetade raderna (utdatarader, borttagna, uppsertade och borttagna poster).
Förutsätter att du har skapat event_log_raw vyn för pipelinen som du är intresserad av, som beskrivs i Fråga händelseloggen.
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
Fråga datakvalitet eller förväntningsmått
Om du definierar förväntningar på datauppsättningar i pipelinen, lagras måtten för antalet poster som uppfyllde och inte uppfyllde en förväntan i objektet details:flow_progress.data_quality.expectations. Måttet för antalet borttagna poster lagras i objektet details:flow_progress.data_quality . Händelser som innehåller information om datakvalitet har händelsetypen flow_progress.
Datakvalitetsmått kanske inte är tillgängliga för vissa datauppsättningar. Se förväntningsbegränsningarna.
Följande datakvalitetsmått är tillgängliga:
| Mätvärde | Description |
|---|---|
dropped_records |
Antalet poster som togs bort eftersom de inte uppfyllde ett eller flera krav. |
passed_records |
Antalet poster som passerade förväntanskriterierna. |
failed_records |
Antalet poster som misslyckades med de förväntade kriterierna. |
I följande exempel efterfrågas datakvalitetsmåtten för den senaste pipelineuppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
Fråga efter ursprungsinformation
Händelser som innehåller information om ursprung har händelsetypen flow_definition. Objektet details:flow_definition innehåller output_dataset och input_datasets som definierar varje relation i diagrammet.
Använd följande fråga för att extrahera datauppsättningarna för indata och utdata för att se ursprungsinformation. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Övervaka molnfilinmatning med Auto Loader-funktionen
Pipelines genererar händelser när autoinläsaren bearbetar filer. För händelser med automatisk inläsning är event_typeoperation_progress och details:operation_progress:type är antingen AUTO_LOADER_LISTING eller AUTO_LOADER_BACKFILL.
details:operation_progress-objektet innehåller även fälten status, duration_ms, auto_loader_details:source_pathoch auto_loader_details:num_files_listed.
I det följande exemplet frågas efter Auto Loader-händelser för den senaste uppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
Övervaka kvarvarande data för att optimera strömningstiden
Varje pipeline spårar hur mycket data som finns i kvarvarande uppgifter i details:flow_progress.metrics.backlog_bytes objektet. Händelser som innehåller mått för kvarvarande uppgifter har händelsetypen flow_progress. I följande exempel efterfrågas kvarvarande mått för den senaste pipelineuppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
Anmärkning
Måtten för kvarvarande uppgifter kanske inte är tillgängliga beroende på pipelinens datakällatyp och Databricks Runtime-version.
Övervaka autoskalningshändelser för att optimera klassisk beräkning
För pipelines som använder klassisk beräkning (det vill säga, inte serverlös beräkning) registrerar händelseloggen klusterstorleksförändringar när utökad autoskalning är aktiverad i dina pipelines. Händelser som innehåller information om förbättrad autoskalning har händelsetypen autoscale. Informationen om en klusterstorleksändringsbegäran lagras i objektet details:autoscale.
I följande exempel frågas om förfrågningar om storleksändringar för det förbättrade autoskalningsklustret vid den senaste pipelineuppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Övervaka beräkningsresursanvändning för klassisk beräkning
cluster_resources händelser ger mått på antalet aktivitetsfack i klustret, hur mycket dessa aktivitetsfack används och hur många aktiviteter som väntar på att schemaläggas.
När förbättrad autoskalning är aktiverad innehåller cluster_resources händelser även mått för algoritmen för automatisk skalning, inklusive latest_requested_num_executorsoch optimal_num_executors. Händelserna visar också status för algoritmen som olika tillstånd, till exempel CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSoch BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION.
Dessa uppgifter kan visas tillsammans med autoskalningshändelserna för att ge en helhetsbild av förbättrad autoskalning.
I följande exempel efterfrågas historiken för aktivitetsköstorlek, användningshistorik, historik för antal exekutorer och andra mått och tillstånd för automatisk skalning i den senaste pipelineuppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
Övervaka metriker för pipeline-strömning
Du kan visa mått om strömmens förlopp i en pipeline. Gör en förfrågan om stream_progress händelser för att få händelser som är mycket lik måtten i StreamingQueryListener som skapats av Structured Streaming, med följande undantag:
- Följande mått finns i
StreamingQueryListener, men inte istream_progress:numInputRows,inputRowsPerSecondochprocessedRowsPerSecond. - För Kafka- och Kineses-strömmar kan fälten
startOffset,endOffsetochlatestOffsetvara för stora och trunkerade. För varje av dessa fält läggs ett ytterligare fält,...Truncated,startOffsetTruncated,endOffsetTruncatedochlatestOffsetTruncated, till med ett booleskt värde för om data är trunkerad.
Om du vill fråga efter stream_progress händelser kan du använda en fråga, till exempel följande:
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
Här är ett exempel på en händelse i JSON:
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}
Det här exemplet visar otrunkerade poster i en Kafka-källa, med fälten ...Truncated inställda på false:
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}
Auditera pipelines
Du kan använda händelseloggposter och andra Azure Databricks-granskningsloggar för att få en fullständig bild av hur data uppdateras i en pipeline.
Lakeflow Spark Deklarativa Pipelinjer använder autentiseringsuppgifterna för pipelineägaren för att köra uppdateringar. Du kan ändra de autentiseringsuppgifter som används genom att uppdatera pipelineägaren. Granskningsloggen registrerar vilka åtgärder som utförts av användaren på pipelinen, inklusive skapande av en pipeline, ändringar i konfigurationen och uppdateringar som initieras.
Mer information om granskningshändelser i Unity Catalog finns i Unity Catalog-händelser.
Fråga efter användaråtgärder i händelseloggen
Du kan använda händelseloggen för att granska händelser, till exempel användaråtgärder. Händelser som innehåller information om användaråtgärder har händelsetypen user_action.
Information om åtgärden lagras i user_action-objektet i fältet details. Använd följande fråga för att skapa en granskningslogg med användarhändelser. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | START |
user@company.com |
| 2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
| 2021-05-27T00:35:51.971+0000 | START |
user@company.com |
Körinformation
Du kan visa körningsinformation för en pipelineuppdatering, till exempel Databricks Runtime-versionen för uppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för pipelinen som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
|---|
| 11.0 |