
Tutorial: COPY INTO with Spark SQL

Databricks recommends the COPY INTO command for incremental and bulk data loading from sources that contain thousands of files.

In this tutorial, you use the COPY INTO command to load JSON data from a Unity Catalog volume into a Delta table in your Azure Databricks workspace. You use the Wanderbricks sample dataset as the data source. For more advanced ingestion use cases, see What is Auto Loader?.

Requirements

Step 1: Configure your environment

The code in this tutorial uses a Unity Catalog volume to store JSON source files. Replace <catalog> with a catalog where you have CREATE SCHEMA and CREATE VOLUME permissions. If you cannot run the code, contact your workspace administrator.

Create a notebook and attach it to a compute resource. Then run the following code to set up a schema and volume for this tutorial.

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

# Make the values available to SQL cells through ${c.<name>} substitution
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
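The `regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')` call in the Python tab replaces every character that is not a letter or digit with an underscore, so the resulting schema name is a valid SQL identifier. A pure-Python sketch of the same transformation (the email address is a made-up example):

```python
import re

def sanitize_username(user: str) -> str:
    """Replace every non-alphanumeric character with an underscore,
    mirroring regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')."""
    return re.sub(r"[^a-zA-Z0-9]", "_", user)

# Hypothetical session user, for illustration only
print(sanitize_username("first.last@example.com"))  # first_last_example_com
```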

Step 2: Write sample data to the volume as JSON

The COPY INTO command loads data from file-based sources. Read from the Wanderbricks bookings sample table and write a batch of records as JSON files to your volume, simulating data arriving from an external system.

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

Writing files to a volume requires Python. In a real-world workflow, this data would arrive from an external system.

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
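Spark's `.json()` writer produces newline-delimited JSON (one object per line, split across part files), not a single JSON array. If you ever need to hand-craft compatible test files outside Spark, a minimal sketch with the standard `json` module (the file name and records are illustrative, not part of the Wanderbricks dataset):

```python
import json

# Illustrative records shaped loosely like bookings rows
records = [
    {"booking_id": 1, "destination": "Lisbon"},
    {"booking_id": 2, "destination": "Kyoto"},
]

# Write newline-delimited JSON, the layout Spark's JSON reader expects
with open("bookings-batch.json", "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")
```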

Step 3: Use COPY INTO to load JSON data idempotently

Create a target Delta table before using COPY INTO. The CREATE TABLE statement needs only a table name; COPY INTO infers the schema from the source files. Because COPY INTO is idempotent, Databricks loads each source file only once, even if you run the command multiple times.

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
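If you parameterize the load in Python, it can help to assemble the COPY INTO statement in one place. The helper below is a hypothetical convenience, not part of any Databricks API; it only builds the same statement shown above, and the table name in the usage example is illustrative:

```python
def build_copy_into(table: str, source_path: str, file_format: str = "JSON") -> str:
    """Assemble a COPY INTO statement with schema merging enabled."""
    return (
        f"COPY INTO {table}\n"
        f"FROM '{source_path}'\n"
        f"FILEFORMAT = {file_format}\n"
        "FORMAT_OPTIONS ('mergeSchema' = 'true')\n"
        "COPY_OPTIONS ('mergeSchema' = 'true')"
    )

# Illustrative catalog and schema names
sql = build_copy_into(
    "main.copyinto_demo_db.bookings_target",
    "/Volumes/main/copyinto_demo_db/copy_into_source/bookings",
)
print(sql)
```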

Step 4: Preview the contents of your table

Verify that the table contains 20 rows from the first batch of Wanderbricks bookings data and that the schema was correctly inferred from the JSON source files.

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

Step 5: Load more data and preview results

You can simulate additional data arriving from an external system by writing another batch of records and running COPY INTO again. Run the following code to write a second batch of data.

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

Writing files to a volume requires Python. In a real-world workflow, this data would arrive from an external system.

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

Then run the COPY INTO command from Step 3 again and preview the table to confirm that the new records were added. COPY INTO skips files it has already loaded, so only the new files are ingested.

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
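COPY INTO achieves this load-once behavior by recording which source files it has already ingested and skipping them on later runs. A much-simplified conceptual sketch of that bookkeeping (not the actual implementation, which stores this state in the target table's metadata):

```python
# Files already ingested into the (conceptual) target table
loaded_files: set[str] = set()

def copy_into(available_files: list[str]) -> int:
    """Load only files not seen before; return how many were loaded."""
    new_files = [f for f in available_files if f not in loaded_files]
    loaded_files.update(new_files)
    return len(new_files)

batch_1 = ["part-0000.json", "part-0001.json"]
print(copy_into(batch_1))                        # first run loads both files
print(copy_into(batch_1))                        # rerun loads nothing: idempotent
print(copy_into(batch_1 + ["part-0002.json"]))   # only the new file is loaded
```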

Step 6: Clean up tutorial

When you finish this tutorial, clean up the associated resources if you no longer need them. Dropping the schema removes its tables, volume, and all stored data.

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
