Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Il log eventi della pipeline contiene tutte le informazioni correlate a una pipeline, inclusi i log di controllo, i controlli di qualità dei dati, lo stato della pipeline e la derivazione dei dati. È possibile usare il registro eventi per tenere traccia, comprendere e monitorare lo stato delle pipeline di dati.
È possibile visualizzare le voci del registro eventi nell'interfaccia utente di monitoraggio della pipeline, nell'API REST Pipelines o eseguendo direttamente una query sul registro eventi. Questa sezione è incentrata sull'esecuzione di query direttamente sul registro eventi.
È anche possibile definire azioni personalizzate da eseguire quando gli eventi vengono registrati, ad esempio l'invio di avvisi, utilizzando gli hook degli eventi.
Importante
Non eliminare il registro eventi o il catalogo padre o lo schema in cui viene pubblicato il registro eventi. L'eliminazione del registro eventi potrebbe causare un errore di aggiornamento della pipeline durante le esecuzioni future.
Per informazioni dettagliate sullo schema del registro eventi, vedere Schema del registro eventi della pipeline.
Eseguire una query sul registro eventi
Annotazioni
Questa sezione descrive il comportamento e la sintassi predefiniti per l'uso dei registri eventi per le pipeline configurate con Unity Catalog e la modalità di pubblicazione predefinita.
- Per il comportamento delle pipeline di Unity Catalog che utilizzano la modalità di pubblicazione legacy, vedere Usare il registro eventi per le pipeline di tale modalità.
- Per il comportamento e la sintassi delle pipeline del metastore di Hive, vedere Usare il registro eventi per le pipeline del metastore di Hive.
Per impostazione predefinita, una pipeline scrive il registro eventi in una tabella Delta nascosta nel catalogo e nello schema predefiniti configurati per la pipeline. Mentre è nascosta, la tabella può essere comunque interrogata da tutti gli utenti con privilegi sufficientemente elevati. Per impostazione predefinita, solo il proprietario della pipeline può eseguire query sulla tabella del registro eventi.
Per eseguire una query sul registro eventi come proprietario, usare l'ID della pipeline:
SELECT * FROM event_log(<pipelineId>);
Per impostazione predefinita, il nome del registro eventi nascosto viene formattato come event_log_{pipeline_id}, dove l'ID della pipeline è l'UUID assegnato dal sistema con trattini sostituiti da caratteri di sottolineatura.
È possibile pubblicare il registro eventi modificando le impostazioni avanzate per la pipeline. Per informazioni dettagliate, vedere Impostazione della pipeline per il registro eventi. Quando si pubblica un registro eventi, specificare il nome del registro eventi e, facoltativamente, specificare un catalogo e uno schema, come nell'esempio seguente:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
Il percorso del registro eventi funge anche da percorso dello schema per le query di Auto Loader nella pipeline. Databricks consiglia di creare una vista sulla tabella del log eventi prima di modificare i privilegi, perché alcune impostazioni di calcolo potrebbero consentire agli utenti di accedere ai metadati dello schema se la tabella del log eventi viene condivisa direttamente. La sintassi di esempio seguente crea una vista in una tabella del log eventi e viene usata nelle query del log eventi di esempio incluse in questo articolo. Sostituire <catalog_name>.<schema_name>.<event_log_table_name> con il nome completo della tabella del registro eventi della pipeline. Se il registro eventi è stato pubblicato, usare il nome specificato durante la pubblicazione. In caso contrario, usare event_log(<pipelineId>) dove pipelineId è l'ID della pipeline di cui si vuole eseguire la query.
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
Nel Catalogo Unity, le viste supportano le query di streaming. Nell'esempio seguente viene usato Structured Streaming per eseguire query su una vista definita sopra una tabella del log eventi:
df = spark.readStream.table("event_log_raw")
Esempi di query di base
Negli esempi seguenti viene illustrato come eseguire una query sul registro eventi per ottenere informazioni generali sulle pipeline e come eseguire il debug di scenari comuni.
Monitorare gli aggiornamenti della pipeline interrogando aggiornamenti precedenti
L'esempio seguente esegue una query sugli aggiornamenti (o sulle esecuzioni) della pipeline, visualizzando l'ID aggiornamento, lo stato, l'ora di inizio, l'ora di completamento e la durata. In questo modo viene fornita una panoramica delle esecuzioni della pipeline.
Si supponga di aver creato la event_log_raw vista per la pipeline a cui si è interessati, come descritto in Eseguire una query sul registro eventi.
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
Eseguire il debug dei problemi di aggiornamento incrementale della visualizzazione materializzata
Questo esempio esegue una query su tutti i flussi dall'aggiornamento più recente di una pipeline. Indica se sono stati aggiornati o meno in modo incrementale, nonché altre informazioni di pianificazione pertinenti utili per il debug del motivo per cui non viene eseguito un aggiornamento incrementale.
Si supponga di aver creato la event_log_raw vista per la pipeline a cui si è interessati, come descritto in Eseguire una query sul registro eventi.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
Eseguire una query sul costo di un aggiornamento della pipeline
Questo esempio illustra come eseguire una query sull'utilizzo DBU per una pipeline, nonché sull'utente per una determinata esecuzione della pipeline.
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
Query avanzate
Negli esempi seguenti viene illustrato come eseguire una query sul registro eventi per gestire scenari meno comuni o più avanzati.
Metriche di query per tutti i flussi in una pipeline
Questo esempio illustra come eseguire query su informazioni dettagliate su ogni flusso in una pipeline. Mostra il nome del flusso, la durata dell'aggiornamento, le metriche sulla qualità dei dati e le informazioni sulle righe elaborate (righe di output, eliminate, inserite o aggiornate e scartate).
Si supponga di aver creato la event_log_raw vista per la pipeline a cui si è interessati, come descritto in Eseguire una query sul registro eventi.
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
Eseguire query sui metriche di qualità dei dati o delle aspettative
Se si definiscono le aspettative sui set di dati nella pipeline, le metriche per il numero di record passati e non riusciti vengono archiviate nell'oggetto details:flow_progress.data_quality.expectations . La metrica per il numero di record eliminati viene archiviata nell'oggetto details:flow_progress.data_quality . Gli eventi contenenti informazioni sulla qualità dei dati hanno il tipo di evento flow_progress.
Le metriche sulla qualità dei dati potrebbero non essere disponibili per alcuni set di dati. Vedere le limitazioni delle aspettative.
Sono disponibili le metriche di qualità dei dati seguenti:
| Metrica | Description |
|---|---|
dropped_records |
Numero di record che sono stati eliminati perché non hanno superato una o più aspettative. |
passed_records |
Numero di record che hanno superato i criteri di attesa. |
failed_records |
Numero di record che non hanno superato i criteri di previsione. |
Nell'esempio seguente si interrogano le metriche relative alla qualità dei dati per il più recente aggiornamento della pipeline. Ciò presuppone che tu abbia creato la event_log_raw vista per la pipeline a cui sei interessato, come descritto in Eseguire una query nel registro eventi.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
Informazioni sulla derivazione di query
Gli eventi contenenti informazioni sulla derivazione hanno il tipo di evento flow_definition. L'oggetto details:flow_definition contiene i output_dataset e input_datasets che definiscono ogni relazione nel grafico.
Usare la query seguente per estrarre i set di dati di input e output per visualizzare le informazioni sulla derivazione. Ciò presuppone che tu abbia creato la event_log_raw vista per la pipeline a cui sei interessato, come descritto in Eseguire una query nel registro eventi.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Monitorare l'inserimento di file cloud con il caricatore automatico
Le pipeline generano eventi quando il caricatore automatico elabora i file. Per gli eventi del caricatore automatico, il event_type è operation_progress e il details:operation_progress:type è AUTO_LOADER_LISTING o AUTO_LOADER_BACKFILL. L'oggetto details:operation_progress include anche campi status, duration_ms, auto_loader_details:source_pathe auto_loader_details:num_files_listed.
L'esempio seguente esegue una query sugli eventi di Auto Loader per l'aggiornamento più recente. Ciò presuppone che tu abbia creato la event_log_raw vista per la pipeline a cui sei interessato, come descritto in Eseguire una query nel registro eventi.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
Monitorare il backlog dei dati per ottimizzare la durata dello streaming
Ogni pipeline tiene traccia della quantità di dati presenti nel backlog nell'oggetto details:flow_progress.metrics.backlog_bytes . Gli eventi contenenti metriche backlog hanno il tipo di evento flow_progress. Nell'esempio seguente, vengono interrogate le metriche del backlog per l'aggiornamento più recente della pipeline. Ciò presuppone che tu abbia creato la event_log_raw vista per la pipeline a cui sei interessato, come descritto in Eseguire una query nel registro eventi.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
Annotazioni
Le metriche del backlog potrebbero non essere disponibili a seconda del tipo di origine dati della pipeline e della versione di Databricks Runtime.
Monitorare gli eventi di scalabilità automatica per ottimizzare il calcolo classico
Per le pipeline che usano il calcolo classico (in altre parole, non usare il calcolo serverless), il registro eventi acquisisce il ridimensionamento del cluster quando la scalabilità automatica avanzata è abilitata nelle pipeline. Gli eventi contenenti informazioni sulla scalabilità automatica avanzata hanno il tipo di evento autoscale. Le informazioni sulla richiesta di ridimensionamento del cluster vengono archiviate nell'oggetto details:autoscale.
L'esempio seguente esegue una query sulle richieste di ridimensionamento automatico avanzato del cluster per l'ultimo aggiornamento della pipeline. Ciò presuppone che tu abbia creato la event_log_raw vista per la pipeline a cui sei interessato, come descritto in Eseguire una query nel registro eventi.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Monitorare l'utilizzo delle risorse di calcolo per il calcolo classico
cluster_resources gli eventi forniscono metriche sul numero di slot di attività nel cluster, sull'utilizzo di questi slot di attività e sul numero di attività in attesa di essere schedulate.
Quando la scalabilità automatica avanzata è abilitata, gli eventi cluster_resources contengono anche metriche per l'algoritmo di scalabilità automatica, tra cui latest_requested_num_executorse optimal_num_executors. Gli eventi mostrano anche lo stato dell'algoritmo come stati diversi, ad esempio CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSe BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION.
Queste informazioni possono essere visualizzate insieme agli eventi di scalabilità automatica per fornire un quadro generale della scalabilità automatica migliorata.
L'esempio seguente esegue una query sulla cronologia delle dimensioni della coda delle attività, sulla cronologia di utilizzo, sulla cronologia del conteggio degli executor e su altre metriche e sullo stato per la scalabilità automatica nell'ultimo aggiornamento della pipeline. Ciò presuppone che tu abbia creato la event_log_raw vista per la pipeline a cui sei interessato, come descritto in Eseguire una query nel registro eventi.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
Monitorare le metriche di streaming delle pipeline
È possibile visualizzare le metriche sullo stato di avanzamento del flusso in una pipeline. Eseguire una query per stream_progress eventi per ottenere eventi molto simili alle metriche StreamingQueryListener create da Structured Streaming, con le eccezioni seguenti:
- Le metriche seguenti sono presenti in
StreamingQueryListener, ma non instream_progress:numInputRows,inputRowsPerSecondeprocessedRowsPerSecond. - Per i flussi Kafka e Kinesis, i campi
startOffset,endOffsetelatestOffsetpossono essere troppo grandi e vengono troncati. Per ognuno di questi campi, viene aggiunto un campo aggiuntivo...Truncated,startOffsetTruncated,endOffsetTruncatedelatestOffsetTruncated, con un valore booleano per determinare se i dati vengono troncati.
Per eseguire una query per stream_progress gli eventi, è possibile usare una query come la seguente:
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
Di seguito è riportato un esempio di evento, in JSON:
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}
Questo esempio mostra i record non troncati in un'origine Kafka, con i ...Truncated campi impostati su false:
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}
Pipeline di controllo
È possibile usare i record del log eventi e altri log di controllo di Azure Databricks per ottenere un quadro completo del modo in cui i dati vengono aggiornati in una pipeline.
Le pipeline dichiarative di Lakeflow Spark usano le credenziali del proprietario della pipeline per eseguire gli aggiornamenti. È possibile modificare le credenziali usate aggiornando il proprietario della pipeline. Il log di controllo registra l'utente per le azioni nella pipeline, tra cui la creazione della pipeline, le modifiche alla configurazione e l'attivazione degli aggiornamenti.
Per informazioni di riferimento sugli eventi di controllo di Unity Catalog, vedere eventi del catalogo Unity.
Eseguire query sulle azioni utente nel registro eventi
È possibile usare il registro eventi per controllare gli eventi, ad esempio le azioni dell'utente. Gli eventi contenenti informazioni sulle azioni utente hanno il tipo di evento user_action.
Le informazioni sull'azione vengono archiviate nell'oggetto user_action nel campo details. Usare la query seguente per costruire un log di controllo degli eventi utente. Ciò presuppone che tu abbia creato la event_log_raw vista per la pipeline a cui sei interessato, come descritto in Eseguire una query nel registro eventi.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | START |
user@company.com |
| 2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
| 2021-05-27T00:35:51.971+0000 | START |
user@company.com |
informazioni sul runtime di
È possibile visualizzare le informazioni di runtime per un aggiornamento della pipeline, ad esempio la versione di Databricks Runtime per l'aggiornamento. Si presuppone che sia stata creata la event_log_raw vista per la pipeline a cui si è interessati, come descritto in Eseguire una query nel registro eventi.
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
|---|
| 11.0 |