共用方式為


event_log table-valued 函式

適用於:檢查標示為是 Databricks SQL 檢查標示為是 Databricks Runtime 13.3 LTS 和更新版本

傳回具體化檢視串流數據表DLT 管線的事件記錄檔。

深入瞭解 Delta Live Tables 事件記錄檔。

語法

event_log( { TABLE ( table_name ) | pipeline_id } )

引數

  • table_name:具體化檢視或串流數據表的名稱。 名稱不得包含時態規格。 如果名稱不合格,則會使用目前的目錄和架構來限定標識符。
  • pipeline_id:D elta Live Tables 管線的字串標識碼。

傳回

  • id STRING NOT NULL:事件記錄檔記錄的唯一標識符。
  • sequence STRING NOT NULL:JSON 物件,其中包含用來識別和排序事件的元數據。
  • origin STRING NOT NULL:JSON 物件,包含事件來源的元數據,例如雲端提供者、區域、 user_idpipeline_id
  • timestamp TIMESTAMP NOT NULL:事件以UTC記錄的時間。
  • message STRING NOT NULL:描述事件的人類可讀取訊息。
  • level STRING NOT NULL:記錄層級,例如 、INFOWARNERROR、 或 METRICS
  • maturity_level STRING NOT NULL:事件架構的穩定性。 可能的值為:
    • STABLE:架構穩定且不會變更。
    • NULL:架構穩定且不會變更。 如果記錄是在新增欄位之前建立的(2022.37 版)建立,maturity_level則此值可能是 NULL
    • EVOLVING:架構不穩定且可能會變更。
    • DEPRECATED:架構已被取代,Delta Live Tables 運行時間可能隨時停止產生此事件。
  • error STRING:如果發生錯誤,描述錯誤的詳細數據。
  • details STRING NOT NULL:JSON 物件,其中包含事件的結構化詳細數據。 這是用來分析事件的主要欄位。
  • event_type STRING NOT NULL:事件類型。

使用方式

只有管線、串流數據表或具體化檢視的擁有者可以檢視事件記錄檔。 建立檢視,並將檢視的存取權授與使用者,以允許其他使用者查詢事件記錄檔。

> CREATE VIEW event_log_raw AS SELECT * FROM event_log(table(my_mv));
> GRANT SELECT ON VIEW event_log_raw TO `user@databricks.com`;

範例

如需更多範例,請參閱 查詢事件記錄檔。

-- View the events on a materialized view
> SELECT timestamp, message, details
  FROM event_log(table(my_mv))
  WHERE level in ('INFO', 'WARN', 'ERROR')
  ORDER BY timestamp;

timestamp, message, details
---------------------------
2023-08-12 01:03:05.000, 'Flow "my_mv" is STARTING.', '{"flow_progress":{"status":"STARTING"}}'

-- Create a temp view with the latest update to the table/pipeline
> CREATE OR REPLACE TEMP VIEW latest_update AS
  SELECT origin.update_id AS id FROM event_log('<pipeline-ID>')
  WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

-- Query lineage information
> SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log('<pipeline-ID>'),
  latest_update
WHERE
  event_type = 'flow_definition' AND origin.update_id = latest_update.id;

output_dataset, input_dataset
-----------------------------
customers, null
sales_orders_raw, null
sales_orders_cleaned, ["customers", "sales_orders_raw"]
sales_order_in_la, ["sales_orders_cleaned"]

-- Query data quality expectation history for a streaming table
> WITH expectations_parsed AS (
    SELECT
      explode(
        from_json(
          details:flow_progress.data_quality.expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log(table(my_st)),
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
  SELECT
    row_expectations.dataset as dataset,
    row_expectations.name as expectation,
    SUM(row_expectations.passed_records) as passing_records,
    SUM(row_expectations.failed_records) as failing_records
  FROM expectations_parsed
  GROUP BY
    row_expectations.dataset,
    row_expectations.name;

dataset, expectation, passing_records, failing_records
------------------------------------------------------
sales_orders_cleaned, valid_order_number, 4083, 0