Sdílet prostřednictvím


Protokol událostí kanálu

Protokol událostí kanálu obsahuje všechny informace související s kanálem, včetně protokolů auditu, kontrol kvality dat, průběhu kanálu a rodokmenu dat. Protokol událostí můžete použít ke sledování, pochopení a monitorování stavu datových kanálů.

Položky protokolu událostí můžete zobrazit v uživatelském rozhraní monitorování kanálu, rozhraní REST API pipelines nebo přímým dotazováním protokolu událostí. Tato část se zaměřuje na přímé dotazování protokolu událostí.

Můžete také definovat vlastní akce, které se mají spustit při protokolování událostí, například odesílání výstrah, pomocí událostních hooků .

Důležité

Neodstraňujte protokol událostí nebo nadřazený katalog nebo schéma, ve kterém je protokol událostí publikovaný. Odstranění protokolu událostí může způsobit, že se pipeline během budoucích spuštění neaktualizuje.

Úplné podrobnosti o schématu protokolu událostí najdete v tématu Pipeline schéma protokolu událostí.

Proveďte dotaz na protokol událostí

Poznámka:

Tato část popisuje výchozí chování a syntaxi pro práci s protokoly událostí pro kanály nakonfigurované s katalogem Unity a výchozím režimem publikování.

Ve výchozím nastavení kanál zapíše protokol událostí do skryté tabulky Delta ve výchozím katalogu a schématu nakonfigurovaného pro kanál. I když je tabulka skrytá, může se na tuto tabulku dotazovat všichni dostatečně privilegovaní uživatelé. Ve výchozím nastavení může tabulku protokolu událostí dotazovat pouze vlastník kanálu.

Pokud chcete dotazovat protokol událostí jako vlastníka, použijte ID kanálu:

SELECT * FROM event_log(<pipelineId>);

Ve výchozím nastavení je název skrytého protokolu událostí formátovaný jako event_log_{pipeline_id}, kde ID kanálu je UUID přiřazené systémem s pomlčkami nahrazenými podtržítky.

Protokol událostí můžete publikovat úpravou pokročilých nastavení vaší pipeliny. Podrobnosti najdete v nastavení pipeline pro protokol událostí. Při publikování protokolu událostí zadejte název protokolu událostí a volitelně zadejte katalog a schéma, jako v následujícím příkladu:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

Umístění protokolu událostí slouží také jako umístění schématu pro všechny dotazy nástroje Auto Loader v rámci pracovního procesu. Databricks doporučuje vytvořit zobrazení tabulky protokolu událostí před úpravou oprávnění, protože některá nastavení výpočetních prostředků můžou uživatelům umožnit získat přístup k metadatům schématu, pokud se tabulka protokolu událostí sdílí přímo. Následující příklad syntaxe vytvoří zobrazení v tabulce protokolu událostí a používá se v ukázkových dotazech protokolu událostí zahrnutých v tomto článku. Nahraďte <catalog_name>.<schema_name>.<event_log_table_name> plně kvalifikovaným názvem tabulky protokolu událostí kanálu. Pokud jste publikovali protokol událostí, použijte název zadaný při publikování. V opačném případě použijte event_log(<pipelineId>) , kde id kanálu je ID kanálu, který chcete dotazovat.

CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;

Zobrazení v katalogu Unity podporují streamovací dotazy. Následující příklad používá strukturované streamování k dotazování na zobrazení, které je definováno nad tabulkou protokolu událostí.

df = spark.readStream.table("event_log_raw")

Základní příklady dotazů

Následující příklady ukazují, jak dotazovat protokol událostí, abyste získali obecné informace o kanálech a pomohli ladit běžné scénáře.

Monitorování aktualizací pipeline dotazováním na předchozí aktualizace

Následující příklad se dotazuje na aktualizace (nebo spuštění) kanálu se zobrazeným ID aktualizace, stavem, časem zahájení, časem dokončení a dobou trvání. Tímto získáte přehled o spuštěních pipeline.

Předpokládá, že jste vytvořili event_log_raw zobrazení pro potrubí, které vás zajímá, jak je popsáno v Dotazu na protokol událostí.

with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

Ladění materializovaných problémů s přírůstkovou aktualizací zobrazení

Tento příklad dotazuje všechny toky z nejnovější aktualizace kanálu. Ukazuje, jestli byly aktualizovány přírůstkově či nikoliv, a také další důležité informace týkající se plánování, které jsou užitečné pro ladění, proč se přírůstková aktualizace nestane.

Předpokládá, že jste vytvořili event_log_raw zobrazení pro potrubí, které vás zajímá, jak je popsáno v Dotazu na protokol událostí.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

Zjistit náklady na aktualizaci pipeline

Tento příklad ukazuje, jak zjišťovat využití DBU pro kanál, a také jak zjistit uživatele pro daný běh kanálu.

SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;

Dotazy pro pokročilé uživatele

Následující příklady ukazují, jak dotazovat protokol událostí pro zpracování méně běžných nebo pokročilejších scénářů.

Dotazování metrik pro všechny toky v kanálu

Tento příklad ukazuje, jak dotazovat podrobné informace o každém toku v kanálu. Zobrazuje název toku, dobu trvání aktualizace, metriky kvality dat a informace o zpracovávaných řádcích (výstupní řádky, odstraněné, upsertované a vyřazené záznamy).

Předpokládá, že jste vytvořili event_log_raw zobrazení pro potrubí, které vás zajímá, jak je popsáno v Dotazu na protokol událostí.

WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array

  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations

  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,

  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

Dotaz metrik kvality a očekávání dat

Pokud definujete očekávání u datových sad ve vašem kanálu, metriky pro počet záznamů, které očekávání splnily nebo nesplnily, se uloží v details:flow_progress.data_quality.expectations objektu. Metrika pro počet vynechaných záznamů je uložena v objektu details:flow_progress.data_quality . Události obsahující informace o kvalitě dat mají typ události flow_progress.

U některých datových sad nemusí být k dispozici metriky kvality dat. Podívejte se na omezení očekávání.

K dispozici jsou následující metriky kvality dat:

Ukazatel Description
dropped_records Počet záznamů, které byly vyřazeny, protože nesplnily jedno nebo více kritérií.
passed_records Počet záznamů, které prošly očekávanými kritérii.
failed_records Počet záznamů, které nesplnily očekávaná kritéria.

Následující příklad dotazuje metriky kvality dat pro poslední aktualizaci pipeline. Předpokládá se, že jste vytvořili event_log_raw zobrazení pro pipeline, která vás zajímá, jak je popsáno v Práce s protokolem událostí.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;

Informace o linii dotazů

Události obsahující informace o rodokmenu mají typ události flow_definition. Objekt details:flow_definition obsahuje output_dataset a input_datasets definující jednotlivé relace v grafu.

Pomocí následujícího dotazu extrahujte vstupní a výstupní datové sady a zobrazte informace o rodokmenu. Předpokládá se, že jste vytvořili event_log_raw zobrazení pro pipeline, která vás zajímá, jak je popsáno v Práce s protokolem událostí.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

Monitorování ingestování cloudových souborů pomocí Auto Loaderu

Vývody generují události při zpracování souborů Auto Loaderem. U událostí automatického zavaděče je event_typeoperation_progress a details:operation_progress:type je buď AUTO_LOADER_LISTING, nebo AUTO_LOADER_BACKFILL. Objekt details:operation_progress obsahuje také pole status, duration_ms, auto_loader_details:source_patha auto_loader_details:num_files_listed.

Následující příklad dotazuje události Auto Loaderu pro poslední aktualizaci. Předpokládá se, že jste vytvořili event_log_raw zobrazení pro pipeline, která vás zajímá, jak je popsáno v Práce s protokolem událostí.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

Monitorování backlogu dat pro optimalizaci doby trvání streamování

Každý kanál sleduje, kolik dat se nachází v backlogu objektu details:flow_progress.metrics.backlog_bytes . Události obsahující metriky backlogu mají typ události flow_progress. Následující příklad se dotazuje na metriky nedokončených úkolů pro poslední aktualizaci pipeline. Předpokládá se, že jste vytvořili event_log_raw zobrazení pro pipeline, která vás zajímá, jak je popsáno v Práce s protokolem událostí.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;

Poznámka:

Metriky backlogu nemusí být dostupné v závislosti na typu zdroje dat datového kanálu a verzi Databricks Runtime.

Monitorování událostí automatického škálování pro optimalizaci klasických výpočetních prostředků

U kanálů, které používají klasické výpočetní prostředky (jinými slovy nepoužívejte bezserverové výpočetní prostředky), zachytává protokol událostí změnu velikosti clusteru, když je ve vašich kanálech povolené rozšířené automatické škálování. Události obsahující informace o rozšířeném automatickém škálování mají typ události autoscale. Informace o změně velikosti žádosti clusteru jsou uloženy v objektu details:autoscale.

Následující příklad se dotazuje na požadavky na změnu velikosti clusteru s vylepšeným automatickým škálováním pro poslední aktualizaci datového kanálu. Předpokládá se, že jste vytvořili event_log_raw zobrazení pro pipeline, která vás zajímá, jak je popsáno v Práce s protokolem událostí.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Monitorování využití výpočetních prostředků pro klasické výpočetní prostředky

cluster_resources události poskytují metriky o počtu slotů úloh v clusteru, o tom, kolik těchto slotů úloh se používá a kolik úkolů čeká na naplánování.

Pokud je povolené rozšířené automatické škálování, cluster_resources události také obsahují metriky pro algoritmus automatického škálování, včetně latest_requested_num_executorsa optimal_num_executors. Události také zobrazují stav algoritmu jako různé stavy, jako jsou CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSa BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Tyto informace je možné zobrazit ve spojení s událostmi automatického škálování a poskytnout tak celkový přehled o vylepšeném automatickém škálování.

Následující příklad dotazuje historii velikosti fronty úloh, historii využití, historii počtu vykonavatelů a další metriky a stavy pro účely automatického škálování v rámci poslední aktualizace kanálu. Předpokládá se, že jste vytvořili event_log_raw zobrazení pro pipeline, která vás zajímá, jak je popsáno v Práce s protokolem událostí.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;

Monitorování metrik streamování kanálu

Metriky o průběhu streamu můžete zobrazit v pipeline. Dotaz na události stream_progress pro získání událostí velmi podobných metrikám StreamingQueryListener vytvořeným strukturovaným streamováním s následujícími výjimkami:

  • Následující metriky jsou přítomny v , ale ne v StreamingQueryListenerstream_progress: numInputRows, inputRowsPerSeconda processedRowsPerSecond.
  • U datových proudů Kafka a Kineses mohou být pole startOffset, endOffset a latestOffset příliš velká, a proto jsou zkrácena. Pro každé z těchto polí se přidá další ...Truncated pole startOffsetTruncated, endOffsetTruncateda latestOffsetTruncated, s logickou hodnotou pro to, zda jsou data zkrácena.

K dotazování na stream_progress události můžete použít například následující dotaz:

SELECT
  parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';

Tady je příklad události ve formátu JSON:

{
  "id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
  "sequence": {
    "control_plane_seq_no": 1234567890123456
  },
  "origin": {
    "cloud": "<cloud>",
    "region": "<region>",
    "org_id": 0123456789012345,
    "pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
    "pipeline_type": "WORKSPACE",
    "pipeline_name": "<pipeline name>",
    "update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
    "request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
  },
  "timestamp": "2025-06-17T03:18:14.018Z",
  "message": "Completed a streaming update of 'flow_name'."
  "level": "INFO",
  "details": {
    "stream_progress": {
      "progress": {
        "id": "abcdef12-abcd-3456-7890-abcd1234ef56",
        "runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
        "name": "silverTransformFromBronze",
        "timestamp": "2022-11-01T18:21:29.500Z",
        "batchId": 4,
        "durationMs": {
          "latestOffset": 62,
          "triggerExecution": 62
        },
        "stateOperators": [],
        "sources": [
          {
            "description": "DeltaSource[dbfs:/path/to/table]",
            "startOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "endOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "latestOffset": null,
            "metrics": {
              "numBytesOutstanding": "0",
              "numFilesOutstanding": "0"
            }
          }
        ],
        "sink": {
          "description": "DeltaSink[dbfs:/path/to/sink]",
          "numOutputRows": -1
        }
      }
    }
  },
  "event_type": "stream_progress",
  "maturity_level": "EVOLVING"
}

Tento příklad ukazuje nerušené záznamy ve zdroji Kafka s poli nastavenými ...Truncated na false:

{
  "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
  "startOffsetTruncated": false,
  "startOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706380
    }
  },
  "endOffsetTruncated": false,
  "endOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "latestOffsetTruncated": false,
  "latestOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "numInputRows": 292,
  "inputRowsPerSecond": 13.65826278123392,
  "processedRowsPerSecond": 14.479817514628582,
  "metrics": {
    "avgOffsetsBehindLatest": "0.0",
    "estimatedTotalBytesBehindLatest": "0.0",
    "maxOffsetsBehindLatest": "0",
    "minOffsetsBehindLatest": "0"
  }
}

Auditování datových cest

Pomocí záznamů protokolu událostí a dalších protokolů auditu Azure Databricks můžete získat úplný přehled o tom, jak se data aktualizují v kanálu.

Deklarativní kanály Sparku lakeflow používají ke spuštění aktualizací přihlašovací údaje vlastníka kanálu. Přihlašovací údaje můžete změnit tím, že aktualizujete vlastníka pipeline. Protokol auditu zaznamenává uživatele pro akce v datovém toku, včetně vytvoření datového toku, úprav konfigurace a spouštění aktualizací.

Referenční informace o událostech auditování katalogu Unity najdete v tématu události katalogu Unity.

Dotazování akcí uživatele v protokolu událostí

Protokol událostí můžete použít k auditování událostí, například akcí uživatelů. Události obsahující informace o akcích uživatele mají typ události user_action.

Informace o akci jsou uloženy v objektu user_action v poli details. Pomocí následujícího dotazu vytvořte protokol auditu uživatelských událostí. Předpokládá se, že jste vytvořili event_log_raw zobrazení pro pipeline, která vás zajímá, jak je popsáno v Práce s protokolem událostí.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

informace o modulu runtime

Můžete zobrazit informace o modulu runtime pro aktualizaci pipeline, například verzi Databricks Runtime pro tuto aktualizaci. Předpokládá se, že jste vytvořili event_log_raw zobrazení pro pipeline, která vás zajímá, jak je popsáno v Dotaz na protokol událostí.

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0