Händelselogg för pipeline

Händelseloggen för pipelinen innehåller all information som rör en pipeline, inklusive granskningsloggar, datakvalitetskontroller, pipelineförlopp och data härkomst. Du kan använda händelseloggen för att spåra, förstå och övervaka tillståndet för dina datapipelines.

Du kan visa händelseloggposter i användargränssnittet för pipelineövervakning, REST API för pipelines eller genom att fråga händelseloggen direkt. Det här avsnittet fokuserar på att fråga händelseloggen direkt.

Du kan också definiera anpassade åtgärder som ska köras när händelser loggas, till exempel att skicka aviseringar, med händelsekrokar.

Viktigt!

Ta inte bort händelseloggen eller den överordnade katalogen eller schemat där händelseloggen publiceras. Om du tar bort händelseloggen kan det leda till att pipelinen inte uppdateras under framtida körningar.

För fullständig information om händelseloggschemat, se Pipeline event log schema.

Sök i händelseloggen

Anmärkning

I det här avsnittet beskrivs standardbeteendet och syntaxen för att arbeta med händelseloggar för pipelines som konfigurerats med Unity Catalog och standardpubliceringsläget.

Som standard skriver en pipeline händelseloggen till en dold Delta-tabell i standardkatalogen och schemat som konfigurerats för pipelinen. Även om tabellen är dold kan den fortfarande efterfrågas av alla tillräckligt privilegierade användare. Som standard kan endast ägaren av pipelinen köra frågor mot händelseloggtabellen.

Om du vill köra frågor mot händelseloggen som ägare använder du pipeline-ID:t:

SELECT * FROM event_log(<pipelineId>);

Som standard formateras namnet på den dolda händelseloggen som event_log_{pipeline_id}, där pipeline-ID:t är det systemtilldelade UUID:t med bindestreck ersatta med understreck.

Du kan publicera händelseloggen genom att redigera avancerade inställningar för din pipeline. Mer information finns i Pipelineinställning för händelselogg. När du publicerar en händelselogg anger du namnet på händelseloggen och kan också ange en katalog och ett schema, som i följande exempel:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

Platsen för händelseloggen fungerar också som schemaplats för alla frågor för automatisk inläsning i pipelinen. Databricks rekommenderar att du skapar en vy över händelseloggtabellen innan du ändrar behörigheterna, eftersom vissa beräkningsinställningar kan ge användarna åtkomst till schemametadata om händelseloggtabellen delas direkt. Följande exempelsyntax skapar en vy i en händelseloggtabell och används i exempelhändelseloggfrågorna som ingår i den här artikeln. Ersätt <catalog_name>.<schema_name>.<event_log_table_name> med det fullständigt kvalificerade tabellnamnet för din pipelinehändelselogg. Om du har publicerat händelseloggen använder du det namn som angavs vid publiceringen. Annars använder du event_log(<pipelineId>) där pipelineId är ID för den pipeline som du vill köra frågor mot.

CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;

Inom Unity Catalog stöder vyer strömmande frågor. I följande exempel används Structured Streaming för att fråga en vy som definierats ovanpå en händelseloggtabell:

df = spark.readStream.table("event_log_raw")

Grundläggande frågeexempel

I följande exempel visas hur du frågar efter händelseloggen för att få allmän information om pipelines och för att felsöka vanliga scenarier.

Övervaka pipelineuppdateringar genom att fråga efter tidigare uppdateringar

I följande exempel visas uppdateringarna (eller körningarna) för din pipeline, vilket visar uppdaterings-ID, status, starttid, slutförandetid och varaktighet. Detta ger dig en översikt över körningar för pipelinen.

Förutsätter att du har skapat event_log_raw vyn för pipelinen som du är intresserad av, som beskrivs i Fråga händelseloggen.

with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

Felsöka problem med inkrementell uppdatering av materialiserad vy

Det här exemplet frågar alla flöden från den senaste uppdateringen av en pipeline. Den visar om de har uppdaterats stegvis eller inte, samt annan relevant planeringsinformation som är användbar för felsökning av varför en inkrementell uppdatering inte sker.

Förutsätter att du har skapat event_log_raw vyn för pipelinen som du är intresserad av, som beskrivs i Fråga händelseloggen.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

Fråga efter kostnaden för en pipelineuppdatering

Det här exemplet visar hur du frågar DBU-användningen för en pipeline samt användaren för en viss pipelinekörning.

SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;

Avancerade frågor

I följande exempel visas hur du kör frågor mot händelseloggen för att hantera mindre vanliga eller mer avancerade scenarier.

Fråga efter mått för alla flöden i en pipeline

Det här exemplet visar hur du frågar efter detaljerad information om varje flöde i en pipeline. Den visar flödesnamn, uppdateringstid, datakvalitetsmått och information om de bearbetade raderna (utdatarader, borttagna, uppsertade och borttagna poster).

Förutsätter att du har skapat event_log_raw vyn för pipelinen som du är intresserad av, som beskrivs i Fråga händelseloggen.

WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array

  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations

  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,

  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

Fråga datakvalitet eller förväntningsmått

Om du definierar förväntningar på datauppsättningar i pipelinen, lagras måtten för antalet poster som uppfyllde och inte uppfyllde en förväntan i objektet details:flow_progress.data_quality.expectations. Måttet för antalet borttagna poster lagras i objektet details:flow_progress.data_quality . Händelser som innehåller information om datakvalitet har händelsetypen flow_progress.

Datakvalitetsmått kanske inte är tillgängliga för vissa datauppsättningar. Se förväntningsbegränsningarna.

Följande datakvalitetsmått är tillgängliga:

Mätvärde Description
dropped_records Antalet poster som togs bort eftersom de inte uppfyllde ett eller flera krav.
passed_records Antalet poster som passerade förväntanskriterierna.
failed_records Antalet poster som misslyckades med de förväntade kriterierna.

I följande exempel efterfrågas datakvalitetsmåtten för den senaste pipelineuppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;

Fråga efter ursprungsinformation

Händelser som innehåller information om ursprung har händelsetypen flow_definition. Objektet details:flow_definition innehåller output_dataset och input_datasets som definierar varje relation i diagrammet.

Använd följande fråga för att extrahera datauppsättningarna för indata och utdata för att se ursprungsinformation. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

Övervaka molnfilinmatning med Auto Loader-funktionen

Pipelines genererar händelser när autoinläsaren bearbetar filer. För händelser med automatisk inläsning är event_typeoperation_progress och details:operation_progress:type är antingen AUTO_LOADER_LISTING eller AUTO_LOADER_BACKFILL. details:operation_progress-objektet innehåller även fälten status, duration_ms, auto_loader_details:source_pathoch auto_loader_details:num_files_listed.

I det följande exemplet frågas efter Auto Loader-händelser för den senaste uppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

Övervaka kvarvarande data för att optimera strömningstiden

Varje pipeline spårar hur mycket data som finns i kvarvarande uppgifter i details:flow_progress.metrics.backlog_bytes objektet. Händelser som innehåller mått för kvarvarande uppgifter har händelsetypen flow_progress. I följande exempel efterfrågas kvarvarande mått för den senaste pipelineuppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;

Anmärkning

Måtten för kvarvarande uppgifter kanske inte är tillgängliga beroende på pipelinens datakällatyp och Databricks Runtime-version.

Övervaka autoskalningshändelser för att optimera klassisk beräkning

För pipelines som använder klassisk beräkning (det vill säga, inte serverlös beräkning) registrerar händelseloggen klusterstorleksförändringar när utökad autoskalning är aktiverad i dina pipelines. Händelser som innehåller information om förbättrad autoskalning har händelsetypen autoscale. Informationen om en klusterstorleksändringsbegäran lagras i objektet details:autoscale.

I följande exempel frågas om förfrågningar om storleksändringar för det förbättrade autoskalningsklustret vid den senaste pipelineuppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Övervaka beräkningsresursanvändning för klassisk beräkning

cluster_resources händelser ger mått på antalet aktivitetsfack i klustret, hur mycket dessa aktivitetsfack används och hur många aktiviteter som väntar på att schemaläggas.

När förbättrad autoskalning är aktiverad innehåller cluster_resources händelser även mått för algoritmen för automatisk skalning, inklusive latest_requested_num_executorsoch optimal_num_executors. Händelserna visar också status för algoritmen som olika tillstånd, till exempel CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSoch BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Dessa uppgifter kan visas tillsammans med autoskalningshändelserna för att ge en helhetsbild av förbättrad autoskalning.

I följande exempel efterfrågas historiken för aktivitetsköstorlek, användningshistorik, historik för antal exekutorer och andra mått och tillstånd för automatisk skalning i den senaste pipelineuppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;

