CSV (comma-separated values) is a plain-text tabular format widely used for data exchange, ETL pipelines, and general-purpose data storage. Azure Databricks supports CSV for both reading and writing with Apache Spark, including schema inference, compression, malformed record handling, and rescued data.
Note
Databricks recommends the read_files table-valued function for SQL users to read CSV files. read_files is available in Databricks Runtime 13.3 LTS and above.
You can also use a temporary view. If you use SQL to read CSV data directly without using temporary views or read_files, the following limitations apply:
- You can't specify data source options.
- You can't specify the schema for the data.
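As a rough sketch of the temporary-view route, the DataFrame reader can load the CSV data with whatever options are needed, and SQL can then query the registered view. The helper name `query_csv_via_temp_view` and the view name `reviews_csv_view` are made up for illustration, and `spark` is assumed to be an active SparkSession such as the one a Databricks notebook provides.

```python
def query_csv_via_temp_view(spark, path):
    """Register CSV data as a temporary view, then query it with SQL.

    `spark` is assumed to be an active SparkSession; the view name is illustrative.
    """
    df = (spark.read
          .format("csv")
          .option("header", "true")        # data source options are set on the reader
          .option("inferSchema", "true")
          .load(path))
    df.createOrReplaceTempView("reviews_csv_view")  # hypothetical view name
    return spark.sql("SELECT * FROM reviews_csv_view")
```

Because the options and schema are applied when the DataFrame is created, the SQL query against the view is not subject to the limitations listed above.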
Prerequisites
Azure Databricks does not require additional configuration to use CSV files. However, to stream CSV files, you need Auto Loader.
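For orientation, a streaming read of CSV files with Auto Loader might look like the following minimal sketch. It assumes `spark` is an active SparkSession on Databricks. The helper name and both path arguments are placeholders for illustration, not values from this article.

```python
def stream_csv_with_auto_loader(spark, source_path, schema_location):
    """Return a streaming DataFrame over CSV files using Auto Loader.

    `spark` is assumed to be an active SparkSession on Databricks;
    both paths are caller-supplied placeholders.
    """
    return (spark.readStream
            .format("cloudFiles")                                   # Auto Loader source
            .option("cloudFiles.format", "csv")                     # underlying file format
            .option("cloudFiles.schemaLocation", schema_location)   # where the inferred schema is tracked
            .option("header", "true")
            .load(source_path))
```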
Options
Use the .option() and .options() methods of DataFrameReader and DataFrameWriter to configure CSV data sources. For a complete list of supported options, see DataFrameReader CSV options and DataFrameWriter CSV options.
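The two methods differ only in how many options they set per call. The sketch below passes a dictionary to `.options()`. The dictionary contents and the helper name are illustrative choices, and `spark` is assumed to be an active SparkSession.

```python
# Illustrative option set; "header", "inferSchema", and "sep" are standard
# Spark CSV reader options.
CSV_READ_OPTIONS = {
    "header": "true",
    "inferSchema": "true",
    "sep": ",",
}

def read_csv_with_options(spark, path):
    """Read a CSV path using a dictionary of options (helper name is illustrative)."""
    # .options(**dict) sets several options in one call; it has the same effect
    # as chaining .option(key, value) once per entry.
    return spark.read.format("csv").options(**CSV_READ_OPTIONS).load(path)
```

Keeping the options in one dictionary can make it easier to reuse the same settings across several reads.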
Usage
The following examples demonstrate reading and writing CSV files, specifying schemas, and handling malformed records.
Read CSV files
The following example uses the Wanderbricks sample dataset. It writes reviews data to CSV, then reads it back.
Python
# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
# Read the CSV file into a DataFrame
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()
Scala
// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
// Read the CSV file into a DataFrame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()
R
df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)
Read CSV files using SQL
The following SQL example reads a CSV file using read_files.
-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
'abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<file>.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
Specify a schema
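When the schema of the CSV file is known, you can pass it to the CSV reader instead of relying on schema inference. Use the schema method of DataFrameReader in Python and Scala, or the schema option of read_files in SQL.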
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int, comment string'
)
Read a subset of columns
The behavior of the CSV parser depends on which columns are read. If the specified schema does not match the file layout, results can differ considerably depending on which columns are accessed. When you supply a schema, Spark maps schema fields to CSV columns by position rather than by name, so a mismatched schema shifts values into the wrong fields.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Read only a subset of columns by specifying a partial schema
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int'
)
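The positional mapping described in this section can be illustrated without Spark. The following plain-Python sketch uses the standard csv module and only imitates the idea. It is not how Spark is implemented, and the function name and sample data are invented for the illustration.

```python
import csv
import io

def map_by_position(csv_text, field_names):
    """Assign each row's values to field_names by position, ignoring header names."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # skip the header row; its names are not used for the mapping
    # zip stops at the shorter sequence, so a partial schema keeps only the leading columns
    return [dict(zip(field_names, row)) for row in reader]

# Invented sample data with the same column names as the examples above
sample = "review_id,rating,comment\nr1,5,Great stay\n"

# Schema order matches the file: values land in the intended fields.
print(map_by_position(sample, ["review_id", "rating", "comment"]))

# Schema order differs from the file: "rating" silently receives the review ID.
print(map_by_position(sample, ["rating", "review_id", "comment"]))
```

In the second call nothing fails; the values are simply attached to the wrong names, which is why a schema should list fields in the same order as the file.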
Handle malformed CSV records
When you read CSV files with a specified schema, the data in the files might not match the schema. For example, a field containing the name of a city does not parse as an integer. The consequences depend on the mode that the parser runs in:
- PERMISSIVE (default): nulls are inserted for fields that could not be parsed correctly
- DROPMALFORMED: drops lines that contain fields that could not be parsed
- FAILFAST: aborts the reading if any malformed data is found
To set the mode, use the mode option.
Python
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
Scala
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE'
)
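To make the difference between the three modes concrete, the following plain-Python sketch imitates them on a two-column file. It is a toy model under simplifying assumptions (a non-integer rating is the only kind of malformed value), not Spark's parser. The function name and sample data are invented.

```python
import csv
import io

def parse_ratings(csv_text, mode="PERMISSIVE"):
    """Parse 'review_id,rating' rows, treating a non-integer rating as malformed."""
    rows = []
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # skip the header row
    for review_id, rating in reader:
        try:
            rows.append((review_id, int(rating)))
        except ValueError:
            if mode == "FAILFAST":
                raise                       # abort on the first malformed value
            if mode == "DROPMALFORMED":
                continue                    # skip the whole row
            rows.append((review_id, None))  # PERMISSIVE: keep the row, null the field
    return rows

# Invented sample: the second data row has a rating that is not an integer
sample = "review_id,rating\nr1,5\nr2,five\n"

print(parse_ratings(sample, "PERMISSIVE"))     # [('r1', 5), ('r2', None)]
print(parse_ratings(sample, "DROPMALFORMED"))  # [('r1', 5)]
```

Calling `parse_ratings(sample, "FAILFAST")` raises a `ValueError` at the second data row instead of returning a result.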
In PERMISSIVE mode, you can inspect the rows that could not be parsed correctly using one of the following methods:
- You can provide a custom path to the option badRecordsPath to record corrupt records to a file.
- You can add the column _corrupt_record to the schema provided to the DataFrameReader to review corrupt records in the resultant DataFrame.
Note
The badRecordsPath option takes precedence over _corrupt_record, meaning that malformed rows written to the provided path do not appear in the resultant DataFrame.
Default behavior for malformed records changes when using the rescued data column.
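A read that uses badRecordsPath might look like the following minimal sketch. It assumes `spark` is an active SparkSession on Databricks and that the caller supplies a writable location for the bad-records output. The helper name is illustrative, and the schema string is borrowed from the earlier examples in this article.

```python
def read_with_bad_records_path(spark, source_path, bad_records_path):
    """Read CSV and divert unparseable records to bad_records_path.

    `spark` is assumed to be an active SparkSession on Databricks;
    both paths are caller-supplied placeholders.
    """
    return (spark.read
            .format("csv")
            .option("header", "true")
            .schema("review_id string, rating int, comment string")  # DDL-style schema string
            .option("badRecordsPath", bad_records_path)              # malformed rows are written here
            .load(source_path))
```

As the preceding note says, rows diverted to the bad-records location do not appear in the returned DataFrame.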
To inspect malformed rows using _corrupt_record, add it to the schema and filter on non-null values:
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true),
StructField("_corrupt_record", StringType, nullable = true)
))
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.filter(df("_corrupt_record").isNotNull).show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL
Enable the rescued data column
Note
This feature is supported in Databricks Runtime 8.3 and above.
When using the PERMISSIVE mode, you can enable the rescued data column to capture any data that wasn't parsed because one or more fields in a record have one of the following issues:
- Absent from the provided schema.
- Does not match the data type of the provided schema.
- Has a case mismatch with the field names in the provided schema.
The rescued data column is returned as a JSON document containing the columns that were rescued, and the source file path of the record.
To enable the rescued data column, set the rescuedDataColumn option to a column name when reading:
Python
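# header is set to match the files written earlier, which include a header row
df = spark.read.option("header", "true").option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
Scala
// header is set to match the files written earlier, which include a header row
val df = spark.read.option("header", "true").option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")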
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
rescuedDataColumn => '_rescued_data'
)
To remove the source file path from the rescued data column, set:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
The CSV parser supports three modes when parsing records: PERMISSIVE, DROPMALFORMED, and FAILFAST. When used together with rescuedDataColumn, data type mismatches do not cause records to be dropped in DROPMALFORMED mode or throw an error in FAILFAST mode. Only corrupt records (that is, incomplete or malformed CSV) are dropped or throw errors.
When rescuedDataColumn is used in PERMISSIVE mode, the following rules apply to corrupt records:
- The first row of the file (either a header row or a data row) sets the expected row length.
- A row with a different number of columns is considered incomplete.
- Data type mismatches are not considered corrupt records.
- Only incomplete and malformed CSV records are considered corrupt and recorded to the _corrupt_record column or badRecordsPath.
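The rules above can be modeled in a few lines of plain Python. This toy sketch is not Spark's implementation. The function name, the sample data, and the exact JSON layout of the rescued value are assumptions made for illustration, and the source file path that the real rescued data column includes is left out.

```python
import csv
import io
import json

def classify_rows(csv_text):
    """Split rows into parsed rows and corrupt rows using the row-length rule."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)            # the first row sets the expected row length
    parsed, corrupt = [], []
    for row in reader:
        if len(row) != len(header):
            corrupt.append(row)      # different column count: treated as corrupt
            continue
        record = dict(zip(header, row))
        rescued = {}
        try:
            record["rating"] = int(record["rating"])
        except ValueError:
            # Type mismatch: not corrupt; the raw value is rescued instead.
            rescued["rating"] = record["rating"]
            record["rating"] = None
        # Illustrative JSON layout only; the real column also carries the file path
        record["_rescued_data"] = json.dumps(rescued) if rescued else None
        parsed.append(record)
    return parsed, corrupt

# Invented sample: r2 has a type mismatch, r3 is an incomplete row
sample = "review_id,rating\nr1,5\nr2,five\nr3\n"
parsed, corrupt = classify_rows(sample)
print(parsed)
print(corrupt)
```

In this model the row for r2 stays in the parsed output with a null rating and a rescued value, while the short row for r3 is the only one classified as corrupt.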
Additional resources
- Read and write Parquet files: If your workload requires better query performance or more efficient storage, Parquet's columnar layout offers significant advantages over CSV's plain-text format.