Udostępnij za pośrednictwem


Typowe wzorce ładowania danych przy użyciu funkcji COPY INTO

Poznaj typowe wzorce dotyczące ładowania COPY INTO danych ze źródeł plików do usługi Delta Lake.

Istnieje wiele opcji używania programu COPY INTO. Możesz również używać poświadczeń tymczasowych z funkcją COPY INTO w połączeniu z tymi wzorcami.

Zobacz COPY INTO , aby uzyskać pełną dokumentację wszystkich opcji.

Tworzenie tabel docelowych dla funkcji COPY INTO

COPY INTO musi być skierowana do istniejącej tabeli delty. W środowisku Databricks Runtime 11.3 LTS i nowszym ustawienie schematu dla tych tabel jest opcjonalne dla formatów obsługujących ewolucję schematu:

CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

Należy pamiętać, że aby wywnioskować schemat za pomocą COPY INTOpolecenia , należy przekazać dodatkowe opcje:

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Poniższy przykład tworzy tabelę delta bez schematu o nazwie my_pipe_data i ładuje rozdzielany potokowo wolumin CSV z nagłówkiem:

CREATE TABLE IF NOT EXISTS my_pipe_data;

COPY INTO my_pipe_data
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'delimiter' = '|',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

Ładowanie danych JSON za pomocą funkcji COPY INTO

W poniższym przykładzie dane JSON są ładowane z pięciu plików w usłudze Azure Data Lake Storage Gen2 (ADLS Gen2) do tabeli delty o nazwie my_json_data. Przed wykonaniem tej tabeli należy utworzyć COPY INTO tę tabelę. Jeśli jakiekolwiek dane zostały już załadowane z jednego z plików, dane nie zostaną ponownie załadowane dla tego pliku.

COPY INTO my_json_data
  FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

 -- The second execution will not copy any data since the first command already loaded the data
 COPY INTO my_json_data
   FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path'
   FILEFORMAT = JSON
   FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

Ładowanie danych Avro za pomocą funkcji COPY INTO

Poniższy przykład ładuje dane Avro w usłudze ADLS Gen2 przy użyciu dodatkowych wyrażeń SQL w ramach instrukcji SELECT .

COPY INTO my_delta_table
  FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
          FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = AVRO

Ładowanie plików CSV za pomocą funkcji COPY INTO

W poniższym przykładzie pliki CSV są ładowane z usługi Azure Data Lake Storage Gen2 w obszarze abfss://container@storageAccount.dfs.core.windows.net/base/path/folder1 do tabeli delty pod adresem abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target.

COPY INTO delta.`abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target`
  FROM (SELECT key, index, textData, 'constant_value'
          FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
  FORMAT_OPTIONS('header' = 'true')

-- The example below loads CSV files without headers in ADLS Gen2 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO delta.`abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target`
  FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
        FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'

Ignoruj uszkodzone pliki podczas ładowania danych

Jeśli ładowane dane nie mogą być odczytywane z powodu problemu z uszkodzeniem, te pliki można pominąć, ustawiając wartość na w pliku ignoreCorruptFilestrueFORMAT_OPTIONS.

Wynik COPY INTO polecenia zwraca liczbę pominiętych plików z powodu uszkodzenia w kolumnie num_skipped_corrupt_files . Ta metryka jest również wyświetlana w kolumnie poniżej numSkippedCorruptFiles po uruchomieniu operationMetricsDESCRIBE HISTORY w tabeli delty.

Uszkodzone pliki nie są śledzone przez COPY INTOprogram , więc można je ponownie załadować w kolejnym uruchomieniu, jeśli uszkodzenie zostało naprawione. Możesz zobaczyć, które pliki są uszkodzone, uruchamiając polecenie COPY INTO w VALIDATE trybie.

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')

Uwaga

ignoreCorruptFiles Jest dostępny w środowisku Databricks Runtime 11.3 LTS i nowszym.