Övervaka metriker för pipeline-strömning

Du kan visa mått om strömmens förlopp i en pipeline. Gör en förfrågan om stream_progress händelser för att få händelser som är mycket lik måtten i StreamingQueryListener som skapats av Structured Streaming, med följande undantag:

  • Följande mått finns i StreamingQueryListener, men inte i stream_progress: numInputRows, inputRowsPerSecondoch processedRowsPerSecond.
  • För Kafka- och Kineses-strömmar kan fälten startOffset, endOffsetoch latestOffset vara för stora och trunkerade. För varje av dessa fält läggs ett ytterligare fält, ...Truncated, startOffsetTruncated, endOffsetTruncated och latestOffsetTruncated, till med ett booleskt värde för om data är trunkerad.

Om du vill fråga efter stream_progress händelser kan du använda en fråga, till exempel följande:

SELECT
  parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';

Här är ett exempel på en händelse i JSON:

{
  "id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
  "sequence": {
    "control_plane_seq_no": 1234567890123456
  },
  "origin": {
    "cloud": "<cloud>",
    "region": "<region>",
    "org_id": 0123456789012345,
    "pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
    "pipeline_type": "WORKSPACE",
    "pipeline_name": "<pipeline name>",
    "update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
    "request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
  },
  "timestamp": "2025-06-17T03:18:14.018Z",
  "message": "Completed a streaming update of 'flow_name'."
  "level": "INFO",
  "details": {
    "stream_progress": {
      "progress": {
        "id": "abcdef12-abcd-3456-7890-abcd1234ef56",
        "runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
        "name": "silverTransformFromBronze",
        "timestamp": "2022-11-01T18:21:29.500Z",
        "batchId": 4,
        "durationMs": {
          "latestOffset": 62,
          "triggerExecution": 62
        },
        "stateOperators": [],
        "sources": [
          {
            "description": "DeltaSource[dbfs:/path/to/table]",
            "startOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "endOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "latestOffset": null,
            "metrics": {
              "numBytesOutstanding": "0",
              "numFilesOutstanding": "0"
            }
          }
        ],
        "sink": {
          "description": "DeltaSink[dbfs:/path/to/sink]",
          "numOutputRows": -1
        }
      }
    }
  },
  "event_type": "stream_progress",
  "maturity_level": "EVOLVING"
}

Det här exemplet visar otrunkerade poster i en Kafka-källa, med fälten ...Truncated inställda på false:

{
  "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
  "startOffsetTruncated": false,
  "startOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706380
    }
  },
  "endOffsetTruncated": false,
  "endOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "latestOffsetTruncated": false,
  "latestOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "numInputRows": 292,
  "inputRowsPerSecond": 13.65826278123392,
  "processedRowsPerSecond": 14.479817514628582,
  "metrics": {
    "avgOffsetsBehindLatest": "0.0",
    "estimatedTotalBytesBehindLatest": "0.0",
    "maxOffsetsBehindLatest": "0",
    "minOffsetsBehindLatest": "0"
  }
}

Auditera pipelines

Du kan använda händelseloggposter och andra Azure Databricks-granskningsloggar för att få en fullständig bild av hur data uppdateras i en pipeline.

Lakeflow Spark Deklarativa Pipelinjer använder autentiseringsuppgifterna för pipelineägaren för att köra uppdateringar. Du kan ändra de autentiseringsuppgifter som används genom att uppdatera pipelineägaren. Granskningsloggen registrerar vilka åtgärder som utförts av användaren på pipelinen, inklusive skapande av en pipeline, ändringar i konfigurationen och uppdateringar som initieras.

Mer information om granskningshändelser i Unity Catalog finns i Unity Catalog-händelser.

Fråga efter användaråtgärder i händelseloggen

Du kan använda händelseloggen för att granska händelser, till exempel användaråtgärder. Händelser som innehåller information om användaråtgärder har händelsetypen user_action.

Information om åtgärden lagras i user_action-objektet i fältet details. Använd följande fråga för att skapa en granskningslogg med användarhändelser. Detta förutsätter att du har skapat event_log_raw vyn för den pipeline som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

Körinformation

Du kan visa körningsinformation för en pipelineuppdatering, till exempel Databricks Runtime-versionen för uppdateringen. Detta förutsätter att du har skapat event_log_raw vyn för pipelinen som du är intresserad av, enligt beskrivningen i Fråga händelseloggen.

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0