You can use the COPY INTO SQL command to load data from a file location into a Delta table. COPY INTO is retriable and idempotent: files in the source location that have already been loaded are skipped on subsequent runs.
COPY INTO offers these capabilities:
- Easily configurable file or folder filters from cloud storage, including S3, ADLS, ABFS, GCS, and Unity Catalog volumes.
- Support for multiple source file formats: CSV, JSON, XML, Avro, ORC, Parquet, text, and binary files.
- Exactly-once (idempotent) file processing by default.
- Target table schema inference, mapping, merging, and evolution.
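To make the idempotent behavior concrete, here is a toy model in plain Python. It is only an illustration of the idea that already-loaded files are skipped on later runs; the `ToyTable` class and the `toy_copy_into` function are hypothetical names invented for this sketch and do not reflect how Databricks tracks loaded files internally.

```python
# Toy model only: NOT the Databricks implementation of COPY INTO.
# ToyTable and toy_copy_into are hypothetical names used for illustration.

class ToyTable:
    def __init__(self):
        self.rows = []             # data loaded so far
        self.loaded_files = set()  # paths of files already processed


def toy_copy_into(table, source_files):
    """Load rows from files that have not been seen before.

    source_files maps a file path to the list of rows in that file.
    Returns the number of rows added by this run.
    """
    added = 0
    for path in sorted(source_files):
        if path in table.loaded_files:
            continue  # already loaded on an earlier run, so skip it
        table.rows.extend(source_files[path])
        table.loaded_files.add(path)
        added += len(source_files[path])
    return added


source = {
    "a.json": [{"booking_id": 1}],
    "b.json": [{"booking_id": 2}],
}
table = ToyTable()

first_run = toy_copy_into(table, source)    # loads both files
second_run = toy_copy_into(table, source)   # nothing new to load
source["c.json"] = [{"booking_id": 3}]
third_run = toy_copy_into(table, source)    # loads only the new file
```

Running the same load twice adds no duplicate rows, which is the property that makes it safe to retry or schedule the command.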
Note
For a more scalable and robust file ingestion experience, Databricks recommends that SQL users use streaming tables. For more information, see Streaming tables.
Warning
COPY INTO respects the workspace setting for deletion vectors. If enabled, deletion vectors are enabled on the target table when COPY INTO runs on a SQL warehouse or compute running Databricks Runtime 14.0 or above. After deletion vectors are enabled, they block queries against a table in Databricks Runtime 11.3 LTS and below. See Deletion vectors in Databricks and Auto-enable deletion vectors.
Before you begin
An account administrator must follow the steps in Configure data access for ingestion to configure access to data in cloud object storage before users can load data using COPY INTO.
Load data into a schemaless Delta Lake table
In Databricks Runtime 11.3 LTS and above, you can create empty placeholder Delta tables so that the schema is inferred during a COPY INTO command by setting mergeSchema to true in COPY_OPTIONS. The following example uses the Wanderbricks data set. Replace <catalog>, <schema>, and <volume> with a catalog, schema, and volume where you have CREATE TABLE permissions.
SQL
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Python
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
Scala
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
The COPY INTO statement is idempotent, so you can schedule it to run repeatedly and it loads only new data into your Delta table.
Note
The empty Delta table is not usable outside COPY INTO. You cannot use INSERT INTO or MERGE INTO to write data into schemaless Delta tables. After data is inserted into the table with COPY INTO, the table becomes queryable.
See Create target tables for COPY INTO.
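If you assemble the COPY INTO statement in Python, keeping the options in dictionaries can be easier to read than chained string concatenation. The following is a minimal sketch, not part of any Databricks API: `options_clause` and `build_copy_into` are hypothetical helper names, and the sketch assumes option keys and values contain no single quotes (it does no escaping).

```python
# Hypothetical helpers for illustration; not a Databricks API.
# Assumption: option keys and values contain no single quotes.

def options_clause(keyword, options):
    """Render a clause such as FORMAT_OPTIONS ('multiLine' = 'true')."""
    if not options:
        return ""
    pairs = ", ".join(f"'{key}' = '{value}'" for key, value in options.items())
    return f" {keyword} ({pairs})"


def build_copy_into(table_name, source_data, source_format,
                    format_options=None, copy_options=None):
    """Return a COPY INTO statement as a single string."""
    return (
        f"COPY INTO {table_name}"
        f" FROM '{source_data}'"
        f" FILEFORMAT = {source_format}"
        + options_clause("FORMAT_OPTIONS", format_options)
        + options_clause("COPY_OPTIONS", copy_options)
    )


statement = build_copy_into(
    table_name="<catalog>.<schema>.booking_updates_schemaless",
    source_data="/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates",
    source_format="JSON",
    format_options={"mergeSchema": "true", "multiLine": "true"},
    copy_options={"mergeSchema": "true"},
)
# In a Databricks notebook, the statement could then be run with spark.sql(statement).
```

The resulting string matches the statement built by concatenation in the Python example for the schemaless table.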
Set schema and load data into a Delta Lake table
The following example creates a Delta table and uses the COPY INTO SQL command to load sample data from the Wanderbricks data set into the table. The source files are JSON files stored in a Unity Catalog volume. You can run the example Python, R, Scala, or SQL code from a notebook attached to an Azure Databricks cluster. You can also run the SQL code from a query associated with a SQL warehouse in Databricks SQL. Replace <catalog>, <schema>, and <volume> with a catalog, schema, and volume where you have CREATE TABLE permissions.
SQL
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
Python
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" + \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
Scala
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
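The drop, create, and load steps in this example can also be expressed as an ordered list of statements, which makes the sequence easy to inspect before anything runs. This is a sketch under stated assumptions: `COLUMNS`, `create_table_statement`, `load_statements`, and `run_all` are hypothetical names, and `run_sql` stands for any callable that executes one SQL string (in a Databricks notebook that would typically be `spark.sql`).

```python
# Hypothetical helpers for illustration; not a Databricks API.

# Column names and types taken from the example table definition.
COLUMNS = {
    "booking_id": "BIGINT",
    "user_id": "BIGINT",
    "status": "STRING",
    "total_amount": "DOUBLE",
}


def create_table_statement(table_name, columns):
    """Build a CREATE TABLE statement from a name-to-type mapping."""
    column_list = ", ".join(f"{name} {sql_type}" for name, sql_type in columns.items())
    return f"CREATE TABLE {table_name} ({column_list})"


def load_statements(table_name, source_data, source_format="JSON"):
    """Return the statements of the example in the order they must run."""
    return [
        f"DROP TABLE IF EXISTS {table_name}",
        create_table_statement(table_name, COLUMNS),
        f"COPY INTO {table_name} FROM '{source_data}' FILEFORMAT = {source_format}"
        " FORMAT_OPTIONS ('multiLine' = 'true')",
    ]


def run_all(run_sql, statements):
    """Execute each statement in order with the supplied callable."""
    for statement in statements:
        run_sql(statement)


statements = load_statements(
    "<catalog>.<schema>.booking_updates_upload",
    "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates",
)
# In a Databricks notebook: run_all(spark.sql, statements)
```

Passing the executor in as a parameter keeps the statement-building logic testable without a Spark session.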
To clean up, run the following code to delete the example table.
SQL
DROP TABLE <catalog>.<schema>.booking_updates_upload;
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
Clean up metadata files
In Databricks Runtime 15.2 and above, you can run VACUUM to clean up unreferenced metadata files created by COPY INTO.
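As a minimal sketch of issuing that command from Python, the helper below only builds the statement text. `vacuum_statement` is a hypothetical name, and the sketch uses the plain `VACUUM <table>` form with no retention or other options.

```python
# Hypothetical helper for illustration; not a Databricks API.

def vacuum_statement(table_name):
    """Build a plain VACUUM statement for the given table."""
    return f"VACUUM {table_name}"


statement = vacuum_statement("<catalog>.<schema>.booking_updates_upload")
# In a Databricks notebook, the statement could be run with spark.sql(statement).
```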
Additional resources
- Load data using COPY INTO with Unity Catalog volumes or external locations
- Common data loading patterns using COPY INTO
- Databricks Runtime 7.x and above: COPY INTO