Aracılığıyla paylaş


Veri yüklemek için COPY INTO kullanmaya başlama

COPY INTO SQL komutu, bir dosya konumundan Delta tablosuna veri yüklemenize olanak tanır. Bu yeniden atanabilir ve bir kez etkili bir işlemdir; zaten yüklenmiş olan kaynak konumdaki dosyalar atlanır.

COPY INTO aşağıdaki özellikleri sunar:

  • S3, ADLS 2. Nesil, ABFS, GCS ve Unity Kataloğu birimleri dahil olmak üzere bulut depolamadan kolayca yapılandırılabilir dosya veya dizin filtreleri.
  • Birden çok kaynak dosya biçimi desteği: CSV, JSON, XML, Avro, ORC, Parquet, metin ve ikili dosyalar
  • Varsayılan olarak tam olarak bir kez (aynı anda) dosya işleme
  • Hedef tablo şeması çıkarımı, eşleme, birleştirme ve evrim

Not

Daha ölçeklenebilir ve güçlü bir dosya alımı deneyimi için Databricks, SQL kullanıcılarının akış tablolarından yararlanmasını önerir. Bkz . Databricks SQL'de akış tablolarını kullanarak veri yükleme.

Uyarı

COPY INTO silme vektörleri için çalışma alanı ayarını dikkate alır. Etkinleştirilirse, databricks Runtime 14.0 veya üzerini çalıştıran bir SQL ambarı veya işlem üzerinde çalıştırıldığında COPY INTO hedef tabloda silme vektörleri etkinleştirilir. Silme vektörleri etkinleştirildikten sonra Databricks Runtime 11.3 LTS ve altındaki bir tabloya yönelik sorguları engeller. Bkz . Silme vektörleri nedir? ve Silme vektörlerini otomatik olarak etkinleştirme.

Gereksinimler

Hesap yöneticisinin, kullanıcıların kullanarak COPY INTOveri yükleyebilmesi için önce bulut nesne depolamadaki verilere erişimi yapılandırmak için Veri alımı için veri erişimini yapılandırma bölümünde yer alan adımları izlemesi gerekir.

Örnek: Şemasız Delta Lake tablosuna veri yükleme

Not

Bu özellik Databricks Runtime 11.3 LTS ve üzerinde kullanılabilir.

boş yer tutucu Delta tabloları oluşturabilirsiniz; böylece şema daha sonra komutu sırasında COPY INTO değerini olarak ayarlayarak mergeSchematrueCOPY_OPTIONSçıkarılır:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Yukarıdaki SQL deyimi bir kez etkilidir ve verileri bir Delta tablosuna tam olarak bir kez almak için çalışacak şekilde zamanlanabilir.

Not

Boş Delta tablosu dışında COPY INTOkullanılamaz. INSERT INTO ve MERGE INTO şemasız Delta tablolarına veri yazmak için desteklenmez. ile COPY INTOtabloya veri eklendikten sonra tablo sorgulanabilir hale gelir.

Bkz . COPY INTO için hedef tablolar oluşturma.

Örnek: Şema ayarlama ve Delta Lake tablosuna veri yükleme

Aşağıdaki örnekte, Bir Delta tablosu oluşturma ve ardından SQL komutunu kullanarak COPY INTO Databricks veri kümelerinden tabloya örnek veri yükleme adımları gösterilmektedir. Azure Databricks kümesine bağlı bir not defterinden Python, R, Scala veya SQL kodu örneğini çalıştırabilirsiniz. Sql kodunu Databricks SQL'deki bir SQL ambarıyla ilişkilendirilmiş bir sorgudan da çalıştırabilirsiniz.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Temizlemek için aşağıdaki kodu çalıştırarak tabloyu silin:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Başvuru

  • Databricks Runtime 7.x ve üzeri: COPY INTO

Ek kaynaklar