Udostępnij za pośrednictwem


Zacznij ładować dane używając COPY INTO

Za pomocą COPY INTO polecenia SQL można załadować dane z lokalizacji pliku do tabeli delty. COPY INTO jest powtarzalny i idempotentny — pliki w lokalizacji źródłowej, które zostały już załadowane, są pomijane przy kolejnych uruchomieniach.

COPY INTO oferuje następujące możliwości:

  • Łatwe konfigurowanie filtrów plików lub folderów z magazynu w chmurze, w tym woluminów S3, ADLS, ABFS, GCS i Unity Catalog.
  • Obsługa wielu formatów plików źródłowych: CSV, JSON, XML, Avro, ORC, Parquet, text i binarnych.
  • Przetwarzanie plików dokładnie raz (idempotentne) domyślnie.
  • Wnioskowanie, mapowanie, scalanie i ewolucja schematu tabeli docelowej.

Uwaga

Aby uzyskać bardziej skalowalny i niezawodny proces pozyskiwania plików, Databricks zaleca użytkownikom SQL używanie tabel przesyłania strumieniowego. Aby uzyskać więcej informacji, zobacz Tabele przesyłania strumieniowego.

Ostrzeżenie

COPY INTO uwzględnia ustawienia obszaru roboczego dotyczące wektorów usuwania. Jeśli to ustawienie jest włączone, wektory usuwania są włączone w tabeli docelowej, gdy COPY INTO działa na magazynie SQL lub zasobach obliczeniowych z uruchomionym środowiskiem Databricks Runtime 14.0 lub nowszym. Po włączeniu wektorów usuwania blokują zapytania względem tabeli w środowisku Databricks Runtime 11.3 LTS i poniżej. pl-PL: Zobacz Wektory usuwania w usłudze Databricks i Automatyczne włączanie wektorów usuwania.

Zanim rozpoczniesz

Administrator konta musi wykonać kroki opisane w temacie Konfigurowanie dostępu do danych na potrzeby pozyskiwania w celu skonfigurowania dostępu do danych w magazynie obiektów w chmurze, zanim użytkownicy będą mogli ładować dane przy użyciu polecenia COPY INTO.

Ładowanie danych do tabeli usługi Delta Lake bez schematu

W środowisku Databricks Runtime 11.3 LTS i nowszym można utworzyć puste tabele Delta jako symbole zastępcze, aby schemat był wnioskowany w trakcie polecenia COPY INTO, ustawiając mergeSchema na true w COPY_OPTIONS. W poniższym przykładzie użyto zestawu danych Wanderbricks. Zastąp <catalog>, <schema>, i <volume> katalogiem, schematem i woluminem, do których masz CREATE TABLE uprawnienia.

SQL

CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;

COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Python

table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
  " COPY_OPTIONS ('mergeSchema' = 'true')",
  sep = ""
))

Skala

val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

Ta instrukcja SQL jest idempotentna. Oznacza to, że można zaplanować jego uruchamianie wielokrotnie, a będzie ładować tylko nowe dane do tabeli Delta.

Uwaga

Pusta tabela delty nie może być używana poza COPY INTO. INSERT INTO i MERGE INTO nie są obsługiwane do zapisywania danych w tabelach delta bez schematu. Po wstawieniu danych do tabeli z COPY INTO, tabela staje się możliwa do zapytania.

Zobacz Tworzenie tabel docelowych dla elementu COPY INTO.

Ustawianie schematu i ładowanie danych do tabeli usługi Delta Lake

Poniższy przykład tworzy tabelę delty i używa COPY INTO polecenia SQL, aby załadować przykładowe dane z zestawu danych usługi Wanderbricks do tabeli. Pliki źródłowe to pliki JSON przechowywane w woluminie katalogu Unity. Możesz uruchomić przykładowy kod Python, R, Scala lub SQL z notesu dołączonego do klastra Azure Databricks. Możesz również uruchomić kod SQL z zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL. Zastąp <catalog>, <schema> i <volume> odpowiednio katalogiem, schematem i woluminem, do których masz CREATE TABLE uprawnienia.

SQL

DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;

CREATE TABLE <catalog>.<schema>.booking_updates_upload (
  booking_id BIGINT,
  user_id BIGINT,
  status STRING,
  total_amount DOUBLE
);

COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');

SELECT * FROM <catalog>.<schema>.booking_updates_upload;

Python

table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "booking_id BIGINT, " + \
  "user_id BIGINT, " + \
  "status STRING, " + \
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)

display(booking_updates_upload_data)

R

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "booking_id BIGINT, ",
  "user_id BIGINT, ",
  "status STRING, ",
  "total_amount DOUBLE)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('multiLine' = 'true')",
  sep = ""
))

booking_updates_upload_data = tableToDF(table_name)

display(booking_updates_upload_data)

Skala

val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "booking_id BIGINT, " +
  "user_id BIGINT, " +
  "status STRING, " +
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

val booking_updates_upload_data = spark.table(table_name)

display(booking_updates_upload_data)

Aby wyczyścić, uruchom następujący kod, aby usunąć przykładową tabelę.

SQL

DROP TABLE <catalog>.<schema>.booking_updates_upload

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Skala

spark.sql("DROP TABLE " + table_name)

Czyszczenie plików metadanych

Możesz uruchomić VACUUM, aby wyczyścić nieużywane pliki metadanych utworzone przez COPY INTO w środowisku Databricks Runtime 15.2 lub nowszym.

Dodatkowe zasoby

  • Databricks Runtime 7.x oraz nowsze: COPY INTO