Get started using COPY INTO to load data

You can use the COPY INTO SQL command to load data from a file location into a Delta table. COPY INTO is retriable and idempotent — files in the source location that have already been loaded are skipped on subsequent runs.

COPY INTO offers these capabilities:

  • Easily configurable file or folder filters from cloud storage, including S3, ADLS, ABFS, GCS, and Unity Catalog volumes.
  • Support for multiple source file formats: CSV, JSON, XML, Avro, ORC, Parquet, text, and binary files.
  • Exactly-once (idempotent) file processing by default.
  • Target table schema inference, mapping, merging, and evolution.
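These pieces come together in a single statement. As an illustration only, a small helper (hypothetical, not part of any Databricks API) shows how a source path, file format, and the two option maps assemble into a COPY INTO statement:

```python
# Hypothetical helper, for illustration only: assembles the clauses of a
# COPY INTO statement into one SQL string. Not a Databricks API.
def build_copy_into(table, source, file_format,
                    format_options=None, copy_options=None):
    def render(opts):
        # Options are rendered as 'key' = 'value' pairs.
        return ", ".join(f"'{k}' = '{v}'" for k, v in opts.items())

    parts = [
        f"COPY INTO {table}",
        f"FROM '{source}'",
        f"FILEFORMAT = {file_format}",
    ]
    if format_options:
        parts.append(f"FORMAT_OPTIONS ({render(format_options)})")
    if copy_options:
        parts.append(f"COPY_OPTIONS ({render(copy_options)})")
    return "\n".join(parts)

# Example values; the table and volume names are placeholders.
stmt = build_copy_into(
    "main.default.events",
    "/Volumes/main/default/landing/events",
    "JSON",
    format_options={"multiLine": "true"},
    copy_options={"mergeSchema": "true"},
)
print(stmt)
```

In a notebook, the resulting string would be passed to `spark.sql(...)`, exactly as the Python examples later in this page do.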

Note

For a more scalable and robust file ingestion experience, Databricks recommends that SQL users use streaming tables. For more information, see Streaming tables.

Warning

COPY INTO respects the workspace setting for deletion vectors. If enabled, deletion vectors are enabled on the target table when COPY INTO runs on a SQL warehouse or compute running Databricks Runtime 14.0 or above. After deletion vectors are enabled, they block queries against a table in Databricks Runtime 11.3 LTS and below. See Deletion vectors in Databricks and Auto-enable deletion vectors.
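If older compute (Databricks Runtime 11.3 LTS and below) must keep reading a target table, one option is to disable deletion vectors on that table through its `delta.enableDeletionVectors` table property. A minimal sketch of building that statement (the table name is a placeholder; note that deletion vectors already written to the table additionally need to be purged, per the linked deletion vector documentation):

```python
# Sketch: build the ALTER TABLE statement that opts a table out of
# deletion vectors via the delta.enableDeletionVectors table property.
# In a notebook, pass the resulting string to spark.sql(...).
def disable_deletion_vectors_sql(table_name):
    return (
        f"ALTER TABLE {table_name} "
        "SET TBLPROPERTIES ('delta.enableDeletionVectors' = false)"
    )

print(disable_deletion_vectors_sql("main.default.booking_updates_upload"))
```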

Before you begin

An account administrator must follow the steps in Configure data access for ingestion to configure access to data in cloud object storage before users can load data using COPY INTO.

Load data into a schemaless Delta Lake table

In Databricks Runtime 11.3 LTS and above, you can create empty placeholder Delta tables so that the schema is inferred during a COPY INTO command by setting mergeSchema to true in COPY_OPTIONS. The following example uses the Wanderbricks data set. Replace <catalog>, <schema>, and <volume> with a catalog, schema, and volume where you have CREATE TABLE permissions.

SQL

CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;

COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Python

table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name}")

spark.sql(f"""
  COPY INTO {table_name}
  FROM '{source_data}'
  FILEFORMAT = {source_format}
  FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
  " COPY_OPTIONS ('mergeSchema' = 'true')",
  sep = ""
))

Scala

val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

The COPY INTO statement is idempotent: you can schedule it to run repeatedly, and only files that have not already been loaded are ingested into your Delta table.
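Conceptually, the exactly-once behavior works like a ledger of already-loaded files. This toy sketch (an illustration of the idea only; COPY INTO tracks its load history in the target table's metadata, not like this) shows why a rerun picks up only new files:

```python
# Toy illustration of idempotent ingestion: a ledger of loaded files means
# that reruns only pick up files not seen before.
def load_new_files(available_files, loaded):
    new_files = [f for f in available_files if f not in loaded]
    loaded.update(new_files)  # record them so a rerun skips them
    return new_files

ledger = set()
first_run = load_new_files(["a.json", "b.json"], ledger)
# → loads a.json and b.json
second_run = load_new_files(["a.json", "b.json", "c.json"], ledger)
# → loads only c.json; the first two are skipped
```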

Note

The empty Delta table is not usable outside COPY INTO. INSERT INTO and MERGE INTO are not supported to write data into schemaless Delta tables. After data is inserted into the table with COPY INTO, the table becomes queryable.

See Create target tables for COPY INTO.

Set schema and load data into a Delta Lake table

The following example creates a Delta table and uses the COPY INTO SQL command to load sample data from the Wanderbricks data set into the table. The source files are JSON files stored in a Unity Catalog volume. You can run the example Python, R, Scala, or SQL code from a notebook attached to an Azure Databricks cluster. You can also run the SQL code from a query associated with a SQL warehouse in Databricks SQL. Replace <catalog>, <schema>, and <volume> with a catalog, schema, and volume where you have CREATE TABLE permissions.

SQL

DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;

CREATE TABLE <catalog>.<schema>.booking_updates_upload (
  booking_id BIGINT,
  user_id BIGINT,
  status STRING,
  total_amount DOUBLE
);

COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');

SELECT * FROM <catalog>.<schema>.booking_updates_upload;

Python

table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql(f"DROP TABLE IF EXISTS {table_name}")

spark.sql(f"""
  CREATE TABLE {table_name} (
    booking_id BIGINT,
    user_id BIGINT,
    status STRING,
    total_amount DOUBLE
  )
""")

spark.sql(f"""
  COPY INTO {table_name}
  FROM '{source_data}'
  FILEFORMAT = {source_format}
  FORMAT_OPTIONS ('multiLine' = 'true')
""")

booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)

display(booking_updates_upload_data)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "booking_id BIGINT, ",
  "user_id BIGINT, ",
  "status STRING, ",
  "total_amount DOUBLE)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('multiLine' = 'true')",
  sep = ""
))

booking_updates_upload_data = tableToDF(table_name)

display(booking_updates_upload_data)

Scala

val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "booking_id BIGINT, " +
  "user_id BIGINT, " +
  "status STRING, " +
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

val booking_updates_upload_data = spark.table(table_name)

display(booking_updates_upload_data)

To clean up, run the following code to delete the example table.

SQL

DROP TABLE <catalog>.<schema>.booking_updates_upload

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

Clean up metadata files

You can run VACUUM to clean up unreferenced metadata files created by COPY INTO in Databricks Runtime 15.2 and above.
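For example, from a notebook you could issue the VACUUM as a SQL string (the table name is a placeholder; the default retention applies unless you add a RETAIN clause, which must respect the table's retention safety check):

```python
# Sketch: build a VACUUM statement for the target table. In a notebook,
# pass the result to spark.sql(...). retain_hours is optional.
def vacuum_sql(table_name, retain_hours=None):
    stmt = f"VACUUM {table_name}"
    if retain_hours is not None:
        stmt += f" RETAIN {retain_hours} HOURS"
    return stmt

print(vacuum_sql("main.default.booking_updates_upload"))
```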

Additional resources

  • Databricks Runtime 7.x and above: COPY INTO