This page describes best practices you can apply to configure Auto Loader to run reliably, cost-effectively, and at scale for your use case.
These best practices reduce operational overhead and prevent common issues that are difficult to diagnose in production, such as unnecessary LIST API costs from full directory scans, silent data loss from schema drift, and pipeline restarts caused by checkpoint misconfiguration.
For production configuration details, see Configure Auto Loader for production workloads. For monitoring and observability, see Monitor and observe Auto Loader.
Choose the right execution framework
The best execution framework for your use case depends on how much control you need over the pipeline and how much operational overhead you want to manage. For most users and production pipelines, Auto Loader with Lakeflow Spark Declarative Pipelines is a good fit. However, if you need maximum control and customization, use Auto Loader with Structured Streaming. For the simplest setup with a managed experience, use a managed Lakeflow Connector when available.
Lakeflow Spark Declarative Pipelines extends Structured Streaming with autoscaling, data quality checks, schema evolution handling, and monitoring through the event log. Databricks recommends Lakeflow Spark Declarative Pipelines for most production ingestion workloads.
Choose the right scheduling and trigger type
The best scheduling and trigger type for your use case depends on your latency requirements and file arrival patterns. For most use cases, Databricks recommends a file arrival trigger with file events enabled. This achieves low-latency ingestion at low cost because compute only runs when new files arrive. The three trigger types differ in when and how often the pipeline starts:
- Continuous: The pipeline runs without stopping. Use only when sub-second latency is a hard requirement, since continuous compute costs more. Pair with file events.
- File arrival trigger: The pipeline starts when new files land in the source location. Best for low-to-medium latency or irregular file arrival patterns. Requires file events to be enabled. See Trigger jobs when new files arrive.
- Scheduled: The pipeline runs on a time-based schedule (for example, every hour). Use when latency requirements are lenient (minutes to hours). Works with directory listing, but file events reduce costs even in scheduled mode by avoiding full directory scans.
For details on using Trigger.AvailableNow for batch scheduling, see Using Trigger.AvailableNow and rate limiting.
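For scheduled runs, one option is to process whatever has arrived and then stop. The following is a minimal sketch, assuming `df` is a streaming DataFrame returned by an Auto Loader read. The function name, checkpoint path, and table name are hypothetical placeholders.

```python
def write_available_now(df, checkpoint_path: str, target_table: str):
    """Process all files that are available now, then stop.

    `df` is assumed to be a streaming DataFrame from an Auto Loader read.
    `checkpoint_path` and `target_table` are placeholders for this sketch.
    """
    return (
        df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)  # drain the current backlog, then shut down
        .toTable(target_table)
    )
```

A scheduled job could call `write_available_now(df, "/path/to/checkpoint", "catalog.schema.table")` on each run, so compute is only used while there is a backlog to process.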
Choose the right file discovery mode
Auto Loader supports three file discovery modes with different tradeoffs in setup complexity, scalability, and cost.
| Mode | Setup complexity | Scalability | Cost | When to use |
|---|---|---|---|---|
| File events (recommended) | Low (one-time permission setup) | Millions of files per hour | Lowest | Default for most workloads |
| Classic file notification | High (21+ cloud configuration options) | Millions of files per hour | Medium | When file events are unavailable |
| Directory listing | None | Limited by directory size | Highest (LIST API costs) | Small directories, one-time backfills, or when security policies prevent file events |
File events consolidate cloud storage resources by using one subscription and queue per external location instead of one per stream. The performance difference is significant at scale: directory listing must scan the entire source directory on every trigger, so ingestion time grows with directory size. File events deliver new file notifications directly, so ingestion time stays low regardless of how many objects are in the directory.
Enable file events
File events require a one-time cloud permissions grant and an external location configured to use the managed file events service. Once set up, all Auto Loader streams reading from that external location can use file events without additional configuration.
Grant the required cloud permissions on the cloud provider side. Requirements vary by cloud provider. See Set up file events for an external location.
Set cloudFiles.useManagedFileEvents to true in your Auto Loader query.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useManagedFileEvents", "true")
    .load("/path/to/data/dir"))
For full setup steps, see Migrate to Auto Loader with file events.
When you cannot use file events
You might not be able to use file events when:
- The external location is not configured with file events.
- Organization security policies do not allow enabling file events on a shared external location.
In these cases, use classic file notification mode or directory listing mode. For a full comparison of file detection modes, see Compare Auto Loader file detection modes.
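The choice between the three discovery modes can be expressed as a small options helper. This is a sketch only: the mode labels and function names are hypothetical, classic file notification mode usually needs additional cloud-specific options that are not shown, and directory listing is assumed to be the mode used when neither notification option is set.

```python
def discovery_options(mode: str) -> dict:
    """Return Auto Loader reader options for a file discovery mode.

    The mode labels are hypothetical names used only in this sketch.
    """
    if mode == "file_events":
        return {"cloudFiles.useManagedFileEvents": "true"}
    if mode == "classic_notification":
        # Classic mode typically requires extra cloud-specific options
        # (credentials, queue settings) that are omitted here.
        return {"cloudFiles.useNotifications": "true"}
    if mode == "directory_listing":
        # Assumption: no discovery option means directory listing.
        return {}
    raise ValueError(f"unknown discovery mode: {mode!r}")


def read_auto_loader(spark, path: str, file_format: str, mode: str):
    """Build an Auto Loader stream for the chosen discovery mode."""
    reader = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", file_format)
    )
    return reader.options(**discovery_options(mode)).load(path)
```

Keeping the mode in one place makes it easier to switch a stream to file events later without touching the rest of the query.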
Manage schema evolution
Auto Loader infers schema automatically, but how you configure schema evolution affects data completeness and pipeline stability. Use the following table to choose a strategy.
| Scenario | Recommendation |
|---|---|
| Schema is known and fixed | Provide an explicit schema with .schema() |
| Schema is unknown, additive changes expected | schemaEvolutionMode: addNewColumns |
| Schema is unknown, type changes expected | schemaEvolutionMode: addNewColumnsWithTypeWidening |
| Strict schema contract required | schemaEvolutionMode: failOnNewColumns |
| Arbitrary or unpredictable schema | Ingest as Variant type |
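The table above can be sketched as a reader helper. This is an illustration under stated assumptions, not a definitive implementation: the scenario labels, function names, and the schema location argument are hypothetical, and in a managed pipeline the schema location may be handled for you.

```python
from typing import Optional

# Keys are hypothetical scenario labels; values are the modes from the table.
EVOLUTION_MODES = {
    "additive": "addNewColumns",
    "type_widening": "addNewColumnsWithTypeWidening",
    "strict": "failOnNewColumns",
}


def schema_options(scenario: str, schema_location: str) -> dict:
    """Return schema inference options for an inferred-schema scenario."""
    if scenario not in EVOLUTION_MODES:
        raise ValueError(f"unknown scenario: {scenario!r}")
    return {
        "cloudFiles.schemaLocation": schema_location,
        "cloudFiles.schemaEvolutionMode": EVOLUTION_MODES[scenario],
    }


def read_with_schema_strategy(spark, path: str, file_format: str,
                              scenario: str = "additive",
                              schema_location: str = "/path/to/schema",
                              fixed_schema: Optional[str] = None):
    """Read with an explicit schema if one is given, else with inference."""
    reader = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", file_format)
    )
    if fixed_schema is not None:
        # Known, fixed schema: pass a DDL string such as
        # "id BIGINT, amount DOUBLE" to .schema().
        return reader.schema(fixed_schema).load(path)
    return reader.options(**schema_options(scenario, schema_location)).load(path)
```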
After you have chosen a strategy, apply the following practices to fine-tune how schema evolution behaves.
Use schema hints for known field types
Use the cloudFiles.schemaHints option to enforce types for fields you know in advance, while still allowing schema inference for other fields.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaHints", "id long, amount double")
    .load("/path/to/data/dir"))
Use type widening for compatible type changes
The addNewColumnsWithTypeWidening schema evolution mode automatically widens compatible types (for example, int to long) instead of routing data to the _rescued_data column. This avoids the need for post-processing jobs to handle simple type promotions.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
    .load("/path/to/data/dir"))
Ingest as Variant type for unpredictable schemas
When your data does not conform to any specific schema, or the schema changes continuously, ingest the data as a Variant type.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("singleVariantColumn", "data")
    .load("/path/to/data/dir"))
Variant provides schema-on-read at query time but is less efficient than querying structured columns. For the full mechanics of schema inference and evolution, see Configure schema inference and evolution in Auto Loader.
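To illustrate schema-on-read, the sketch below builds `variant_get` expressions that pull typed fields out of the Variant column at query time. It assumes the Variant column is named `data` as in the example above and that the runtime supports the `variant_get` SQL function. The field names `id` and `amount` and the helper names are hypothetical.

```python
def variant_field(column: str, path: str, sql_type: str, alias: str) -> str:
    """Build a SQL expression that extracts one typed field from a Variant."""
    return f"variant_get({column}, '{path}', '{sql_type}') AS {alias}"


def project_variant(df, column: str = "data"):
    """Project hypothetical `id` and `amount` fields out of a Variant column."""
    return df.selectExpr(
        variant_field(column, "$.id", "bigint", "id"),
        variant_field(column, "$.amount", "double", "amount"),
    )
```

Fields that downstream queries read often are good candidates to extract once into structured columns, since querying them from the Variant on every read is less efficient.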
Handle bad data and data quality
The following practices help you detect, capture, and isolate bad data before it propagates to downstream layers.
Enable _rescued_data and _corrupt_record
Auto Loader provides two columns for capturing data that fails to parse cleanly.
- _rescued_data captures fields that do not match the current schema. It is added automatically by Auto Loader.
- _corrupt_record captures rows that cannot be parsed at all. Enable it using columnNameOfCorruptRecord:
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaHints", "_corrupt_record string")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .load("/path/to/data/dir"))
Databricks recommends columnNameOfCorruptRecord over badRecordsPath to avoid potential race conditions that can miss corrupted records.
Use Lakeflow Spark Declarative Pipelines expectations for monitoring
Set Lakeflow Spark Declarative Pipelines expectations to verify that _rescued_data and _corrupt_record are NULL under normal conditions. Non-NULL values signal schema drift or data corruption.
import dlt

# Expectations record a warning metric when a row has rescued or corrupt data.
@dlt.table
@dlt.expect("no rescued data", "_rescued_data IS NULL")
@dlt.expect("no corrupt records", "_corrupt_record IS NULL")
def bronze_table():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaHints", "_corrupt_record string")
        .option("columnNameOfCorruptRecord", "_corrupt_record")
        .load("/path/to/data/dir"))
Isolate corrupt data
Isolate rows containing unparsable data in a dedicated sink for investigation. This prevents corrupt data from propagating to downstream layers.
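import dlt

# Rows that could not be parsed go to a dedicated table for investigation.
@dlt.table
def corrupt_records_sink():
    return dlt.read_stream("bronze_table").where("_corrupt_record IS NOT NULL")

# Downstream layers read only rows that parsed cleanly.
@dlt.view
def clean_table():
    return dlt.read_stream("bronze_table").where("_corrupt_record IS NULL")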
Annotate data with source file metadata
Include the _metadata column in your Auto Loader ingestion queries. At minimum, capture file_path and file_modification_time. This enables you to trace data issues back to specific source files and join against cloud_files_state() for the full file lifecycle.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/path/to/data/dir")
    .select("*", "_metadata.file_path", "_metadata.file_modification_time"))
For details, see File metadata column.
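Once `file_path` is stored with each row, tracing a data issue back to its source files becomes a simple aggregation. The following sketch builds such a query. It assumes the target table kept the `file_path`, `file_modification_time`, and `_rescued_data` columns from the examples above. The function name and the table name in the usage note are hypothetical.

```python
def files_with_rescued_data_sql(table_name: str) -> str:
    """Build a query listing the source files that produced rescued data.

    Assumes the table has `file_path`, `file_modification_time`, and
    `_rescued_data` columns, as in the ingestion examples above.
    """
    return (
        "SELECT file_path, "
        "max(file_modification_time) AS file_modification_time, "
        "count(*) AS rescued_rows "
        f"FROM {table_name} "
        "WHERE _rescued_data IS NOT NULL "
        "GROUP BY file_path "
        "ORDER BY rescued_rows DESC"
    )
```

For example, `spark.sql(files_with_rescued_data_sql("catalog.schema.bronze_table"))` would return one row per affected file, with the most affected files first.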
Optimize cost and performance
The following practices reduce the three main cost drivers for Auto Loader: cloud LIST API calls, idle compute, and long-term storage growth.
Use file events to minimize LIST API costs: File events provide incremental file discovery, eliminating the need for full directory listings on each run. This is the single most impactful cost optimization for Auto Loader.
Use file arrival triggers for event-driven processing: File arrival triggers start your pipeline only when new files arrive, so you do not pay for idle compute. See Trigger jobs when new files arrive.
Archive processed files with cloudFiles.cleanSource: Use cloudFiles.cleanSource to automatically delete or move processed files. This reduces both storage costs and directory listing costs for long-lived streams. For full details, see Archiving files in the source directory to lower costs.
- Use delete mode to remove files after ingestion.
- Use move mode to archive files to a different location for compliance or auditing.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.cleanSource", "delete")
    .load("/path/to/data/dir"))
Warning
Do not enable cloudFiles.cleanSource if multiple Auto Loader streams or other clients read from the same source directory.
Take advantage of performance improvements: Upgrade to the latest Databricks Runtime or use serverless compute to benefit from recent Auto Loader performance improvements.
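For the cloudFiles.cleanSource practice above, move mode needs a destination in addition to the mode itself. The sketch below builds and validates the options. The helper name is hypothetical, and the destination path and retention value in the usage note are placeholders, so check the option names and accepted values against the Auto Loader options reference for your runtime.

```python
from typing import Optional


def clean_source_options(mode: str,
                         move_destination: Optional[str] = None,
                         retention: Optional[str] = None) -> dict:
    """Return cleanSource options for delete or move mode."""
    mode = mode.lower()
    if mode not in ("delete", "move"):
        raise ValueError(f"unsupported cleanSource mode: {mode!r}")
    options = {"cloudFiles.cleanSource": mode}
    if mode == "move":
        # Move mode archives processed files, so it needs a destination.
        if not move_destination:
            raise ValueError("move mode requires move_destination")
        options["cloudFiles.cleanSource.moveDestination"] = move_destination
    if retention:
        # How long to wait after processing before cleaning up a file.
        options["cloudFiles.cleanSource.retentionDuration"] = retention
    return options
```

The result can be passed to the reader with `.options(**clean_source_options("move", "/path/to/archive", "30 days"))`.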
Checkpoint management
The checkpoint stores the stream's progress and file state. Misconfiguring or losing the checkpoint requires a full restart, so treat it as critical infrastructure.
- Never apply cloud object lifecycle policies to checkpoint locations. If checkpoint files are deleted, the stream state is corrupted and you must restart from scratch.
- Use separate checkpoints for each stream and source directory.
- Consider cloudFiles.maxFileAge for long-lived, high-volume streams to limit state growth. Use a conservative setting (90 days minimum recommended). Setting this value too aggressively risks reprocessing files that Auto Loader has already ingested if they fall outside the window.
For full details, see File event tracking.
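The checkpoint guidance above can be encoded in two small helpers: one derives a distinct checkpoint location per stream and source directory, and one refuses a maxFileAge below the 90-day floor. This is a sketch under stated assumptions. The helper names, base path, and naming scheme are hypothetical.

```python
import re

MIN_FILE_AGE_DAYS = 90  # conservative floor from the guidance above


def checkpoint_path(base: str, stream_name: str, source_dir: str) -> str:
    """Derive a distinct checkpoint location per stream and source directory.

    The slug is a readable label, not a guaranteed-unique key: paths that
    differ only in punctuation would map to the same slug.
    """
    slug = re.sub(r"[^A-Za-z0-9]+", "_", source_dir).strip("_")
    return f"{base.rstrip('/')}/{stream_name}/{slug}"


def max_file_age_option(days: int) -> dict:
    """Return the maxFileAge option, rejecting overly aggressive values."""
    if days < MIN_FILE_AGE_DAYS:
        raise ValueError(
            f"maxFileAge of {days} days is below the {MIN_FILE_AGE_DAYS}-day floor"
        )
    return {"cloudFiles.maxFileAge": f"{days} days"}
```

Deriving the path from the stream name and source directory makes it harder to accidentally share one checkpoint between two streams.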
Use volumes for optimal file discovery with file events
For enhanced performance with file events, create an external volume for each path or subdirectory that Auto Loader loads from. Supply volume paths (for example, /Volumes/catalog/schema/volume) to Auto Loader instead of cloud paths (for example, s3://bucket/path). Volume paths give Auto Loader a more efficient data access pattern for file discovery.
For more file events best practices, see Best practices for Auto Loader with file events.
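As an illustration of the volume path recommendation, the sketch below builds a `/Volumes/...` path and rejects cloud URIs before starting a file events read. The helper names and the catalog, schema, and volume names in the usage note are hypothetical.

```python
def volume_path(catalog: str, schema: str, volume: str, subdir: str = "") -> str:
    """Build a volume path, optionally with a subdirectory."""
    base = f"/Volumes/{catalog}/{schema}/{volume}"
    subdir = subdir.strip("/")
    return f"{base}/{subdir}" if subdir else base


def read_from_volume(spark, path: str, file_format: str = "json"):
    """Start an Auto Loader read with file events from a volume path."""
    if not path.startswith("/Volumes/"):
        # Guard against cloud URIs such as s3://bucket/path.
        raise ValueError(f"expected a /Volumes/ path, got: {path!r}")
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", file_format)
        .option("cloudFiles.useManagedFileEvents", "true")
        .load(path)
    )
```

For example, `read_from_volume(spark, volume_path("catalog", "schema", "volume", "orders"))` reads from `/Volumes/catalog/schema/volume/orders`.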