Mulai menggunakan COPY INTO untuk memuat data

Perintah SQL COPY INTO memungkinkan Anda memuat data dari lokasi file ke dalam tabel Delta. Ini adalah operasi yang dapat dicoba kembali dan idempotent — file di lokasi sumber yang telah dimuat dilewati.

COPY INTO menawarkan kemampuan berikut:

  • Filter file atau direktori yang dapat dikonfigurasi dengan mudah dari penyimpanan cloud, termasuk volume S3, ADLS Gen2, ABFS, GCS, dan Unity Catalog.
  • Dukungan untuk beberapa format file sumber: file CSV, JSON, XML, Avro, ORC, Parquet, teks, dan biner
  • Pemrosesan file tepat sekali (idempotoen) secara default
  • Inferensi skema tabel target, pemetaan, penggabungan, dan evolusi

Catatan

Untuk pengalaman penyerapan file yang lebih dapat diskalakan dan kuat, Databricks merekomendasikan agar pengguna SQL memanfaatkan tabel streaming. Lihat Memuat data menggunakan tabel streaming di Databricks SQL.

Peringatan

COPY INTO menghormati pengaturan ruang kerja untuk vektor penghapusan. Jika diaktifkan, vektor penghapusan diaktifkan pada tabel target saat COPY INTO berjalan pada gudang SQL atau komputasi yang menjalankan Databricks Runtime 14.0 atau lebih tinggi. Setelah diaktifkan, vektor penghapusan memblokir kueri terhadap tabel di Databricks Runtime 11.3 LTS dan di bawahnya. Lihat Apa itu vektor penghapusan? dan Vektor penghapusan aktifkan otomatis.

Persyaratan

Admin akun harus mengikuti langkah-langkah dalam Mengonfigurasi akses data untuk penyerapan guna mengonfigurasi akses ke data di penyimpanan objek cloud sebelum pengguna dapat memuat data menggunakan COPY INTO.

Contoh: Memuat data ke dalam tabel Delta Lake tanpa skema

Catatan

Fitur ini tersedia di Databricks Runtime 11.3 LTS ke atas.

Anda dapat membuat tabel Delta tempat penampung kosong sehingga skema kemudian disimpulkan selama COPY INTO perintah dengan mengatur mergeSchema ke true di COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Pernyataan SQL di atas bersifat idempogen dan dapat dijadwalkan untuk dijalankan untuk menyerap data tepat sekali ke dalam tabel Delta.

Catatan

Tabel Delta kosong tidak dapat digunakan di luar COPY INTO. INSERT INTO dan MERGE INTO tidak didukung untuk menulis data ke dalam tabel Delta tanpa skema. Setelah data disisipkan ke dalam tabel dengan COPY INTO, tabel menjadi dapat dikueri.

Lihat Membuat tabel target untuk COPY INTO.

Contoh: Mengatur skema dan memuat data ke dalam tabel Delta Lake

Contoh berikut menunjukkan cara membuat tabel Delta lalu menggunakan COPY INTO perintah SQL untuk memuat data sampel dari himpunan data Databricks ke dalam tabel. Anda dapat menjalankan contoh kode Python, R, Scala, atau SQL dari notebook yang dilampirkan ke kluster Azure Databricks. Anda juga dapat menjalankan kode SQL dari kueri yang terkait dengan gudang SQL di Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Untuk membersihkan, jalankan kode berikut, yang menghapus tabel:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Referensi

  • Runtime Bahasa Umum Databricks 7.x ke atas: VACUUM

Sumber Daya Tambahan: