Log peristiwa alur

Log peristiwa alur berisi semua informasi yang terkait dengan alur, termasuk log audit, pemeriksaan kualitas data, kemajuan alur, dan silsilah data. Anda dapat menggunakan log peristiwa untuk melacak, memahami, dan memantau status alur data Anda.

Anda dapat melihat entri log peristiwa di antarmuka pengguna pemantauan alur, Pipelines REST API, atau dengan langsung mengkueri log peristiwa. Bagian ini berfokus pada kueri log peristiwa secara langsung.

Anda juga dapat menentukan tindakan kustom untuk dijalankan saat peristiwa dicatat, misalnya, mengirim pemberitahuan, dengan kait peristiwa.

Penting

Jangan hapus log peristiwa atau katalog induk atau skema tempat log peristiwa diterbitkan. Menghapus log peristiwa dapat mengakibatkan pipeline Anda gagal diperbarui selama pelaksanaan di masa mendatang.

Untuk detail lengkap skema log peristiwa, lihat Skema log peristiwa Pipeline.

Mengkueri log peristiwa

Nota

Bagian ini menjelaskan perilaku dan sintaks default untuk bekerja dengan log peristiwa untuk alur yang dikonfigurasi dengan Katalog Unity dan mode penerbitan default.

Secara default, pipeline menulis log peristiwa ke dalam tabel Delta tersembunyi yang ada di katalog dan skema default yang telah dikonfigurasi untuk pipeline. Meskipun disembunyikan, tabel masih dapat dikueri oleh semua pengguna dengan hak istimewa yang cukup. Secara default, hanya pemilik alur yang dapat mengkueri tabel log peristiwa.

Untuk melakukan kueri log peristiwa sebagai pemilik, gunakan ID pipeline:

SELECT * FROM event_log(<pipelineId>);

Secara default, nama untuk log peristiwa tersembunyi diformat sebagai event_log_{pipeline_id}, di mana ID alur adalah UUID yang ditetapkan sistem dengan tanda hubung digantikan oleh garis bawah. Tabel log peristiwa muncul di system.information_schema.tables tetapi tidak terlihat di Catalog Explorer atau halaman UI ruang kerja lainnya. Anda harus mengaksesnya menggunakan event_log() fungsi .

Anda dapat menerbitkan log peristiwa dengan mengedit pengaturan Tingkat Lanjut untuk alur Anda. Untuk detailnya, lihat Pengaturan pipeline untuk catatan peristiwa. Saat Anda menerbitkan log peristiwa, tentukan nama untuk log peristiwa dan, secara opsional, tentukan katalog dan skema, seperti dalam contoh berikut:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

Lokasi log peristiwa juga berfungsi sebagai lokasi skema untuk kueri Auto Loader apa pun di alur. Databricks merekomendasikan untuk membuat tampilan di atas tabel log peristiwa sebelum memodifikasi hak istimewa, karena beberapa pengaturan komputasi mungkin memungkinkan pengguna untuk mendapatkan akses ke metadata skema jika tabel log peristiwa dibagikan secara langsung. Contoh sintaks berikut membuat tampilan pada tabel log peristiwa, dan digunakan dalam contoh kueri log peristiwa yang disertakan dalam artikel ini. Ganti <catalog_name>.<schema_name>.<event_log_table_name> dengan nama tabel yang sepenuhnya memenuhi syarat dari log peristiwa alur Anda. Jika Anda telah menerbitkan log peristiwa, gunakan nama yang ditentukan saat menerbitkan. Jika tidak, gunakan event_log(<pipelineId>) di mana pipelineId adalah ID alur yang ingin Anda kueri.

CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;

Di dalam Unity Catalog, tampilan mendukung kueri streaming. Contoh berikut menggunakan Streaming Terstruktur untuk mengkueri tampilan yang ditentukan di atas tabel log peristiwa:

df = spark.readStream.table("event_log_raw")

Contoh kueri dasar

Contoh berikut menunjukkan cara mengkueri log peristiwa untuk mendapatkan informasi umum tentang alur, dan untuk membantu men-debug skenario umum.

Memantau pembaruan pipeline dengan mengkueri pembaruan sebelumnya

Contoh berikut mengkueri pembaruan (atau menjalankan) alur Anda, memperlihatkan ID pembaruan, status, waktu mulai, waktu penyelesaian, dan durasi. Ini memberi Anda gambaran umum tentang jalannya untuk alur kerja.

Asumsikan bahwa Anda telah membuat event_log_raw tampilan untuk alur yang Anda minati, seperti yang dijelaskan dalam Kueri log peristiwa.

with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

Mengatasi masalah penyegaran inkremental pada contoh tampilan

Contoh ini mengkueri semua alur dari pembaruan terbaru pipa data. Ini menunjukkan apakah informasi tersebut diperbarui secara bertahap atau tidak, serta informasi perencanaan relevan lainnya yang berguna untuk penelusuran kesalahan mengapa refresh bertahap tidak terjadi.

Asumsikan bahwa Anda telah membuat event_log_raw tampilan untuk alur yang Anda minati, seperti yang dijelaskan dalam Kueri log peristiwa.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

Memeriksa biaya pembaruan pipa

Contoh ini menunjukkan cara mengkueri penggunaan DBU untuk alur, serta pengguna untuk pelaksanaan alur tertentu.

SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;

Kueri tingkat lanjut

Contoh berikut menunjukkan cara mengkueri log peristiwa untuk menangani skenario yang kurang umum atau lebih lanjut.

Metrik kueri untuk semua aliran dalam rangkaian proses

Contoh ini memperlihatkan cara mengkueri informasi terperinci tentang setiap proses dalam pipeline. Ini menunjukkan nama alur, durasi pembaruan, metrik kualitas data, dan informasi tentang baris yang diproses (baris output, dihapus, di-upsert, dan rekaman yang dibuang).

Asumsikan bahwa Anda telah membuat event_log_raw tampilan untuk alur yang Anda minati, seperti yang dijelaskan dalam Kueri log peristiwa.

WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array

  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations

  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,

  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

Kualitas data kueri atau metrik ekspektasi

Jika Anda menentukan ekspektasi pada himpunan data di alur Anda, metrik untuk jumlah rekaman yang lolos dan gagal harapan disimpan dalam details:flow_progress.data_quality.expectations objek. Metrik untuk jumlah rekaman yang dihilangkan disimpan dalam details:flow_progress.data_quality objek. Peristiwa yang berisi informasi tentang kualitas data memiliki jenis peristiwa flow_progress.

Metrik kualitas data mungkin tidak tersedia untuk beberapa himpunan data. Lihat batasan ekspektasi.

Metrik kualitas data berikut tersedia:

Ukuran Description
dropped_records Jumlah rekaman yang dihapus karena gagal memenuhi satu atau beberapa harapan.
passed_records Jumlah rekaman yang melewati kriteria ekspektasi.
failed_records Jumlah rekaman yang gagal dalam kriteria ekspektasi.

Contoh berikut mengkueri metrik kualitas data untuk pembaruan alur terakhir. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;

Informasi riwayat kueri

Peristiwa yang berisi informasi tentang garis keturunan memiliki jenis peristiwa flow_definition. Objek details:flow_definition berisi output_dataset dan input_datasets menentukan setiap hubungan dalam grafik.

Gunakan kueri berikut untuk mengekstrak himpunan data input dan output untuk melihat informasi silsilah data. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

Memantau penyerapan file cloud dengan Auto Loader

Alur menghasilkan peristiwa saat Auto Loader memproses file. Untuk peristiwa Auto Loader, event_type adalah operation_progress dan details:operation_progress:type adalah AUTO_LOADER_LISTING atau AUTO_LOADER_BACKFILL. Objek details:operation_progress juga menyertakan bidang status, duration_ms, auto_loader_details:source_path, dan auto_loader_details:num_files_listed.

Contoh berikut mengkueri peristiwa Auto Loader untuk mendapatkan pembaruan terbaru. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

Memantau backlog data untuk mengoptimalkan durasi streaming

Setiap alur melacak berapa banyak data yang ada di backlog dalam details:flow_progress.metrics.backlog_bytes objek. Peristiwa yang berisi metrik backlog memiliki jenis peristiwa flow_progress. Contoh berikut mengkueri metrik backlog untuk pembaruan alur terakhir. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;

Nota

Metrik backlog mungkin tidak tersedia tergantung pada jenis sumber data alur dan versi Databricks Runtime.

Memantau peristiwa penskalaan otomatis untuk mengoptimalkan komputasi klasik

Untuk alur yang menggunakan komputasi klasik (dengan kata lain, jangan gunakan komputasi tanpa server), log peristiwa menangkap perubahan ukuran kluster saat penskalaan otomatis yang ditingkatkan diaktifkan di alur Anda. Peristiwa yang berisi informasi tentang penskalakan otomatis yang ditingkatkan memiliki jenis peristiwa autoscale. Informasi permintaan mengubah ukuran kluster disimpan dalam objek details:autoscale.

Contoh berikut mengkueri permintaan pengubahan ukuran kluster penskalaan otomatis yang disempurnakan untuk pembaruan alur terakhir. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Memantau pemanfaatan sumber daya komputasi untuk komputasi klasik

cluster_resources peristiwa menyediakan metrik pada jumlah slot tugas dalam kluster, berapa banyak slot tugas tersebut yang digunakan, dan berapa banyak tugas yang menunggu untuk dijadwalkan.

Saat penskalaan otomatis yang ditingkatkan diaktifkan, peristiwa cluster_resources juga berisi metrik untuk algoritma penskalaan otomatis, termasuk latest_requested_num_executors, dan optimal_num_executors. Peristiwa juga menunjukkan status algoritma sebagai status yang berbeda seperti CLUSTER_AT_DESIRED_SIZE, , SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSdan BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Informasi ini dapat dilihat bersama dengan peristiwa penskalaan otomatis untuk memberikan gambaran keseluruhan penskalaan otomatis yang ditingkatkan.

Contoh berikut mengkueri riwayat ukuran antrean tugas, riwayat pemanfaatan, riwayat jumlah eksekutor, dan metrik dan status lainnya untuk penskalaan otomatis dalam pembaruan alur terakhir. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;

