Mulai menggunakan COPY INTO untuk memuat data
Perintah SQL COPY INTO
memungkinkan Anda memuat data dari lokasi file ke dalam tabel Delta. Ini adalah operasi yang dapat dicoba kembali dan idempotent — file di lokasi sumber yang telah dimuat dilewati.
COPY INTO
menawarkan kemampuan berikut:
- Filter file atau direktori yang dapat dikonfigurasi dengan mudah dari penyimpanan cloud, termasuk volume S3, ADLS Gen2, ABFS, GCS, dan Unity Catalog.
- Dukungan untuk beberapa format file sumber: file CSV, JSON, XML, Avro, ORC, Parquet, teks, dan biner
- Pemrosesan file tepat sekali (idempotoen) secara default
- Inferensi skema tabel target, pemetaan, penggabungan, dan evolusi
Catatan
Untuk pengalaman penyerapan file yang lebih dapat diskalakan dan kuat, Databricks merekomendasikan agar pengguna SQL memanfaatkan tabel streaming. Lihat Memuat data menggunakan tabel streaming di Databricks SQL.
Peringatan
COPY INTO
menghormati pengaturan ruang kerja untuk vektor penghapusan. Jika diaktifkan, vektor penghapusan diaktifkan pada tabel target saat COPY INTO
berjalan pada gudang SQL atau komputasi yang menjalankan Databricks Runtime 14.0 atau lebih tinggi. Setelah diaktifkan, vektor penghapusan memblokir kueri terhadap tabel di Databricks Runtime 11.3 LTS dan di bawahnya. Lihat Apa itu vektor penghapusan? dan Vektor penghapusan aktifkan otomatis.
Persyaratan
Admin akun harus mengikuti langkah-langkah dalam Mengonfigurasi akses data untuk penyerapan guna mengonfigurasi akses ke data di penyimpanan objek cloud sebelum pengguna dapat memuat data menggunakan COPY INTO
.
Contoh: Memuat data ke dalam tabel Delta Lake tanpa skema
Catatan
Fitur ini tersedia di Databricks Runtime 11.3 LTS ke atas.
Anda dapat membuat tabel Delta tempat penampung kosong sehingga skema kemudian disimpulkan selama COPY INTO
perintah dengan mengatur mergeSchema
ke true
di COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Pernyataan SQL di atas bersifat idempogen dan dapat dijadwalkan untuk dijalankan untuk menyerap data tepat sekali ke dalam tabel Delta.
Catatan
Tabel Delta kosong tidak dapat digunakan di luar COPY INTO
. INSERT INTO
dan MERGE INTO
tidak didukung untuk menulis data ke dalam tabel Delta tanpa skema. Setelah data disisipkan ke dalam tabel dengan COPY INTO
, tabel menjadi dapat dikueri.
Lihat Membuat tabel target untuk COPY INTO.
Contoh: Mengatur skema dan memuat data ke dalam tabel Delta Lake
Contoh berikut menunjukkan cara membuat tabel Delta lalu menggunakan COPY INTO
perintah SQL untuk memuat data sampel dari himpunan data Databricks ke dalam tabel. Anda dapat menjalankan contoh kode Python, R, Scala, atau SQL dari notebook yang dilampirkan ke kluster Azure Databricks. Anda juga dapat menjalankan kode SQL dari kueri yang terkait dengan gudang SQL di Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Untuk membersihkan, jalankan kode berikut, yang menghapus tabel:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Referensi
- Runtime Bahasa Umum Databricks 7.x ke atas: VACUUM
Sumber Daya Tambahan:
Memuat data menggunakan COPY INTO dengan volume Katalog Unity atau lokasi eksternal
Untuk pola penggunaan umum, termasuk contoh beberapa
COPY INTO
operasi terhadap tabel Delta yang sama, lihat Pola pemuatan data umum menggunakan COPY INTO.
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk