Monitor ingestion gateway progress with event logs

Applies to: Database connectors. Not supported for SaaS connectors.

Learn how to use event logs to monitor the progress of ingestion gateways in real time. Event logs provide per-table metrics for both snapshot and change data capture (CDC) phases, enabling you to track pipeline health, identify stalled pipelines, and build automated monitoring solutions.

Progress events allow you to:

  • Track how many rows and bytes have been ingested per table without waiting for pipeline completion.
  • Monitor snapshot progress for each table to estimate completion of large initial loads.
  • Monitor each ingested table individually to identify bottlenecks or issues.
  • Receive events even when no data changes occur to confirm that the pipeline is actively running.
  • Build alerts and dashboards using structured event data instead of parsing logs.

How progress events work

The gateway emits the following event types at regular intervals (default: 5 minutes) for each table in your pipeline:

  • flow_progress events report row and byte counters for snapshot and CDC flows. The metrics in these events are deltas. They reset to zero after each emission.
  • operation_progress events report snapshot progress as a percentage. Snapshot flows emit these events in addition to flow_progress. The progress percentage is cumulative. It accumulates from 0 to 100 over the lifetime of the snapshot.

Each event includes:

  • Source and destination table names.
  • Per-table metrics: rows upserted, rows deleted (CDC only), output bytes, and progress percentage (for snapshot).
  • The timestamp when the event was generated.

Events are available in the event log table but not through public APIs. You can query the event log table using SQL to analyze pipeline behavior and build monitoring solutions.

Access progress events

Progress events are stored in the event log table. To access them:

  1. Navigate to your gateway in the Azure Databricks workspace.
  2. Click the Event log tab to view events in the UI.
  3. Query the event log table directly using SQL for detailed analysis.

Query the event log table

To query flow_progress events for row and byte counters:

SELECT
  timestamp,
  CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) AS table_name,
  details:flow_progress:metrics:num_upserted_rows::bigint AS rows_upserted,
  COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS rows_deleted,
  details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes,
  CASE
    WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'snapshot'
    WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'cdc'
    ELSE 'unknown'
  END AS ingestion_phase
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

To query operation_progress events for snapshot progress percentage:

SELECT
  timestamp,
  origin.flow_name AS flow_name,
  details:operation_progress:status::string AS status,
  details:operation_progress:progress_percent::double AS progress_pct
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
  AND level = 'METRICS'
  AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

Replace <pipeline-id> with your gateway ID.

Understand the event structure

Progress events use one of the following event types with the METRICS log level:

  • flow_progress: Emitted for both snapshot and CDC flows. Reports per-table row and byte deltas.
  • operation_progress: Emitted only for snapshot flows. Reports the snapshot completion percentage for a table.

The following examples show the JSON structure for each event type:

Snapshot flow progress event structure
{
  "id": "01234567-89ab-cdef-0123-456789abcdef",
  "timestamp": "2025-10-14T13:33:14.175Z",
  "level": "METRICS",
  "event_type": "flow_progress",
  "origin": {
    "pipeline_type": "INGESTION_GATEWAY",
    "pipeline_name": "MyPipeline",
    "dataset_name": "customers",
    "catalog_name": "main",
    "schema_name": "sales",
    "flow_name": "main.sales.customers_snapshot_flow",
    "ingestion_source_type": "SQLSERVER"
  },
  "message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
  "details": {
    "flow_progress": {
      "status": "RUNNING",
      "metrics": {
        "num_upserted_rows": 7512704,
        "num_deleted_rows": null,
        "num_output_bytes": 458752000
      }
    }
  },
  "maturity_level": "STABLE"
}
CDC flow progress event structure
{
  "id": "01234567-89ab-cdef-0123-456789abcdef",
  "timestamp": "2025-10-14T13:33:57.426Z",
  "level": "METRICS",
  "event_type": "flow_progress",
  "origin": {
    "pipeline_type": "INGESTION_GATEWAY",
    "pipeline_name": "MyPipeline",
    "dataset_name": "customers",
    "catalog_name": "main",
    "schema_name": "sales",
    "flow_name": "main.sales.customers_cdc_flow",
    "ingestion_source_type": "SQLSERVER"
  },
  "message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
  "details": {
    "flow_progress": {
      "status": "RUNNING",
      "metrics": {
        "num_upserted_rows": 25,
        "num_deleted_rows": 3,
        "num_output_bytes": 18432
      }
    }
  },
  "maturity_level": "STABLE"
}
Snapshot operation progress event structure
{
  "id": "01234567-89ab-cdef-0123-456789abcdef",
  "timestamp": "2025-10-14T13:33:14.175Z",
  "level": "METRICS",
  "event_type": "operation_progress",
  "origin": {
    "pipeline_type": "INGESTION_GATEWAY",
    "pipeline_name": "MyPipeline",
    "dataset_name": "customers",
    "catalog_name": "main",
    "schema_name": "sales",
    "flow_name": "main.sales.customers_snapshot_flow",
    "ingestion_source_type": "SQLSERVER"
  },
  "message": "Snapshot in progress for 'main.sales.customers'.",
  "details": {
    "operation_progress": {
      "type": "CDC_SNAPSHOT",
      "status": "IN_PROGRESS",
      "duration_ms": 3600000,
      "progress_percent": 65.5,
      "cdc_snapshot": {
        "target_table_name": "main.sales.customers",
        "snapshot_timestamp": 1737542400000,
        "snapshot_reason": "NEW_TABLE"
      }
    }
  },
  "maturity_level": "STABLE"
}

Event fields

The following table describes the key fields in progress events:

Field Type Description
event_type String Either flow_progress (row and byte counters for snapshot and CDC) or operation_progress (snapshot progress percentage).
level String Always METRICS.
timestamp String ISO 8601 timestamp when the event was generated.
origin.pipeline_type String Always INGESTION_GATEWAY.
origin.pipeline_name String Name of the gateway.
origin.dataset_name String Name of the table being ingested.
origin.catalog_name String Unity Catalog catalog name.
origin.schema_name String Unity Catalog schema name.
origin.flow_name String Flow identifier that indicates the ingestion phase. Format: {catalog}.{schema}.{table}_snapshot_flow for initial load or {catalog}.{schema}.{table}_cdc_flow for incremental changes.
origin.ingestion_source_type String Source database type (for example, SQLSERVER, MYSQL, POSTGRESQL, ORACLE).
details:flow_progress.status String Current flow status, typically RUNNING.
details:flow_progress.metrics.num_upserted_rows Integer Number of rows inserted or updated since the last event. Delta metric. Resets to zero after each emission.
details:flow_progress.metrics.num_deleted_rows Integer Number of rows deleted since the last event. Delta metric. Resets to zero after each emission. null for snapshot flows (snapshot does not delete).
details:flow_progress.metrics.num_output_bytes Integer Number of compressed bytes uploaded to a volume since the last event. Delta metric. Resets to zero after each emission. Populated for both snapshot and CDC flows.
details:operation_progress.type String Operation type. CDC_SNAPSHOT for snapshot operations.
details:operation_progress.status String Current operation status. IN_PROGRESS while the snapshot is running, COMPLETED when it finishes. Other values include STARTED, CANCELED, and FAILED.
details:operation_progress.duration_ms Integer Total elapsed time of the operation in milliseconds.
details:operation_progress.progress_percent Double Snapshot completion percentage (0.0 - 100.0). Cumulative, not a delta. The value increases as chunks complete and reaches 100.0 when the snapshot finishes.
details:operation_progress.cdc_snapshot.target_table_name String Fully qualified name of the table being snapshotted.
maturity_level String Always STABLE.
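
To consume these fields outside of SQL, the raw event JSON can be flattened into a simple record for alerting or dashboards. The following Python sketch is illustrative only (the helper name and flat output schema are not part of any Databricks API); the field paths match the table above:

```python
import json

# Flatten one raw progress event (JSON shaped like the examples above) into a
# flat dict. num_deleted_rows is null for snapshot flows, so it is coalesced
# to zero, matching the SQL queries in this article.
def flatten_progress_event(raw: str) -> dict:
    event = json.loads(raw)
    origin = event["origin"]
    row = {
        "timestamp": event["timestamp"],
        "event_type": event["event_type"],
        "table": "{}.{}.{}".format(
            origin["catalog_name"], origin["schema_name"], origin["dataset_name"]
        ),
        "phase": "snapshot" if origin["flow_name"].endswith("_snapshot_flow") else "cdc",
    }
    details = event.get("details", {})
    if "flow_progress" in details:
        metrics = details["flow_progress"]["metrics"]
        row["rows_upserted"] = metrics["num_upserted_rows"]
        row["rows_deleted"] = metrics["num_deleted_rows"] or 0
        row["output_bytes"] = metrics["num_output_bytes"]
    elif "operation_progress" in details:
        row["status"] = details["operation_progress"]["status"]
        row["progress_pct"] = details["operation_progress"]["progress_percent"]
    return row
```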

Metric behavior

Progress metrics fall into the following categories:

Delta metrics (num_upserted_rows, num_deleted_rows, num_output_bytes):

  • Represent changes since the last event, not cumulative totals.
  • Reset to zero after each event emission.
  • Are emitted even when no data changes occur, serving as liveness indicators.
  • For snapshot flows, num_deleted_rows is null because snapshot does not produce deletes.

Cumulative metrics (progress_percent):

  • The value accumulates from 0.0 to 100.0 over the lifetime of a snapshot.
  • Updates as snapshot chunks complete. Small tables that are not split into multiple chunks jump from 0.0 directly to 100.0 between emissions. Large tables that are split into many chunks update gradually as each chunk lands, providing an approximate progress signal.
  • The metric is intended as an approximation for large/chunked snapshots, not an exact row-level count.
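
Because the counters are deltas, a monitoring job must sum successive emissions to recover running totals. The following Python sketch illustrates this with simplified event dictionaries (hypothetical shapes, not the full event structure):

```python
from collections import defaultdict

# Turn per-emission delta counters from flow_progress events into running
# totals per flow. num_deleted_rows may be None (null) for snapshot flows.
def running_totals(events):
    totals = defaultdict(lambda: {"rows_upserted": 0, "rows_deleted": 0, "output_bytes": 0})
    for e in events:
        t = totals[e["flow_name"]]
        t["rows_upserted"] += e["num_upserted_rows"]
        t["rows_deleted"] += e["num_deleted_rows"] or 0
        t["output_bytes"] += e["num_output_bytes"]
    return dict(totals)

events = [
    {"flow_name": "main.sales.customers_cdc_flow", "num_upserted_rows": 25,
     "num_deleted_rows": 3, "num_output_bytes": 18432},
    {"flow_name": "main.sales.customers_cdc_flow", "num_upserted_rows": 10,
     "num_deleted_rows": 0, "num_output_bytes": 8192},
    # Zero-update emission: still present, serving as a liveness signal.
    {"flow_name": "main.sales.customers_cdc_flow", "num_upserted_rows": 0,
     "num_deleted_rows": 0, "num_output_bytes": 0},
]
print(running_totals(events))
# Totals for the CDC flow: 35 upserts, 3 deletes, 26624 bytes
```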

Configure progress events

Progress events are enabled by default for new gateways. You can customize event behavior using pipeline configuration parameters.

Enable or disable progress events

"configuration": {
    "pipelines.gateway.progressEventsEnabled": "true"
}

Set to "false" to disable progress events.

Adjust event emission frequency

"configuration": {
    "pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}

Default: 300 seconds (five minutes). Valid range: 30 to 3600 seconds (30 seconds to one hour). This setting controls the cadence of both flow_progress and operation_progress events.
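
As a rough sizing aid, you can estimate hourly event volume from the emission frequency and table count. The following Python sketch assumes one flow_progress event per table per interval, plus one operation_progress event per table still in snapshot (an approximation for planning, not a documented guarantee):

```python
# Estimate progress events emitted per hour for a gateway, assuming each
# table emits one flow_progress event per interval and each table still
# snapshotting additionally emits one operation_progress event.
def events_per_hour(num_tables, frequency_seconds=300, tables_in_snapshot=0):
    emissions = 3600 // frequency_seconds
    return emissions * (num_tables + tables_in_snapshot)

# 50 tables at the default 5-minute cadence, 5 still snapshotting:
print(events_per_hour(50, 300, 5))  # 660
```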

Example gateway configuration

The following example shows a complete gateway configuration with progress events enabled and set to emit every five minutes:

gateway_pipeline_spec = {
   "pipeline_type": "INGESTION_GATEWAY",
   "name": "my_gateway_pipeline",
   "catalog": "main",
   "target": "my_schema",
   "continuous": True,
   "configuration": {
      "pipelines.gateway.progressEventsEnabled": "true",
      "pipelines.gateway.progressEventEmitFrequencySeconds": "300"
   },
   # ... rest of pipeline spec
}

Important behavior and limitations

Default behavior

  • The feature is enabled by default for all new gateways.
  • Existing pipelines automatically receive this feature on their next update or restart.
  • No action is required to enable progress events.

Timing considerations

  • The first emission might take up to the configured frequency interval (default: five minutes) after pipeline start before progress events appear.
  • Events are emitted at the configured frequency during active ingestion.

Zero-update metrics

  • Events are emitted for all tables, including those with zero updates.
  • Zero-update metrics help distinguish between:
    • Idle tables: Processed but no data changes occurred.
    • Unprocessed tables: Not yet picked up by the pipeline.
  • Zero-update events serve as liveness signals confirming the pipeline is actively running.
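
The idle-versus-unprocessed distinction can be automated by comparing the configured table list against recent emissions. The following Python sketch uses simplified event dictionaries (hypothetical shapes, not the full event structure):

```python
# Classify tables by event presence: a table that emits events with zero
# counters is idle (the pipeline touched it, but nothing changed); a
# configured table with no recent events has not yet been processed.
def classify_tables(configured_tables, recent_events):
    rows_seen = {}
    for e in recent_events:
        name = e["flow_name"]
        changed = e["num_upserted_rows"] + (e["num_deleted_rows"] or 0)
        rows_seen[name] = rows_seen.get(name, 0) + changed
    return {
        t: "unprocessed" if t not in rows_seen
        else ("idle" if rows_seen[t] == 0 else "active")
        for t in configured_tables
    }
```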

Snapshot progress percentage behavior

  • Snapshot progress is calculated as (completed_chunks / total_chunks) × 100. The metric is approximate, not an exact row-level percentage.
  • Tables that are not split into multiple chunks (typically smaller tables) jump from 0.0 directly to 100.0 between emissions because there is only one chunk to track.
  • Large tables that are split into many chunks update incrementally as each chunk completes, providing a gradual progress signal that is useful for monitoring long-running initial loads.
  • A COMPLETED status always reports progress_percent = 100.0.
  • The metric does not survive a pipeline refresh or restart. After a restart, snapshot progress resumes from the last committed checkpoint and the metric continues to climb from the resumed position.
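
The chunk-based calculation can be illustrated with a simplified model in Python (a sketch of the formula above, not gateway internals):

```python
# Simplified model of snapshot progress:
# progress_percent = completed_chunks / total_chunks * 100.
def progress_percent(completed_chunks: int, total_chunks: int) -> float:
    return round(completed_chunks / total_chunks * 100.0, 2)

print(progress_percent(1, 1))      # 100.0: single-chunk table jumps straight to done
print(progress_percent(131, 200))  # 65.5: large chunked table, mid-snapshot
```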

Sample queries

The following sample queries show how to monitor your gateway using the row, byte, and snapshot progress metrics. Replace <pipeline-id> with your gateway ID in each query.

Row count queries

Volume per table (last 24 hours)

Total upserts and deletes for each table over the last 24 hours, with the ingestion phase classified from the flow name suffix. Use this as a dashboard headline to see which tables moved the most data.

WITH row_events AS (
  SELECT
    origin.flow_name AS flow_name,
    CASE
      WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
      WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
      ELSE 'OTHER'
    END AS phase,
    details:flow_progress:metrics:num_upserted_rows::bigint AS upserts,
    details:flow_progress:metrics:num_deleted_rows::bigint AS deletes
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'flow_progress'
    AND level = 'METRICS'
    AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
  flow_name,
  phase,
  SUM(upserts) AS rows_upserted_24h,
  SUM(COALESCE(deletes, 0)) AS rows_deleted_24h,
  SUM(upserts) + SUM(COALESCE(deletes, 0)) AS total_rows_moved_24h
FROM row_events
GROUP BY flow_name, phase
ORDER BY total_rows_moved_24h DESC

Recent progress events (last one hour)

Recent row counters for all tables in your pipeline. Useful for near-real-time monitoring.

SELECT
  origin.pipeline_name,
  origin.dataset_name,
  origin.flow_name,
  details:flow_progress:metrics:num_upserted_rows::bigint AS num_upserted_rows,
  COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS num_deleted_rows,
  timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND origin.pipeline_type = 'INGESTION_GATEWAY'
  AND timestamp >= current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC

Identify silent or stuck tables

Tables that emit events but report zero upserts and zero deletes for the last 60 minutes may be stuck. Investigate further if the source should be changing. CDC tables that are genuinely idle (for example, overnight) also appear here.

WITH recent_window AS (
  SELECT
    origin.flow_name AS flow_name,
    CASE
      WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
      WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
      ELSE 'OTHER'
    END AS phase,
    COUNT(*) AS emissions_in_window,
    SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS upserts_in_window,
    SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS deletes_in_window,
    MAX(timestamp) AS last_event
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'flow_progress'
    AND level = 'METRICS'
    AND timestamp >= current_timestamp() - INTERVAL 60 MINUTES
  GROUP BY origin.flow_name
)
SELECT
  flow_name,
  phase,
  emissions_in_window,
  upserts_in_window,
  deletes_in_window,
  last_event,
  ROUND(TIMESTAMPDIFF(MINUTE, last_event, current_timestamp()), 0) AS minutes_since_last_event
FROM recent_window
WHERE upserts_in_window = 0
  AND deletes_in_window = 0
ORDER BY minutes_since_last_event DESC

Per-table timeline with cumulative totals

Full event-by-event history for a single flow over the last 24 hours, with cumulative row counts. Replace <flow-pattern> with a SQL LIKE pattern (for example, '%customers%_cdc_flow').

SELECT
  origin.flow_name AS flow_name,
  origin.update_id AS update_id,
  timestamp,
  details:flow_progress:metrics:num_upserted_rows::bigint AS upserts_this_period,
  COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS deletes_this_period,
  SUM(details:flow_progress:metrics:num_upserted_rows::bigint)
    OVER (PARTITION BY origin.flow_name, origin.update_id
          ORDER BY timestamp
          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_upserts_this_run,
  SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0))
    OVER (PARTITION BY origin.flow_name, origin.update_id
          ORDER BY timestamp
          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_deletes_this_run
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND origin.flow_name LIKE '<flow-pattern>'
  AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
ORDER BY timestamp

Output bytes queries

Bytes per table (last 24 hours)

Total bytes uploaded to a volume per table over the last 24 hours, with human-friendly MB and GB units. Sort descending to see the heaviest-volume tables.

WITH byte_events AS (
  SELECT
    origin.flow_name AS flow_name,
    CASE
      WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
      WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
      ELSE 'OTHER'
    END AS phase,
    details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'flow_progress'
    AND level = 'METRICS'
    AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
  flow_name,
  phase,
  SUM(output_bytes) AS bytes_24h,
  ROUND(SUM(output_bytes) / 1024.0 / 1024.0, 2) AS mb_24h,
  ROUND(SUM(output_bytes) / 1024.0 / 1024.0 / 1024.0, 3) AS gb_24h
FROM byte_events
GROUP BY flow_name, phase
ORDER BY bytes_24h DESC

Throughput trend (MB per minute)

Per-minute time series of bytes uploaded across all flows over the last 24 hours. Render as a line chart to spot throughput patterns and stalls.

SELECT
  DATE_TRUNC('MINUTE', timestamp) AS ts_minute,
  origin.flow_name AS flow_name,
  CASE
    WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
    WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
    ELSE 'OTHER'
  END AS phase,
  ROUND(SUM(details:flow_progress:metrics:num_output_bytes::bigint) / 1024.0 / 1024.0, 2) AS mb_per_minute
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('MINUTE', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_minute

Average bytes per row (wide-table or LOB detector)

Combines num_output_bytes with row counts to compute average bytes per row per table. High values usually indicate LOB or wide-schema tables that drive throughput cost. Useful for capacity planning and schema review.

WITH joined AS (
  SELECT
    origin.flow_name AS flow_name,
    CASE
      WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
      WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
      ELSE 'OTHER'
    END AS phase,
    SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS total_bytes,
    SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS total_upserts,
    SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS total_deletes
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'flow_progress'
    AND level = 'METRICS'
    AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
  GROUP BY origin.flow_name
)
SELECT
  flow_name,
  phase,
  total_upserts + total_deletes AS total_rows,
  ROUND(total_bytes / 1024.0 / 1024.0, 2) AS total_mb,
  ROUND(total_bytes / NULLIF(total_upserts + total_deletes, 0), 0) AS avg_bytes_per_row
FROM joined
WHERE total_bytes > 0
ORDER BY avg_bytes_per_row DESC

Snapshot progress queries

Overall snapshot progress

A single-row summary of how many tables are completed, in progress, or queued in the current snapshot run. Pin this as a top-level dashboard widget for a quick health check.

WITH latest_update AS (
  SELECT origin.update_id AS update_id
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'create_update'
  ORDER BY timestamp DESC LIMIT 1
),
latest_per_table AS (
  SELECT
    origin.flow_name AS flow_name,
    details:operation_progress:status::string AS status,
    details:operation_progress:progress_percent::double AS progress_pct,
    ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'operation_progress'
    AND level = 'METRICS'
    AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
  COUNT(*) AS total_tables,
  SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS tables_completed,
  SUM(CASE WHEN status = 'IN_PROGRESS' AND progress_pct > 0 AND progress_pct < 100 THEN 1 ELSE 0 END) AS tables_in_progress,
  SUM(CASE WHEN progress_pct = 0 THEN 1 ELSE 0 END) AS tables_not_started,
  ROUND(AVG(progress_pct), 2) AS overall_progress_pct,
  ROUND(SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS pct_tables_done
FROM latest_per_table
WHERE rn = 1

Per-table snapshot status board

Lists every table in the current snapshot run with its latest status and progress percentage. Sort by progress_pct to see which tables need the most work.

WITH latest_update AS (
  SELECT origin.update_id AS update_id
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'create_update'
  ORDER BY timestamp DESC LIMIT 1
),
table_status AS (
  SELECT
    origin.flow_name AS flow_name,
    details:operation_progress:status::string AS status,
    details:operation_progress:progress_percent::double AS progress_pct,
    timestamp,
    ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'operation_progress'
    AND level = 'METRICS'
    AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
  flow_name,
  status,
  ROUND(progress_pct, 2) AS progress_pct,
  timestamp AS last_update,
  CASE
    WHEN status = 'COMPLETED' THEN 'Done'
    WHEN progress_pct = 0 THEN 'Queued'
    ELSE 'Active'
  END AS phase
FROM table_status
WHERE rn = 1
ORDER BY
  CASE status WHEN 'IN_PROGRESS' THEN 0 WHEN 'COMPLETED' THEN 1 ELSE 2 END,
  progress_pct ASC

Snapshot rows and bytes loaded in the current update

Combines progress percentage, rows upserted, and bytes uploaded for the current snapshot run. Use this to render "X% complete, N rows, N MB" tiles on a dashboard.

WITH latest_update AS (
  SELECT origin.update_id AS update_id
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'create_update'
  ORDER BY timestamp DESC LIMIT 1
),
progress AS (
  SELECT
    origin.flow_name AS flow_name,
    details:operation_progress:progress_percent::double AS progress_pct,
    details:operation_progress:status::string AS status,
    ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'operation_progress'
    AND level = 'METRICS'
    AND origin.update_id = (SELECT update_id FROM latest_update)
),
volume AS (
  SELECT
    origin.flow_name AS flow_name,
    SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS rows_loaded,
    SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS bytes_loaded
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'flow_progress'
    AND level = 'METRICS'
    AND origin.flow_name LIKE '%_snapshot_flow'
    AND origin.update_id = (SELECT update_id FROM latest_update)
  GROUP BY origin.flow_name
)
SELECT
  p.flow_name,
  p.status,
  ROUND(p.progress_pct, 2) AS progress_pct,
  v.rows_loaded,
  ROUND(v.bytes_loaded / 1024.0 / 1024.0, 2) AS mb_loaded,
  ROUND(v.bytes_loaded / 1024.0 / 1024.0 / 1024.0, 3) AS gb_loaded
FROM progress p
LEFT JOIN volume v ON p.flow_name = v.flow_name
WHERE p.rn = 1
ORDER BY p.progress_pct ASC

Stalled snapshot detection

Snapshot tables whose progress_percent has not changed in the last 30 minutes. Schedule this as an alert query to catch snapshots that have stalled but are still emitting events.

WITH latest_update AS (
  SELECT origin.update_id AS update_id
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'create_update'
  ORDER BY timestamp DESC LIMIT 1
),
recent AS (
  SELECT
    origin.flow_name AS flow_name,
    details:operation_progress:progress_percent::double AS progress_pct,
    details:operation_progress:status::string AS status,
    timestamp
  FROM event_log('<pipeline-id>')
  WHERE event_type = 'operation_progress'
    AND level = 'METRICS'
    AND origin.update_id = (SELECT update_id FROM latest_update)
    AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
)
SELECT
  flow_name,
  ROUND(MIN(progress_pct), 2) AS min_pct_30min,
  ROUND(MAX(progress_pct), 2) AS max_pct_30min,
  ROUND(MAX(progress_pct) - MIN(progress_pct), 2) AS pct_change_30min,
  COUNT(*) AS events_in_window,
  MAX(timestamp) AS last_event_ts
FROM recent
WHERE status = 'IN_PROGRESS'
GROUP BY flow_name
HAVING MAX(progress_pct) - MIN(progress_pct) = 0
   AND MAX(progress_pct) < 100
ORDER BY max_pct_30min ASC

Troubleshooting

No progress events appear

If you don't see progress events in the event log:

  1. Check that pipelines.gateway.progressEventsEnabled is set to "true".
  2. Wait for at least one full interval after pipeline start. Default is five minutes.
  3. Check that the pipeline is actively running and ingesting.
  4. Include the level = 'METRICS' filter in your queries to see only progress events.

Events appear too frequently or infrequently

If events don't appear at the expected frequency:

Check the pipelines.gateway.progressEventEmitFrequencySeconds setting and adjust as needed:

  • Default is five minutes (300 seconds).
  • Valid range: 30 to 3600 seconds (30 seconds to one hour).

Metrics show zero after pipeline restart

If metrics reset to zero after a pipeline restart:

Metrics are held in memory only and reset on restart, refresh, or resume. This is by design, for implementation simplicity. The pipeline starts accumulating fresh metrics immediately.

Missing metrics for some tables

If some tables don't show progress events:

  1. Make sure that the table is not filtered out in the pipeline configuration.
  2. For CDC phase, make sure that the source table has CDC or change tracking enabled.
  3. Confirm that the table is included in the gateway configuration.
  4. Note that progress_percent is only emitted in operation_progress events for snapshot flows. CDC flows do not emit operation_progress events because CDC has no concept of completion.

Additional resources