

Get started using COPY INTO to load data

The COPY INTO SQL command lets you load data from a file location into a Delta table. This is a retriable and idempotent operation; files in the source location that have already been loaded are skipped.

COPY INTO provides the following capabilities:

  • Easily configurable file or directory filters from cloud storage, including S3, ADLS Gen2, ABFS, GCS, and Unity Catalog volumes (see the filtering sketch after this list).
  • Support for multiple source file formats: CSV, JSON, XML, Avro, ORC, Parquet, text, and binary files
  • Exactly-once (idempotent) file processing by default
  • Target table schema inference, mapping, merging, and evolution
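
For example, file filtering is expressed with the PATTERN clause. The following is a minimal sketch, assuming a placeholder table my_table and CSV source files with a header row:

-- Load only the CSV files matching the glob under the source path.
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = CSV
PATTERN = 'subfolder/*.csv'
FORMAT_OPTIONS ('header' = 'true');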

Note

For a more scalable and robust file ingestion experience, Databricks recommends that SQL users leverage streaming tables. See Load data using streaming tables in Databricks SQL.
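
For illustration only (this snippet is not part of the COPY INTO examples below), a streaming table can ingest files from the same kind of location. A minimal sketch, assuming a JSON source at a placeholder path:

CREATE OR REFRESH STREAMING TABLE my_streaming_table
AS SELECT * FROM STREAM read_files(
  '/path/to/files',   -- placeholder source path
  format => 'json'    -- assumed source format
);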

Warning

COPY INTO respects the workspace setting for deletion vectors. If enabled, deletion vectors are enabled on the target table when COPY INTO runs on a SQL warehouse or on compute running Databricks Runtime 14.0 or above. Once enabled, deletion vectors block queries against the table in Databricks Runtime 11.3 LTS and below. See What are deletion vectors? and Auto-enable deletion vectors.
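
If the target table must remain readable from Databricks Runtime 11.3 LTS and below, one option is to control the table property that governs deletion vectors. A minimal sketch, assuming a placeholder table my_table; note that disabling the property does not rewrite files that already contain deletion vectors:

-- Inspect the current table properties (my_table is a placeholder).
SHOW TBLPROPERTIES my_table;

-- Prevent new deletion vectors from being written to this table.
ALTER TABLE my_table
SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);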

Requirements

An account admin must follow the steps in Configure data access for ingestion to configure access to data in cloud object storage before users can load data using COPY INTO.

Example: Load data into a schemaless Delta Lake table

Note

This feature is available in Databricks Runtime 11.3 LTS and above.

You can create an empty placeholder Delta table so that the schema is inferred later during a COPY INTO command, by setting mergeSchema to true in COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

The SQL statement above is idempotent and can be scheduled to run to ingest data exactly-once into a Delta table.
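
The skip behavior can be overridden when files genuinely need to be reloaded. A minimal sketch using the documented force copy option and the placeholder names from above:

-- Re-ingest files even if they were loaded before (gives up exactly-once semantics).
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
COPY_OPTIONS ('force' = 'true');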

Note

The empty Delta table is not usable outside of COPY INTO. INSERT INTO and MERGE INTO are not supported for writing data into schemaless Delta tables. After data is inserted into the table with COPY INTO, the table becomes queryable.

See Create target tables for COPY INTO.

Example: Set schema and load data into a Delta Lake table

The following example shows how to create a Delta table and then use the COPY INTO SQL command to load sample data from Databricks datasets into the table. You can run the example Python, R, Scala, or SQL code from a notebook attached to an Azure Databricks cluster. You can also run the SQL code from a query associated with a SQL warehouse in Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

To clean up, run the following code, which deletes the table:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Reference

  • Databricks Runtime 7.x and above: COPY INTO
