Get started using COPY INTO to load data
The COPY INTO SQL command lets you load data from a file location into a Delta table. This operation is retriable and idempotent: files in the source location that have already been loaded are skipped.
COPY INTO offers the following capabilities:
- Easily configurable file or directory filters from cloud storage, including S3, ADLS Gen2, ABFS, GCS, and Unity Catalog volumes.
- Support for multiple source file formats: CSV, JSON, XML, Avro, ORC, Parquet, text, and binary files
- Exactly-once (idempotent) file processing by default
- Target table schema inference, mapping, merging, and evolution
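As a minimal sketch of the file-filter capability in the list above, the snippet below assembles COPY INTO statement text that restricts the load with either a PATTERN glob or an explicit FILES list, the two mutually exclusive filter clauses in the COPY INTO syntax. The helper name copy_into_with_filter, the table name, the source path, and the file names are all hypothetical. The snippet only builds a string and does not execute anything.

```python
from typing import Optional, Sequence


def copy_into_with_filter(
    table: str,
    source: str,
    file_format: str,
    pattern: Optional[str] = None,
    files: Optional[Sequence[str]] = None,
) -> str:
    """Build COPY INTO text with at most one file filter (hypothetical helper)."""
    if pattern is not None and files is not None:
        raise ValueError("use either pattern or files, not both")
    lines = [
        f"COPY INTO {table}",
        f"FROM '{source}'",
        f"FILEFORMAT = {file_format}",
    ]
    if pattern is not None:
        # Glob pattern that selects which files under the source path to load.
        lines.append(f"PATTERN = '{pattern}'")
    elif files:
        # Explicit list of file names to load.
        quoted = ", ".join(f"'{name}'" for name in files)
        lines.append(f"FILES = ({quoted})")
    return "\n".join(lines)


# Hypothetical table, path, and file names, for illustration only.
by_pattern = copy_into_with_filter(
    "my_table", "/path/to/files", "CSV", pattern="part-*.csv"
)
by_files = copy_into_with_filter(
    "my_table", "/path/to/files", "CSV", files=["a.csv", "b.csv"]
)
print(by_pattern)
print(by_files)
```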
Note
For a more scalable and robust file ingestion experience, Databricks recommends that SQL users leverage streaming tables. See Load data using streaming tables in Databricks SQL.
Warning
COPY INTO respects the workspace setting for deletion vectors. If the setting is enabled, deletion vectors are turned on for the target table when COPY INTO runs on a SQL warehouse or on compute running Databricks Runtime 14.0 or above. Once enabled, deletion vectors block queries against a table in Databricks Runtime 11.3 LTS and below. See What are deletion vectors? and Auto-enable deletion vectors.
Requirements
An account admin must follow the steps in Configure data access for ingestion to configure access to data in cloud object storage before users can load data using COPY INTO.
Example: Load data into a schemaless Delta Lake table
Note
This feature is available in Databricks Runtime 11.3 LTS and above.
You can create empty placeholder Delta tables so that the schema is later inferred during a COPY INTO command by setting mergeSchema to true in COPY_OPTIONS:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
The SQL statement above is idempotent, so you can schedule it to run repeatedly and still ingest data exactly once into a Delta table.
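To picture why rerunning the statement above does not duplicate data, here is a toy model in plain Python. It is only an illustration of the skip-already-loaded behavior and not a description of how Databricks tracks loaded files. The class ToyCopyTarget and the file names are hypothetical.

```python
from typing import Iterable, List, Set


class ToyCopyTarget:
    """Toy stand-in for a target table; not how Databricks tracks loaded files."""

    def __init__(self) -> None:
        self.loaded_files: Set[str] = set()
        self.rows: List[str] = []

    def copy_into(self, source_files: Iterable[str]) -> int:
        """Load each file at most once and return how many files were new."""
        newly_loaded = 0
        for path in sorted(source_files):
            if path in self.loaded_files:
                continue  # already loaded on an earlier run, so skip it
            self.rows.append(f"row from {path}")
            self.loaded_files.add(path)
            newly_loaded += 1
        return newly_loaded


target = ToyCopyTarget()
first_run = target.copy_into(["a.parquet", "b.parquet"])
second_run = target.copy_into(["a.parquet", "b.parquet"])  # rerun, nothing new
third_run = target.copy_into(["a.parquet", "b.parquet", "c.parquet"])
print(first_run, second_run, third_run, len(target.rows))  # prints "2 0 1 3"
```

The second run loads nothing because both files were already recorded, and the third run picks up only the file that arrived in between.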
Note
The empty Delta table is not usable outside of COPY INTO. INSERT INTO and MERGE INTO cannot write data into schemaless Delta tables. After data is inserted into the table with COPY INTO, the table becomes queryable.
See Create target tables for COPY INTO.
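The template in this section leaves the file format as a placeholder. As a hedged sketch, the snippet below fills it in with PARQUET and renders the same FORMAT_OPTIONS and COPY_OPTIONS clauses from a Python dictionary. The helper names render_options and schemaless_copy_into are hypothetical, and the choice of PARQUET is an assumption made only to give a concrete instance. The table name and path are the placeholders used in the template.

```python
from typing import Mapping


def render_options(keyword: str, options: Mapping[str, str]) -> str:
    """Render an options clause such as COPY_OPTIONS ('key' = 'value')."""
    pairs = ", ".join(f"'{key}' = '{value}'" for key, value in options.items())
    return f"{keyword} ({pairs})"


def schemaless_copy_into(table: str, source: str, file_format: str) -> str:
    """Build the COPY INTO text for a placeholder table (hypothetical helper)."""
    merge = {"mergeSchema": "true"}
    return "\n".join(
        [
            f"COPY INTO {table}",
            f"FROM '{source}'",
            f"FILEFORMAT = {file_format}",
            render_options("FORMAT_OPTIONS", merge),
            render_options("COPY_OPTIONS", merge),
        ]
    )


# PARQUET is an assumed format; substitute the format of your source files.
statement = schemaless_copy_into("my_table", "/path/to/files", "PARQUET")
print(statement)
```

In a notebook, the resulting text could be passed to spark.sql, in the same way the Python example in this article runs its own statements.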
Example: Set schema and load data into a Delta Lake table
The following example shows how to create a Delta table and then use the COPY INTO SQL command to load sample data from Databricks datasets into the table. You can run the example Python, R, Scala, or SQL code from a notebook attached to an Azure Databricks cluster. You can also run the SQL code from a query associated with a SQL warehouse in Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
To clean up, run the following code, which deletes the table:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Clean up metadata files
In Databricks Runtime 15.2 and above, you can run VACUUM to clean up unreferenced metadata files created by COPY INTO.
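As a small sketch of the cleanup step, the snippet below issues a VACUUM statement for each target table through a caller-supplied function. The helper name vacuum_tables and its run_sql parameter are hypothetical. Here run_sql is a plain list append so that the snippet runs anywhere, and the table name is the one used in the example earlier in this article.

```python
from typing import Callable, Iterable, List


def vacuum_tables(tables: Iterable[str], run_sql: Callable[[str], object]) -> List[str]:
    """Run VACUUM on each table via run_sql and return the statements issued."""
    statements = [f"VACUUM {table}" for table in tables]
    for statement in statements:
        run_sql(statement)
    return statements


# Stand-in runner that records statements instead of executing them.
executed: List[str] = []
vacuum_tables(["default.loan_risks_upload"], executed.append)
print(executed)  # prints "['VACUUM default.loan_risks_upload']"
```

In a notebook, spark.sql could be passed as run_sql in place of the recording stand-in.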
Reference
- Databricks Runtime 7.x and above: COPY INTO
Additional resources
Load data using COPY INTO with Unity Catalog volumes or external locations
For common use patterns, including examples of multiple COPY INTO operations against the same Delta table, see Common data loading patterns using COPY INTO.