Поделиться через


Распространенные шаблоны загрузки данных с помощью COPY INTO

Узнайте распространенные шаблоны для загрузки COPY INTO данных из источников файлов в Delta Lake.

Существует множество вариантов использования COPY INTO. Вы также можете использовать временные учетные данные с COPY INTO в сочетании с этими шаблонами.

Все справочные материалы по всем вариантам см. в разделе COPY INTO.

Создание целевых таблиц для COPY INTO

Команда COPY INTO должна загружать данные в существующую разностную таблицу. В Databricks Runtime 11.3 LTS и более поздних версиях настройка схемы для этих таблиц является необязательной для форматов, поддерживающих эволюцию схемы:

CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

Обратите внимание, что для вывода схемы необходимо COPY INTOпередать дополнительные параметры:

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

В следующем примере создается разностная таблица my_pipe_data без схемы и загружается CSV-файл, разделенный символами вертикальной черты, с заголовком:

CREATE TABLE IF NOT EXISTS my_pipe_data;

COPY INTO my_pipe_data
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'delimiter' = '|',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

Загрузка данных JSON с помощью COPY INTO

В следующем примере данные JSON загружаются из пяти файлов в Azure Data Lake Storage 2-го поколения (ADLS 2-го поколения) в таблицу my_json_dataDelta. Эту таблицу необходимо создать перед выполнением COPY INTO. Если из одного из файлов уже были загружены данные, эти данные для него перезагружены не будут.

COPY INTO my_json_data
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

 -- The second execution will not copy any data since the first command already loaded the data
 COPY INTO my_json_data
   FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
   FILEFORMAT = JSON
   FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

Загрузка данных Avro с помощью COPY INTO

В следующем примере данные Avro загружаются в ADLS 2-го поколения с помощью дополнительных выражений SQL в рамках инструкции SELECT .

COPY INTO my_delta_table
  FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
          FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = AVRO

Загрузка CSV-файлов с помощью COPY INTO

В следующем примере csv-файлы загружаются из Azure Data Lake Storage 2-го поколения в abfss://container@storageAccount.dfs.core.windows.net/base/path/folder1 таблицу Delta.

COPY INTO target_table
  FROM (SELECT key, index, textData, 'constant_value'
          FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
  FORMAT_OPTIONS('header' = 'true')

-- The example below loads CSV files without headers in ADLS Gen2 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO target_table
  FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
        FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'

Игнорирование поврежденных файлов при загрузке данных

Если загружаемые данные не удается прочитать в связи с повреждением, эти файлы можно пропустить, задав ignoreCorruptFilestrue в FORMAT_OPTIONS.

Результат выполнения команды COPY INTO возвращает количество файлов, пропущенных из-за повреждения в столбце num_skipped_corrupt_files. Эта метрика также отображается в столбце operationMetrics под заголовком numSkippedCorruptFiles после запуска DESCRIBE HISTORY в разностной таблице.

COPY INTO не отслеживает поврежденные файлы, поэтому, если повреждение исправлено, их можно перезагрузить в последующем запуске. Чтобы посмотреть, какие файлы повреждены, выполните команду COPY INTO в режиме VALIDATE.

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')

Примечание.

ignoreCorruptFiles доступен в Databricks Runtime 11.3 LTS и выше.