Important
The PostgreSQL connector for Lakeflow Connect is in Public Preview. Reach out to your Databricks account team to enroll in the Public Preview.
This page describes the ongoing operations for maintaining PostgreSQL ingestion pipelines.
General pipeline maintenance
The pipeline maintenance tasks in this section apply to all managed connectors in Lakeflow Connect.
For general pipeline maintenance tasks, see Common pipeline maintenance tasks.
Remove unused staging files
For ingestion pipelines that you create after January 6, 2025, volume staging data is automatically scheduled for deletion after 25 days and physically removed after 30 days. As a result, if an ingestion pipeline has not completed successfully for 25 days or longer, the staging data it still needs might be deleted, causing data gaps in the destination tables. To avoid gaps, trigger a full refresh of the target tables.
For ingestion pipelines created before January 6, 2025, contact Databricks Support to request manual enablement of automatic retention management for staging CDC data.
The following data is automatically cleaned up:
- CDC data files
- Snapshot files
- Staging table data
Connector-specific pipeline maintenance
The pipeline maintenance tasks in this section are specific to the PostgreSQL connector.
Add new tables to replication
To add new tables to an existing replication flow:
1. Grant the necessary privileges to the replication user. For a complete list of required privileges, see PostgreSQL database user requirements.
2. Set the replica identity for the new tables based on their structure. See Set replica identity for tables for guidance on choosing the correct replica identity setting. For an example of steps 1 and 2, see the sketch after this list.
3. Add the tables to the publication:
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.new_table;
4. Update the ingestion pipeline configuration to include the new tables. You can do this through the Azure Databricks UI or by updating the ingestion_definition in your Databricks Asset Bundle or CLI command.
5. Restart the ingestion gateway to discover the new tables. The gateway periodically checks for new tables, but restarting it speeds up discovery.
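For steps 1 and 2, the exact statements depend on your schema. The following is a minimal sketch, assuming a replication user named databricks_user and a new table schema_name.new_table that has no primary key (both names are placeholders; see the linked requirements pages for the authoritative privilege list and replica identity guidance):
-- Allow the replication user to read the table (needed for the initial snapshot)
GRANT SELECT ON schema_name.new_table TO databricks_user;
-- Without a primary key, log the full old row for UPDATE and DELETE events
ALTER TABLE schema_name.new_table REPLICA IDENTITY FULL;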
Clean up replication slots
When you delete an ingestion pipeline, the replication slot is not automatically removed from the source PostgreSQL database. Unused replication slots can cause Write-Ahead Log (WAL) files to accumulate, potentially filling up disk space on the source database.
To list all replication slots:
SELECT slot_name, slot_type, active, restart_lsn, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
To drop a replication slot that is no longer needed:
SELECT pg_drop_replication_slot('slot_name');
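To guard against dropping a slot that a pipeline is still using, you can make the drop conditional on the slot being inactive. A minimal sketch (replace 'slot_name' with the actual slot name):
SELECT pg_drop_replication_slot(slot_name)
FROM pg_replication_slots
WHERE slot_name = 'slot_name'
  AND NOT active;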
Clean up inline DDL tracking
If you disable inline DDL tracking, run the steps below for each database to clean up objects created by the audit script.
1. Drop the event triggers:
DROP EVENT TRIGGER IF EXISTS lakeflow_ddl_trigger CASCADE;
2. Remove the audit table from the publication:
ALTER PUBLICATION databricks_publication DROP TABLE public.lakeflow_ddl_audit;
3. Drop the audit table:
DROP TABLE IF EXISTS public.lakeflow_ddl_audit CASCADE;
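To confirm that the cleanup succeeded, you can check that no related event triggers remain. A minimal sketch, assuming the audit script's objects use a lakeflow prefix:
SELECT evtname, evtevent, evtenabled
FROM pg_event_trigger
WHERE evtname LIKE 'lakeflow%';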
Monitor replication slots
Monitor the status of replication slots to ensure they are active and consuming WAL data:
SELECT slot_name,
active,
wal_status,
active_pid,
restart_lsn,
confirmed_flush_lsn,
pg_current_wal_lsn() AS current_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name LIKE 'databricks%';
Large replication lag values can indicate one of the following issues:
- The ingestion gateway is not keeping up with the changes in the source database.
- The ingestion gateway has been stopped for an extended period.
- There are network connectivity issues between the gateway and the source database.
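To help distinguish these cases, you can check whether a walsender process is currently attached to each slot. A minimal sketch; if the pg_stat_replication columns come back NULL, nothing is consuming the slot:
SELECT s.slot_name, s.active, r.application_name, r.client_addr, r.state
FROM pg_replication_slots s
LEFT JOIN pg_stat_replication r ON r.pid = s.active_pid
WHERE s.slot_name LIKE 'databricks%';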
If a replication slot is inactive (active = false) and you have confirmed that the corresponding pipeline is no longer needed, drop the replication slot to free up the resources. See Clean up replication slots.
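To flag all slots that need attention in a single query, you can filter on inactivity or on the wal_status column (available in PostgreSQL 13 and later). A minimal sketch:
SELECT slot_name, active, wal_status
FROM pg_replication_slots
WHERE NOT active
   OR wal_status IN ('unreserved', 'lost');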
Monitor WAL disk usage
Monitor Write-Ahead Log (WAL) disk usage to prevent disk space issues:
SELECT pg_size_pretty(sum(size)) AS wal_size
FROM pg_ls_waldir();
To check WAL retention for a specific replication slot:
SELECT slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS pending_wal
FROM pg_replication_slots
WHERE slot_name = 'your_slot_name';
Note
If max_slot_wal_keep_size is properly configured during source setup (as recommended in Limit WAL retention for replication slots), inactive replication slots will not cause unbounded WAL growth. The slot will be invalidated when the limit is reached, preventing database crashes.
If WAL disk usage is high, perform the following steps:
1. Verify that the ingestion gateway is running continuously.
2. Check the gateway logs for errors that might be preventing it from consuming WAL data.
3. Consider setting max_slot_wal_keep_size to limit WAL retention (PostgreSQL 13 or above):
ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
SELECT pg_reload_conf();
Warning
Setting max_slot_wal_keep_size can cause replication slots to be invalidated if the WAL retention limit is exceeded, requiring a full refresh of all tables.
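To check the limit currently in effect, query pg_settings; a setting of -1 means WAL retention for slots is unlimited:
SELECT name, setting, unit
FROM pg_settings
WHERE name = 'max_slot_wal_keep_size';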
Restart the ingestion gateway
To decrease the load on the source database, the ingestion gateway only checks periodically for new tables. It might take up to 6 hours for the gateway to discover new tables. If you want to speed up this process, restart the gateway.
Additionally, restart the gateway in the following situations:
- You have made configuration changes to the source database.
- The gateway is experiencing errors or performance issues.
Update publications
If you need to modify which tables are included in replication:
-- Add a table to the publication
ALTER PUBLICATION databricks_publication ADD TABLE schema_name.table_name;
-- Remove a table from the publication
ALTER PUBLICATION databricks_publication DROP TABLE schema_name.table_name;
-- List all tables in a publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'databricks_publication';
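You can also verify which operations the publication replicates by inspecting pg_publication. A minimal sketch:
SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete, pubtruncate
FROM pg_publication
WHERE pubname = 'databricks_publication';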
After updating the publication, restart the ingestion gateway to apply the changes.