Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Log peristiwa alur berisi semua informasi yang terkait dengan alur, termasuk log audit, pemeriksaan kualitas data, kemajuan alur, dan silsilah data. Anda dapat menggunakan log peristiwa untuk melacak, memahami, dan memantau status alur data Anda.
Anda dapat melihat entri log peristiwa di antarmuka pengguna pemantauan alur, Pipelines REST API, atau dengan langsung mengkueri log peristiwa. Bagian ini berfokus pada kueri log peristiwa secara langsung.
Anda juga dapat menentukan tindakan kustom untuk dijalankan saat peristiwa dicatat, misalnya, mengirim pemberitahuan, dengan kait peristiwa.
Penting
Jangan hapus log peristiwa atau katalog induk atau skema tempat log peristiwa diterbitkan. Menghapus log peristiwa dapat mengakibatkan pipeline Anda gagal diperbarui selama pelaksanaan di masa mendatang.
Untuk detail lengkap skema log peristiwa, lihat Skema log peristiwa Pipeline.
Mengkueri log peristiwa
Nota
Bagian ini menjelaskan perilaku dan sintaks default untuk bekerja dengan log peristiwa untuk alur yang dikonfigurasi dengan Katalog Unity dan mode penerbitan default.
- Untuk perilaku alur Katalog Unity yang menggunakan mode penerbitan lama, lihat Bekerja dengan catatan peristiwa untuk alur mode penerbitan lama Katalog Unity.
- Untuk perilaku dan sintaks alur metastore Hive, lihat Bekerja dengan log peristiwa untuk alur metastore Hive.
Secara default, pipeline menulis log peristiwa ke dalam tabel Delta tersembunyi yang ada di katalog dan skema default yang telah dikonfigurasi untuk pipeline. Meskipun disembunyikan, tabel masih dapat dikueri oleh semua pengguna dengan hak istimewa yang cukup. Secara default, hanya pemilik alur yang dapat mengkueri tabel log peristiwa.
Untuk melakukan kueri log peristiwa sebagai pemilik, gunakan ID pipeline:
SELECT * FROM event_log(<pipelineId>);
Secara default, nama untuk log peristiwa tersembunyi diformat sebagai event_log_{pipeline_id}, di mana ID alur adalah UUID yang ditetapkan sistem dengan tanda hubung digantikan oleh garis bawah. Tabel log peristiwa muncul di system.information_schema.tables tetapi tidak terlihat di Catalog Explorer atau halaman UI ruang kerja lainnya. Anda harus mengaksesnya menggunakan event_log() fungsi .
Anda dapat menerbitkan log peristiwa dengan mengedit pengaturan Tingkat Lanjut untuk alur Anda. Untuk detailnya, lihat Pengaturan pipeline untuk catatan peristiwa. Saat Anda menerbitkan log peristiwa, tentukan nama untuk log peristiwa dan, secara opsional, tentukan katalog dan skema, seperti dalam contoh berikut:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
Lokasi log peristiwa juga berfungsi sebagai lokasi skema untuk kueri Auto Loader apa pun di alur. Databricks merekomendasikan untuk membuat tampilan di atas tabel log peristiwa sebelum memodifikasi hak istimewa, karena beberapa pengaturan komputasi mungkin memungkinkan pengguna untuk mendapatkan akses ke metadata skema jika tabel log peristiwa dibagikan secara langsung. Contoh sintaks berikut membuat tampilan pada tabel log peristiwa, dan digunakan dalam contoh kueri log peristiwa yang disertakan dalam artikel ini. Ganti <catalog_name>.<schema_name>.<event_log_table_name> dengan nama tabel yang sepenuhnya memenuhi syarat dari log peristiwa alur Anda. Jika Anda telah menerbitkan log peristiwa, gunakan nama yang ditentukan saat menerbitkan. Jika tidak, gunakan event_log(<pipelineId>) di mana pipelineId adalah ID alur yang ingin Anda kueri.
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
Di dalam Unity Catalog, tampilan mendukung kueri streaming. Contoh berikut menggunakan Streaming Terstruktur untuk mengkueri tampilan yang ditentukan di atas tabel log peristiwa:
df = spark.readStream.table("event_log_raw")
Contoh kueri dasar
Contoh berikut menunjukkan cara mengkueri log peristiwa untuk mendapatkan informasi umum tentang alur, dan untuk membantu men-debug skenario umum.
Memantau pembaruan pipeline dengan mengkueri pembaruan sebelumnya
Contoh berikut mengkueri pembaruan (atau menjalankan) alur Anda, memperlihatkan ID pembaruan, status, waktu mulai, waktu penyelesaian, dan durasi. Ini memberi Anda gambaran umum tentang jalannya untuk alur kerja.
Asumsikan bahwa Anda telah membuat event_log_raw tampilan untuk alur yang Anda minati, seperti yang dijelaskan dalam Kueri log peristiwa.
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
Mengatasi masalah penyegaran inkremental pada contoh tampilan
Contoh ini mengkueri semua alur dari pembaruan terbaru pipa data. Ini menunjukkan apakah informasi tersebut diperbarui secara bertahap atau tidak, serta informasi perencanaan relevan lainnya yang berguna untuk penelusuran kesalahan mengapa refresh bertahap tidak terjadi.
Asumsikan bahwa Anda telah membuat event_log_raw tampilan untuk alur yang Anda minati, seperti yang dijelaskan dalam Kueri log peristiwa.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
Memeriksa biaya pembaruan pipa
Contoh ini menunjukkan cara mengkueri penggunaan DBU untuk alur, serta pengguna untuk pelaksanaan alur tertentu.
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
Kueri tingkat lanjut
Contoh berikut menunjukkan cara mengkueri log peristiwa untuk menangani skenario yang kurang umum atau lebih lanjut.
Metrik kueri untuk semua aliran dalam rangkaian proses
Contoh ini memperlihatkan cara mengkueri informasi terperinci tentang setiap proses dalam pipeline. Ini menunjukkan nama alur, durasi pembaruan, metrik kualitas data, dan informasi tentang baris yang diproses (baris output, dihapus, di-upsert, dan rekaman yang dibuang).
Asumsikan bahwa Anda telah membuat event_log_raw tampilan untuk alur yang Anda minati, seperti yang dijelaskan dalam Kueri log peristiwa.
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
Kualitas data kueri atau metrik ekspektasi
Jika Anda menentukan ekspektasi pada himpunan data di alur Anda, metrik untuk jumlah rekaman yang lolos dan gagal harapan disimpan dalam details:flow_progress.data_quality.expectations objek. Metrik untuk jumlah rekaman yang dihilangkan disimpan dalam details:flow_progress.data_quality objek. Peristiwa yang berisi informasi tentang kualitas data memiliki jenis peristiwa flow_progress.
Metrik kualitas data mungkin tidak tersedia untuk beberapa himpunan data. Lihat batasan ekspektasi.
Metrik kualitas data berikut tersedia:
| Ukuran | Description |
|---|---|
dropped_records |
Jumlah rekaman yang dihapus karena gagal memenuhi satu atau beberapa harapan. |
passed_records |
Jumlah rekaman yang melewati kriteria ekspektasi. |
failed_records |
Jumlah rekaman yang gagal dalam kriteria ekspektasi. |
Contoh berikut mengkueri metrik kualitas data untuk pembaruan alur terakhir. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
Informasi riwayat kueri
Peristiwa yang berisi informasi tentang garis keturunan memiliki jenis peristiwa flow_definition. Objek details:flow_definition berisi output_dataset dan input_datasets menentukan setiap hubungan dalam grafik.
Gunakan kueri berikut untuk mengekstrak himpunan data input dan output untuk melihat informasi silsilah data. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Memantau penyerapan file cloud dengan Auto Loader
Alur menghasilkan peristiwa saat Auto Loader memproses file. Untuk peristiwa Auto Loader, event_type adalah operation_progress dan details:operation_progress:type adalah AUTO_LOADER_LISTING atau AUTO_LOADER_BACKFILL. Objek details:operation_progress juga menyertakan bidang status, duration_ms, auto_loader_details:source_path, dan auto_loader_details:num_files_listed.
Contoh berikut mengkueri peristiwa Auto Loader untuk mendapatkan pembaruan terbaru. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
Memantau backlog data untuk mengoptimalkan durasi streaming
Setiap alur melacak berapa banyak data yang ada di backlog dalam details:flow_progress.metrics.backlog_bytes objek. Peristiwa yang berisi metrik backlog memiliki jenis peristiwa flow_progress. Contoh berikut mengkueri metrik backlog untuk pembaruan alur terakhir. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
Nota
Metrik backlog mungkin tidak tersedia tergantung pada jenis sumber data alur dan versi Databricks Runtime.
Memantau peristiwa penskalaan otomatis untuk mengoptimalkan komputasi klasik
Untuk alur yang menggunakan komputasi klasik (dengan kata lain, jangan gunakan komputasi tanpa server), log peristiwa menangkap perubahan ukuran kluster saat penskalaan otomatis yang ditingkatkan diaktifkan di alur Anda. Peristiwa yang berisi informasi tentang penskalakan otomatis yang ditingkatkan memiliki jenis peristiwa autoscale. Informasi permintaan mengubah ukuran kluster disimpan dalam objek details:autoscale.
Contoh berikut mengkueri permintaan pengubahan ukuran kluster penskalaan otomatis yang disempurnakan untuk pembaruan alur terakhir. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Memantau pemanfaatan sumber daya komputasi untuk komputasi klasik
cluster_resources peristiwa menyediakan metrik pada jumlah slot tugas dalam kluster, berapa banyak slot tugas tersebut yang digunakan, dan berapa banyak tugas yang menunggu untuk dijadwalkan.
Saat penskalaan otomatis yang ditingkatkan diaktifkan, peristiwa cluster_resources juga berisi metrik untuk algoritma penskalaan otomatis, termasuk latest_requested_num_executors, dan optimal_num_executors. Peristiwa juga menunjukkan status algoritma sebagai status yang berbeda seperti CLUSTER_AT_DESIRED_SIZE, , SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSdan BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION.
Informasi ini dapat dilihat bersama dengan peristiwa penskalaan otomatis untuk memberikan gambaran keseluruhan penskalaan otomatis yang ditingkatkan.
Contoh berikut mengkueri riwayat ukuran antrean tugas, riwayat pemanfaatan, riwayat jumlah eksekutor, dan metrik dan status lainnya untuk penskalaan otomatis dalam pembaruan alur terakhir. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
Memantau metrik streaming jalur
Anda dapat melihat metrik tentang kemajuan aliran dalam alur. Query untuk peristiwa stream_progress untuk memperoleh kejadian yang sangat mirip dengan metrik StreamingQueryListener yang dihasilkan oleh Structured Streaming, dengan pengecualian berikut:
- Metrik berikut ada di
StreamingQueryListener, tetapi tidak dalamstream_progress:numInputRows,inputRowsPerSecond, danprocessedRowsPerSecond. - Untuk aliran Kafka dan Kinesis, kolom
startOffset,endOffset, danlatestOffsetmungkin terlalu besar, dan terpotong. Untuk setiap bidang ini, bidang tambahan...Truncated,startOffsetTruncated,endOffsetTruncated, danlatestOffsetTruncatedditambahkan dengan nilai Boolean yang menunjukkan apakah data dipotong.
Untuk melakukan pencarian peristiwa stream_progress, Anda bisa menggunakan kueri seperti yang berikut:
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
Berikut adalah contoh peristiwa, di JSON:
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}
Contoh ini memperlihatkan rekaman yang tidak terpotong dalam sumber Kafka, dengan bidang yang diatur ...Truncated ke false.
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}
Mengaudit alur
Anda dapat menggunakan catatan log peristiwa dan log audit Azure Databricks lainnya untuk mendapatkan gambaran lengkap tentang bagaimana data diperbarui dalam alur.
Alur Deklaratif Lakeflow Spark menggunakan kredensial pemilik alur untuk menjalankan pembaruan. Anda dapat mengubah kredensial yang digunakan dengan memperbarui pemilik pipeline. Log audit merekam pengguna yang melakukan tindakan pada alur kerja, termasuk pembuatan alur, pengeditan konfigurasi, dan memicu pembaruan pada alur kerja.
Lihat acara Katalog Unity untuk rujukan acara audit Katalog Unity.
Mengkueri tindakan pengguna di log peristiwa
Anda dapat menggunakan log peristiwa untuk mengaudit peristiwa, misalnya, tindakan pengguna. Peristiwa yang berisi informasi tentang tindakan pengguna memiliki jenis peristiwa user_action.
Informasi tentang tindakan disimpan di objek user_action di bidang details. Gunakan kueri berikut untuk membuat log audit peristiwa pengguna. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | START |
user@company.com |
| 2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
| 2021-05-27T00:35:51.971+0000 | START |
user@company.com |
Informasi Pemrosesan Sementara
Anda dapat melihat informasi runtime untuk pembaruan pipeline, misalnya, versi Databricks Runtime untuk pembaruan tersebut. Contoh ini mengasumsikan bahwa Anda telah membuat event_log_raw tampilan untuk pipeline yang Anda minati, sebagaimana dijelaskan dalam Query log peristiwa.
SELECT origin.update_id, details:runtime_details:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'runtime_details'
update_id |
dbr_version |
|---|---|
1234abcd-ef56-7890-abcd-ef1234abcd56 |
18,0 |