使用 COPY INTO 的常见数据加载模式

了解使用 COPY INTO 将数据从文件源加载到 Delta Lake 的常见模式。

有许多选项可用于使用 COPY INTO。 还可以结合这些模式将临时凭据与 COPY INTO 一起使用

有关所有选项的完整参考,请参阅 COPY INTO

COPY INTO 创建目标表

COPY INTO 必须面向现有的 Delta 表。

CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

在 Databricks Runtime 11.3 LTS 及更高版本中,对于支持架构演变的格式,设置这些表的架构是可选的。 有关详细信息,请参阅使用 COPY INTO 的架构推理和演变

使用 COPY INTO 加载 JSON 数据

以下示例将 Azure Data Lake Storage Gen2 (ADLS Gen2) 中的五个文件中的 JSON 数据加载到名为 my_json_data 的 Delta 表中。 必须在执行 COPY INTO 之前创建此表。 如果已从一个文件加载了任何数据,则不会为该文件重新加载数据。

COPY INTO my_json_data
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

 -- The second execution will not copy any data since the first command already loaded the data
 COPY INTO my_json_data
   FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
   FILEFORMAT = JSON
   FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

使用 COPY INTO 加载 Avro 数据

以下示例使用附加 SQL 表达式作为 SELECT 语句的一部分在 ADLS Gen2 中加载 Avro 数据。

COPY INTO my_delta_table
  FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
          FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = AVRO

使用 COPY INTO 加载 CSV 文件

以下示例将 CSV 文件从 abfss://container@storageAccount.dfs.core.windows.net/base/path/folder1 下的 Azure Data Lake Storage Gen2 加载到 Delta 表中。

COPY INTO target_table
  FROM (SELECT key, index, textData, 'constant_value'
          FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
  FORMAT_OPTIONS('header' = 'true')

-- The example below loads CSV files without headers in ADLS Gen2 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO target_table
  FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
        FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'

使用 COPY INTO 的架构推理和演变

本部分提供了使用 COPY INTO 的常见架构推理和演变配置的示例。

语法

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true', `mergeSchema` = `true`)
COPY_OPTIONS ('mergeSchema' = 'true');

以下 FORMAT_OPTIONS 可用于通过 COPY INTO 自动推理输入架构:

  • inferSchema:是推理所分析记录的数据类型,还是假定所有列都是 StringType 类型的。

  • mergeSchema:是否跨多个源文件推理架构并合并每个源文件的架构。

    如果源文件具有相同的架构,Databricks 建议使用 mergeSchema (FORMAT_OPTIONS) 中 false 的默认设置。

以下 COPY_OPTIONS 可用于通过 COPY INTO 演变目标架构:

  • mergeSchema:是否根据输入架构演变目标 Delta 表的架构。

    如果输入架构和目标架构相同,则 mergeSchema 可以是 falseCOPY_OPTIONS 中)。

推断和演变 CSV 架构

以下示例创建一个名为 my_pipe_data 的无架构 Delta 表,并加载带有标题的竖线分隔的 CSV。

mergeSchemaFORMAT_OPTIONS 中的 true,因为输入文件可能有标题或分隔符差异。

CREATE TABLE IF NOT EXISTS my_pipe_data;

COPY INTO my_pipe_data
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'delimiter' = '|',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

加载数据时忽略损坏的文件

如果由于某些损坏问题而无法读取正在加载的数据,则可以通过在 FORMAT_OPTIONS 中将 ignoreCorruptFiles 设置为 true 来跳过这些文件。

COPY INTO 命令的结果返回由于 num_skipped_corrupt_files 列中损坏而跳过的文件数。 在 Delta 表上运行 DESCRIBE HISTORY 后,此指标也会显示在 numSkippedCorruptFiles 下的 operationMetrics 列中。

损坏的文件不会被 COPY INTO 跟踪,因此,如果修复损坏,则可以在后续运行中重新加载这些文件。 可以通过在 VALIDATE 模式下运行 COPY INTO 来查看哪些文件已损坏。

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')

注意

ignoreCorruptFiles在 Databricks Runtime 11.3 LTS 及更高版本中可用。