使用 COPY INTO
的常见数据加载模式
了解使用 COPY INTO
将数据从文件源加载到 Delta Lake 的常见模式。
有许多选项可用于使用 COPY INTO
。 还可以结合这些模式将临时凭据与 COPY INTO 一起使用。
有关所有选项的完整参考,请参阅 COPY INTO。
为 COPY INTO
创建目标表
COPY INTO
必须面向现有的 Delta 表。
CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
在 Databricks Runtime 11.3 LTS 及更高版本中,对于支持架构演变的格式,设置这些表的架构是可选的。 有关详细信息,请参阅使用 COPY INTO 的架构推理和演变。
使用 COPY INTO
加载 JSON 数据
以下示例将 Azure Data Lake Storage Gen2 (ADLS Gen2) 中的五个文件中的 JSON 数据加载到名为 my_json_data
的 Delta 表中。 必须在执行 COPY INTO
之前创建此表。 如果已从一个文件加载了任何数据,则不会为该文件重新加载数据。
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
-- The second execution will not copy any data since the first command already loaded the data
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
使用 COPY INTO
加载 Avro 数据
以下示例使用附加 SQL 表达式作为 SELECT
语句的一部分在 ADLS Gen2 中加载 Avro 数据。
COPY INTO my_delta_table
FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
FILEFORMAT = AVRO
使用 COPY INTO
加载 CSV 文件
以下示例将 CSV 文件从 abfss://container@storageAccount.dfs.core.windows.net/base/path/folder1
下的 Azure Data Lake Storage Gen2 加载到 Delta 表中。
COPY INTO target_table
FROM (SELECT key, index, textData, 'constant_value'
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
FORMAT_OPTIONS('header' = 'true')
-- The example below loads CSV files without headers in ADLS Gen2 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO target_table
FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
使用 COPY INTO
的架构推理和演变
本部分提供了使用 COPY INTO
的常见架构推理和演变配置的示例。
语法
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true', `mergeSchema` = `true`)
COPY_OPTIONS ('mergeSchema' = 'true');
以下 FORMAT_OPTIONS
可用于通过 COPY INTO
自动推理输入架构:
inferSchema
:是推理所分析记录的数据类型,还是假定所有列都是StringType
类型的。mergeSchema
:是否跨多个源文件推理架构并合并每个源文件的架构。如果源文件具有相同的架构,Databricks 建议使用
mergeSchema
(FORMAT_OPTIONS
) 中false
的默认设置。
以下 COPY_OPTIONS
可用于通过 COPY INTO
演变目标架构:
mergeSchema
:是否根据输入架构演变目标 Delta 表的架构。如果输入架构和目标架构相同,则
mergeSchema
可以是false
(COPY_OPTIONS
中)。
推断和演变 CSV 架构
以下示例创建一个名为 my_pipe_data
的无架构 Delta 表,并加载带有标题的竖线分隔的 CSV。
mergeSchema
是 FORMAT_OPTIONS
中的 true
,因为输入文件可能有标题或分隔符差异。
CREATE TABLE IF NOT EXISTS my_pipe_data;
COPY INTO my_pipe_data
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
FILEFORMAT = CSV
FORMAT_OPTIONS ('mergeSchema' = 'true',
'delimiter' = '|',
'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
加载数据时忽略损坏的文件
如果由于某些损坏问题而无法读取正在加载的数据,则可以通过在 FORMAT_OPTIONS
中将 ignoreCorruptFiles
设置为 true
来跳过这些文件。
COPY INTO
命令的结果返回由于 num_skipped_corrupt_files
列中损坏而跳过的文件数。 在 Delta 表上运行 DESCRIBE HISTORY
后,此指标也会显示在 numSkippedCorruptFiles
下的 operationMetrics
列中。
损坏的文件不会被 COPY INTO
跟踪,因此,如果修复损坏,则可以在后续运行中重新加载这些文件。 可以通过在 VALIDATE
模式下运行 COPY INTO
来查看哪些文件已损坏。
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')
注意
ignoreCorruptFiles
在 Databricks Runtime 11.3 LTS 及更高版本中可用。