Veri yüklemek için COPY INTO kullanmaya başlama
COPY INTO
SQL komutu, bir dosya konumundan Delta tablosuna veri yüklemenize olanak tanır. Bu yeniden atanabilir ve bir kez etkili bir işlemdir; zaten yüklenmiş olan kaynak konumdaki dosyalar atlanır.
COPY INTO
aşağıdaki özellikleri sunar:
- S3, ADLS 2. Nesil, ABFS, GCS ve Unity Kataloğu birimleri dahil olmak üzere bulut depolamadan kolayca yapılandırılabilir dosya veya dizin filtreleri.
- Birden çok kaynak dosya biçimi desteği: CSV, JSON, XML, Avro, ORC, Parquet, metin ve ikili dosyalar
- Varsayılan olarak tam olarak bir kez (aynı anda) dosya işleme
- Hedef tablo şeması çıkarımı, eşleme, birleştirme ve evrim
Not
Daha ölçeklenebilir ve güçlü bir dosya alımı deneyimi için Databricks, SQL kullanıcılarının akış tablolarından yararlanmasını önerir. Bkz . Databricks SQL'de akış tablolarını kullanarak veri yükleme.
Uyarı
COPY INTO
silme vektörleri için çalışma alanı ayarını dikkate alır. Etkinleştirilirse, databricks Runtime 14.0 veya üzerini çalıştıran bir SQL ambarı veya işlem üzerinde çalıştırıldığında COPY INTO
hedef tabloda silme vektörleri etkinleştirilir. Silme vektörleri etkinleştirildikten sonra Databricks Runtime 11.3 LTS ve altındaki bir tabloya yönelik sorguları engeller. Bkz . Silme vektörleri nedir? ve Silme vektörlerini otomatik olarak etkinleştirme.
Gereksinimler
Hesap yöneticisinin, kullanıcıların kullanarak COPY INTO
veri yükleyebilmesi için önce bulut nesne depolamadaki verilere erişimi yapılandırmak için Veri alımı için veri erişimini yapılandırma bölümünde yer alan adımları izlemesi gerekir.
Örnek: Şemasız Delta Lake tablosuna veri yükleme
Not
Bu özellik Databricks Runtime 11.3 LTS ve üzerinde kullanılabilir.
boş yer tutucu Delta tabloları oluşturabilirsiniz; böylece şema daha sonra komutu sırasında COPY INTO
değerini olarak ayarlayarak mergeSchema
true
COPY_OPTIONS
çıkarılır:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Yukarıdaki SQL deyimi bir kez etkilidir ve verileri bir Delta tablosuna tam olarak bir kez almak için çalışacak şekilde zamanlanabilir.
Not
Boş Delta tablosu dışında COPY INTO
kullanılamaz. INSERT INTO
ve MERGE INTO
şemasız Delta tablolarına veri yazmak için desteklenmez. ile COPY INTO
tabloya veri eklendikten sonra tablo sorgulanabilir hale gelir.
Bkz . COPY INTO için hedef tablolar oluşturma.
Örnek: Şema ayarlama ve Delta Lake tablosuna veri yükleme
Aşağıdaki örnekte, Bir Delta tablosu oluşturma ve ardından SQL komutunu kullanarak COPY INTO
Databricks veri kümelerinden tabloya örnek veri yükleme adımları gösterilmektedir. Azure Databricks kümesine bağlı bir not defterinden Python, R, Scala veya SQL kodu örneğini çalıştırabilirsiniz. Sql kodunu Databricks SQL'deki bir SQL ambarıyla ilişkilendirilmiş bir sorgudan da çalıştırabilirsiniz.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Temizlemek için aşağıdaki kodu çalıştırarak tabloyu silin:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Meta veri dosyalarını temizleme
Tarafından Databricks Runtime 15.2 ve üzerinde oluşturulan COPY INTO
başvurulmayan meta veri dosyalarını temizlemek için VACUUM'yi çalıştırabilirsiniz.
Başvuru
- Databricks Runtime 7.x ve üzeri: COPY INTO
Ek kaynaklar
Unity Kataloğu birimleri veya dış konumlarla COPY INTO kullanarak veri yükleme
Aynı Delta tablosuna yönelik birden çok
COPY INTO
işlemin örnekleri de dahil olmak üzere yaygın kullanım düzenleri için bkz . COPY INTO kullanan yaygın veri yükleme desenleri.