管線事件記錄檔

管線事件記錄檔包含與管線相關的所有資訊,包括稽核記錄、資料品質檢查、管線進度和資料譜系。 您可以使用事件記錄檔來追蹤、了解及監視資料管線的狀態。

您可以在管線監視 使用者介面管線 REST API 中檢視事件記錄項目,或直接查詢事件記錄檔。 本節著重於直接查詢事件記錄檔。

您也可以定義自訂動作,例如傳送警示,在記錄事件時執行,使用 事件鉤子

這很重要

請勿刪除事件記錄檔或發佈事件記錄檔的父目錄或架構。 刪除事件記錄檔可能會導致管線在未來執行期間無法更新。

如需事件記錄檔結構描述的完整詳細資料,請參閱 管線事件記錄檔結構描述

查詢事件記錄檔

備註

本節說明針對使用 Unity Catalog 配置的管線及其預設發佈模式,如何處理事件記錄檔的預設行為和語法。

根據預設,管線會將事件記錄寫入為管線設定的預設目錄和 Schema 中的隱藏 Delta 資料表。 隱藏時,所有具許可權的使用者仍然可以查詢數據表。 根據預設,只有管線的擁有者可以查詢事件記錄數據表。

若要以擁有者身分查詢事件記錄檔,請使用管線標識碼:

SELECT * FROM event_log(<pipelineId>);

根據預設,隱藏的事件記錄檔名稱會被格式化為 event_log_{pipeline_id},其中管線 ID 是系統指派的 UUID,破折號已被替換為底線。 事件日誌表出現在 system.information_schema.tables,但在目錄檔案總管或其他工作區介面頁面中不可見。 你必須使用這個 event_log() 函式來存取它。

您可以編輯管線的 進階設定 來發佈事件記錄檔。 如需詳細資訊,請參閱 事件記錄檔的管線設定。 當您發佈事件記錄檔時,請指定事件記錄檔的名稱,並選擇性地指定目錄和結構描述,如下列範例所示:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

事件日誌位置也作為管線中任何 Auto Loader 查詢的架構位置。 Databricks 建議在修改許可權之前先建立事件記錄數據表的檢視,因為某些計算設定可能會允許使用者直接共用事件記錄檔數據表時存取架構元數據。 下列範例語法會在事件記錄數據表上建立檢視,並用於本文中包含的範例事件記錄查詢中。 將 <catalog_name>.<schema_name>.<event_log_table_name> 替換為管線事件日誌的完整表名稱。 如果您已發佈事件記錄檔,請使用發佈時指定的名稱。 否則,請使用 event_log(<pipelineId>),其中 pipelineId 是您想要查詢的管線的識別碼。

CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;

在 Unity Catalog 中,視圖支援串流查詢。 下列範例會使用結構化串流來查詢事件記錄資料表上方定義的檢視:

df = spark.readStream.table("event_log_raw")

基本查詢範例

下列範例示範如何查詢事件記錄檔以取得管線的一般資訊,以及協助偵錯常見案例。

藉由查詢先前的更新來監視管線更新

下列範例會查詢管線的更新(或 執行),其中顯示更新標識碼、狀態、開始時間、完成時間和持續時間。 這可讓您概觀管線的執行。

假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

偵錯具體化檢視累加式重新整理問題

此範例會從管線的最新更新查詢所有流程。 它會顯示它們是否逐步更新,以及其他與規劃相關的資訊,這些資訊有助於偵錯增量刷新失敗的原因。

假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

查詢管線更新的成本

此範例示範如何查詢管線的 DBU 使用量,以及指定管線執行的使用者。

SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;

進階查詢

下列範例示範如何查詢事件記錄檔,以處理較不常見或更進階的案例。

查詢管線中所有流程的度量表

此範例示範如何查詢管線中每個流程的詳細資訊。 它會顯示流程名稱、更新持續時間、數據品質計量,以及已處理之數據列的相關信息(輸出數據列、已刪除、更新插入和卸載的記錄)。

假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array

  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations

  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,

  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

查詢數據品質或預期計量

如果您在管線中為數據集定義了預期,則滿足和不滿足預期的記錄數的計量將儲存在details:flow_progress.data_quality.expectations物件中。 丟失記錄數目的指標儲存在details:flow_progress.data_quality 物件中。 包含資料品質相關資訊的事件具有事件類型 flow_progress

某些數據集可能無法使用資料品質計量。 請參閱 預期限制

下列資料品質計量可供使用:

計量 Description
dropped_records 因不符合一個或多個預期條件而被捨棄的記錄數量。
passed_records 通過預期準則的記錄數目。
failed_records 不符合預期準則的記錄數目。

下列範例會查詢上次管線更新的數據質量計量。 這假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;

查詢譜系資訊

包含譜系相關資訊的事件具有事件類型 flow_definitiondetails:flow_definition 物件包含定義圖形中每個關係性的 output_datasetinput_datasets

使用下列查詢來擷取輸入和輸出數據集,以查看譜系資訊。 這假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

使用 Auto Loader 監控雲端檔案的載入

管線會在自動載入器處理檔案時產生事件。 針對自動載入器事件,event_typeoperation_progress,且 details:operation_progress:typeAUTO_LOADER_LISTINGAUTO_LOADER_BACKFILLdetails:operation_progress 物件也包含 statusduration_msauto_loader_details:source_pathauto_loader_details:num_files_listed 欄位。

下列範例會查詢自動載入器事件以取得最新的更新。 這假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

監控數據積壓以優化串流時間

每個管線都會追蹤 details:flow_progress.metrics.backlog_bytes 物件中積壓的資料量。 包含待辦項目計量的事件具有事件類型 flow_progress。 下列範例會查詢上次更新時流水線的待辦工作指標。 這假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;

備註

根據管線的數據來源類型和 Databricks 執行時間版本,積壓指標可能無法取得。

監視自動調整事件以優化傳統計算

對於使用傳統計算的管線(也就是不使用無伺服器計算),當在管線中啟用增強型自動調整時,事件記錄檔會記錄叢集大小的調整。 包含增強式自動調整相關資訊的事件具有事件類型 autoscale。 叢集調整大小要求資訊儲存在 details:autoscale 物件中。

下列範例會查詢上一次管線更新中對增強性自動擴縮的叢集調整大小請求。 這假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

監視傳統計算的計算資源使用率

cluster_resources 事件會針對叢集中的工作位置數目、使用這些工作位置的數目,以及等候排程的工作數目提供計量。

開啟增強型自動調整時, cluster_resources 事件也會包含自動調整演算法的計量,包括 latest_requested_num_executorsoptimal_num_executors。 這些事件也會將演算法的狀態顯示為不同的狀態,例如 CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION。 這項資訊可以與自動調整事件一起檢視,以提供增強式自動調整的整體畫面。

下列範例會查詢工作佇列大小歷程記錄、使用率歷程記錄、執行程式計數歷程記錄,以及其他計量和狀態,以在上次管線更新中自動調整。 這假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;

監視管線串流指標

您可以檢視管線中串流進度的相關指標。 查詢 stream_progress 事件,以取得與結構化串流所建立的 StreamingQueryListener 計量 非常相似的事件,但下列例外狀況:

  • 下列度量存在於StreamingQueryListener,但不存在於stream_progressnumInputRowsinputRowsPerSecondprocessedRowsPerSecond
  • 對於 Kafka 和 Kineses 串流, startOffsetendOffsetlatestOffset 欄位可能太大,而且會被截斷。 對於這些欄位中的每一個,都會新增一個額外的布林值欄位...TruncatedstartOffsetTruncatedendOffsetTruncatedlatestOffsetTruncated,以指示資料是否被截斷。

若要查詢 stream_progress 事件,您可以使用下列查詢:

SELECT
  parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';

以下是 JSON 中的事件範例:

{
  "id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
  "sequence": {
    "control_plane_seq_no": 1234567890123456
  },
  "origin": {
    "cloud": "<cloud>",
    "region": "<region>",
    "org_id": 0123456789012345,
    "pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
    "pipeline_type": "WORKSPACE",
    "pipeline_name": "<pipeline name>",
    "update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
    "request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
  },
  "timestamp": "2025-06-17T03:18:14.018Z",
  "message": "Completed a streaming update of 'flow_name'."
  "level": "INFO",
  "details": {
    "stream_progress": {
      "progress": {
        "id": "abcdef12-abcd-3456-7890-abcd1234ef56",
        "runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
        "name": "silverTransformFromBronze",
        "timestamp": "2022-11-01T18:21:29.500Z",
        "batchId": 4,
        "durationMs": {
          "latestOffset": 62,
          "triggerExecution": 62
        },
        "stateOperators": [],
        "sources": [
          {
            "description": "DeltaSource[dbfs:/path/to/table]",
            "startOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "endOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "latestOffset": null,
            "metrics": {
              "numBytesOutstanding": "0",
              "numFilesOutstanding": "0"
            }
          }
        ],
        "sink": {
          "description": "DeltaSink[dbfs:/path/to/sink]",
          "numOutputRows": -1
        }
      }
    }
  },
  "event_type": "stream_progress",
  "maturity_level": "EVOLVING"
}

此範例顯示 Kafka 來源中未截斷的記錄,且欄位設定 ...Truncatedfalse

{
  "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
  "startOffsetTruncated": false,
  "startOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706380
    }
  },
  "endOffsetTruncated": false,
  "endOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "latestOffsetTruncated": false,
  "latestOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "numInputRows": 292,
  "inputRowsPerSecond": 13.65826278123392,
  "processedRowsPerSecond": 14.479817514628582,
  "metrics": {
    "avgOffsetsBehindLatest": "0.0",
    "estimatedTotalBytesBehindLatest": "0.0",
    "maxOffsetsBehindLatest": "0",
    "minOffsetsBehindLatest": "0"
  }
}

審計流程

您可以使用事件記錄記錄和其他 Azure Databricks 稽核記錄,以完整瞭解管線中資料的更新方式。

Lakeflow Spark 宣告式管線會使用管線擁有者的認證來執行更新。 您可以藉由更新管線擁有者來變更所使用的憑證。 稽核記錄會記錄使用者在管線上執行動作,包括管線建立、組態編輯,以及觸發更新。

如需 Unity 目錄稽核事件的參考,請參閱 Unity 目錄事件

查詢事件記錄檔中的使用者動作

您可使用事件記錄檔來稽核事件,例如使用者動作。 包含使用者動作相關資訊的事件具有事件類型 user_action

動作的相關資訊會儲存在 user_action 欄位中的 details 物件中。 使用下列查詢以建構使用者事件的稽核記錄。 這假設您已為感興趣的管線建立 event_log_raw 檢視,如 查詢事件記錄檔中所述。

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

執行階段資訊

你可以查看管線更新的執行時資訊,例如該更新的 Databricks 執行時版本。此範例假設你已建立 event_log_raw 你感興趣管線的檢視,如查詢 事件日誌所述。

SELECT origin.update_id, details:runtime_details:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'runtime_details'
update_id dbr_version
1234abcd-ef56-7890-abcd-ef1234abcd56 18.0