Read and write Parquet files

Apache Parquet is a columnar file format optimized for analytical workloads. It allows query engines to read only the columns needed and skip irrelevant row groups. Parquet is the underlying storage format for Delta Lake(/delta/index.md), making it the most common format for data stored in Azure Databricks. Azure Databricks supports Parquet for both reading and writing with Apache Spark, including schema specification, partitioning, and write compression.

Prerequisites

Azure Databricks does not require additional configuration to use Parquet files. However, to stream Parquet files, you need Auto Loader.

Options

Use the .option() and .options() methods of DataFrameReader and DataFrameWriter to configure Parquet data sources. For a complete list of supported options, see DataFrameReader Parquet options and DataFrameWriter Parquet options.

Usage

The following examples use the Wanderbricks sample dataset to demonstrate reading and writing Parquet files using the Spark DataFrame API and SQL.

Read Parquet files using SQL

Use read_files to query Parquet files directly from cloud storage using SQL without creating a table.

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_parquet',
  format => 'parquet'
)

Read and write Parquet files

The following examples write the Wanderbricks reviews to Parquet format, read them back into a DataFrame, and demonstrate overwrite mode.

Python

# Write wanderbricks reviews to Parquet format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("parquet").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")

# Read a Parquet file into a DataFrame
df = spark.read.format("parquet").load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
display(df)

# Write with overwrite mode
df.write.format("parquet").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")

Scala

// Write wanderbricks reviews to Parquet format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("parquet").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")

// Read a Parquet file into a DataFrame
val df = spark.read.format("parquet").load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.show()

// Write with overwrite mode
df.write.format("parquet").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")

SQL

-- Write wanderbricks reviews to Parquet format
CREATE TABLE reviews_parquet
USING PARQUET
AS SELECT * FROM samples.wanderbricks.reviews;

SELECT * FROM reviews_parquet;

Specify a schema

Specify a schema when reading Parquet files to avoid the overhead of schema inference. For example, define a schema with review_id, rating, and comment fields and read reviews_parquet into a DataFrame.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("review_id", StringType(), True),
    StructField("rating", IntegerType(), True),
    StructField("comment", StringType(), True)
])

df = spark.read.format("parquet").schema(schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.printSchema()
df.show()

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true)
))

val df = spark.read.format("parquet").schema(schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.printSchema()
df.show()

SQL

-- Create a table with an explicit schema from Parquet files
CREATE TABLE reviews_parquet (
  review_id STRING,
  rating INT,
  comment STRING
)
USING PARQUET
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews_parquet");

SELECT * FROM reviews_parquet;

Write partitioned Parquet files

Write partitioned Parquet files for optimized query performance on large datasets. For example, read samples.wanderbricks.bookings and write it to bookings_parquet_partitioned partitioned by year and month derived from the check_in column.

Python

from pyspark.sql.functions import year, month

df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("parquet").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned")

Scala

import org.apache.spark.sql.functions.{year, month}

val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("parquet").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned")

SQL

-- Write partitioned Parquet files by year and month
CREATE TABLE bookings_parquet_partitioned
USING PARQUET
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;

Additional resources

  • What is Delta Lake in Azure Databricks?: If you need ACID transactions, schema enforcement, or time travel alongside Parquet's columnar performance, Delta Lake is the recommended format for data stored in Azure Databricks.