Note
Databricks recommends the COPY INTO command for incremental and bulk data loading from sources that contain thousands of files.
In this tutorial, you use the COPY INTO command to load JSON data from a Unity Catalog volume into a Delta table in your Azure Databricks workspace. You use the Wanderbricks sample dataset as the data source. For more advanced ingestion use cases, see What is Auto Loader?.
Requirements
- Access to a compute resource. See Compute.
- A Unity Catalog-enabled workspace with permissions to create schemas and volumes in a catalog. See Connect to cloud object storage using Unity Catalog.
Step 1: Configure your environment
The code in this tutorial uses a Unity Catalog volume to store JSON source files. Replace <catalog> with a catalog where you have CREATE SCHEMA and CREATE VOLUME permissions. If you cannot run the code, contact your workspace administrator.
Create a notebook and attach it to a compute resource. Then run the following code to set up a schema and volume for this tutorial.
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Step 2: Write sample data to the volume as JSON
The COPY INTO command loads data from file-based sources. Read from the Wanderbricks bookings sample table and write a batch of records as JSON files to your volume, simulating data arriving from an external system.
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
Writing files to a volume requires Python. In a real-world workflow, this data would arrive from an external system.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Step 3: Use COPY INTO to load JSON data idempotently
Create a target Delta table before using COPY INTO. You don't need to provide anything other than a table name in the CREATE TABLE statement; COPY INTO infers the schema from the source files. Because COPY INTO is idempotent, Databricks loads each source file only once, even if you run the command multiple times.
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
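You can verify the idempotency claim above programmatically: on Databricks, COPY INTO returns a result row with load metrics such as `num_affected_rows` and `num_inserted_rows`. The sketch below is a hedged example (the `build_copy_into` helper is ours, not part of the tutorial); it builds the statement as a plain string so the exact same text can be re-run in a notebook.

```python
# Build the tutorial's COPY INTO statement once so it can be re-run verbatim.
# The helper name and its parameters are illustrative, not a Databricks API.
def build_copy_into(catalog: str, schema: str, volume: str, table: str) -> str:
    """Return the COPY INTO statement used in this tutorial."""
    return f"""
        COPY INTO {catalog}.{schema}.{table}
        FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
        FILEFORMAT = JSON
        FORMAT_OPTIONS ('mergeSchema' = 'true')
        COPY_OPTIONS ('mergeSchema' = 'true')
    """

stmt = build_copy_into("<catalog>", "copy_into_tutorial",
                       "copy_into_source", "bookings_target")

# In a Databricks notebook:
#   result = spark.sql(stmt)
#   display(result)
# Running the statement a second time before any new files arrive
# should report zero affected rows.
```

Because COPY INTO tracks which source files it has already loaded, re-running the same statement without new files loads nothing.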
Step 4: Preview the contents of your table
Verify that the table contains 20 rows from the first batch of Wanderbricks bookings data and that the schema was correctly inferred from the JSON source files.
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Step 5: Load more data and preview results
You can simulate additional data arriving from an external system by writing another batch of records and running COPY INTO again. Run the following code to write a second batch of data.
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
Writing files to a volume requires Python. In a real-world workflow, this data would arrive from an external system.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Then run the COPY INTO command from Step 3 again and preview the table to confirm the new records. Only the new files are loaded.
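The "only new files" behavior comes from COPY INTO keeping a record of files it has already ingested. The following is a toy model of that bookkeeping (illustrative only; this is not how Databricks implements it internally):

```python
# Toy model of COPY INTO's exactly-once file tracking.
# `loaded` stands in for the load history that COPY INTO keeps per target table.
def copy_into(loaded: set, source_files: list) -> list:
    """Ingest only files not loaded before; record them in the history."""
    new = [f for f in source_files if f not in loaded]
    loaded.update(new)
    return new

loaded_files = set()
batch_1 = ["bookings/part-0001.json", "bookings/part-0002.json"]
print(copy_into(loaded_files, batch_1))  # first run: both files ingested
print(copy_into(loaded_files, batch_1))  # re-run: nothing left to load
batch_2 = batch_1 + ["bookings/part-0003.json"]
print(copy_into(loaded_files, batch_2))  # only the new file is ingested
```

In the real command, this history lives with the target Delta table, which is why re-running Step 3 after the second batch picks up only the 20 new files.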
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Step 6: Clean up tutorial
When you're done with this tutorial, clean up the associated resources if you no longer need them. Dropping the schema removes its tables and volume along with all of their data.
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;