Memantau metrik streaming jalur

Anda dapat melihat metrik tentang kemajuan aliran dalam alur. Query untuk peristiwa stream_progress untuk memperoleh kejadian yang sangat mirip dengan metrik StreamingQueryListener yang dihasilkan oleh Structured Streaming, dengan pengecualian berikut:

  • Metrik berikut ada di StreamingQueryListener, tetapi tidak dalam stream_progress: numInputRows, inputRowsPerSecond, dan processedRowsPerSecond.
  • Untuk aliran Kafka dan Kinesis, kolom startOffset, endOffset, dan latestOffset mungkin terlalu besar, dan terpotong. Untuk setiap bidang ini, bidang tambahan ...Truncated, startOffsetTruncated, endOffsetTruncated, dan latestOffsetTruncated ditambahkan dengan nilai Boolean yang menunjukkan apakah data dipotong.

Untuk melakukan pencarian peristiwa stream_progress, Anda bisa menggunakan kueri seperti yang berikut:

SELECT
  parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';

Berikut adalah contoh peristiwa, di JSON:

{
  "id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
  "sequence": {
    "control_plane_seq_no": 1234567890123456
  },
  "origin": {
    "cloud": "<cloud>",
    "region": "<region>",
    "org_id": 0123456789012345,
    "pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
    "pipeline_type": "WORKSPACE",
    "pipeline_name": "<pipeline name>",
    "update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
    "request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
  },
  "timestamp": "2025-06-17T03:18:14.018Z",
  "message": "Completed a streaming update of 'flow_name'."
  "level": "INFO",
  "details": {
    "stream_progress": {
      "progress": {
        "id": "abcdef12-abcd-3456-7890-abcd1234ef56",
        "runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
        "name": "silverTransformFromBronze",
        "timestamp": "2022-11-01T18:21:29.500Z",
        "batchId": 4,
        "durationMs": {
          "latestOffset": 62,
          "triggerExecution": 62
        },
        "stateOperators": [],
        "sources": [
          {
            "description": "DeltaSource[dbfs:/path/to/table]",
            "startOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "endOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "latestOffset": null,
            "metrics": {
              "numBytesOutstanding": "0",
              "numFilesOutstanding": "0"
            }
          }
        ],
        "sink": {
          "description": "DeltaSink[dbfs:/path/to/sink]",
          "numOutputRows": -1
        }
      }
    }
  },
  "event_type": "stream_progress",
  "maturity_level": "EVOLVING"
}

Contoh ini memperlihatkan rekaman yang tidak terpotong dalam sumber Kafka, dengan bidang yang diatur ...Truncated ke false.

{
  "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
  "startOffsetTruncated": false,
  "startOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706380
    }
  },
  "endOffsetTruncated": false,
  "endOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "latestOffsetTruncated": false,
  "latestOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "numInputRows": 292,
  "inputRowsPerSecond": 13.65826278123392,
  "processedRowsPerSecond": 14.479817514628582,
  "metrics": {
    "avgOffsetsBehindLatest": "0.0",
    "estimatedTotalBytesBehindLatest": "0.0",
    "maxOffsetsBehindLatest": "0",
    "minOffsetsBehindLatest": "0"
  }
}

Mengaudit alur

Anda dapat menggunakan catatan log peristiwa dan log audit Azure Databricks lainnya untuk mendapatkan gambaran lengkap tentang bagaimana data diperbarui dalam alur.

Alur Deklaratif Lakeflow Spark menggunakan kredensial pemilik alur untuk menjalankan pembaruan. Anda dapat mengubah kredensial yang digunakan dengan memperbarui pemilik pipeline. Log audit merekam pengguna yang melakukan tindakan pada alur kerja, termasuk pembuatan alur, pengeditan konfigurasi, dan memicu pembaruan pada alur kerja.

Lihat acara Katalog Unity untuk rujukan acara audit Katalog Unity.

Mengkueri tindakan pengguna di log peristiwa

Anda dapat menggunakan log peristiwa untuk mengaudit peristiwa, misalnya, tindakan pengguna. Peristiwa yang berisi informasi tentang tindakan pengguna memiliki jenis peristiwa user_action.

Informasi tentang tindakan disimpan di objek user_action di bidang details. Gunakan kueri berikut untuk membuat log audit peristiwa pengguna. Ini mengasumsikan bahwa Anda telah membuat tampilan data event_log_raw untuk alur kerja yang Anda minati, sebagaimana dijelaskan dalam Mengkueri log peristiwa.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

Informasi Pemrosesan Sementara

Anda dapat melihat informasi runtime untuk pembaruan pipeline, misalnya, versi Databricks Runtime untuk pembaruan tersebut. Contoh ini mengasumsikan bahwa Anda telah membuat event_log_raw tampilan untuk pipeline yang Anda minati, sebagaimana dijelaskan dalam Query log peristiwa.

SELECT origin.update_id, details:runtime_details:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'runtime_details'
update_id dbr_version
1234abcd-ef56-7890-abcd-ef1234abcd56 18,0