Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Za pomocą COPY INTO polecenia SQL można załadować dane z lokalizacji pliku do tabeli delty.
COPY INTO jest powtarzalny i idempotentny — pliki w lokalizacji źródłowej, które zostały już załadowane, są pomijane przy kolejnych uruchomieniach.
COPY INTO oferuje następujące możliwości:
- Łatwe konfigurowanie filtrów plików lub folderów z magazynu w chmurze, w tym woluminów S3, ADLS, ABFS, GCS i Unity Catalog.
- Obsługa wielu formatów plików źródłowych: CSV, JSON, XML, Avro, ORC, Parquet, text i binarnych.
- Przetwarzanie plików dokładnie raz (idempotentne) domyślnie.
- Wnioskowanie, mapowanie, scalanie i ewolucja schematu tabeli docelowej.
Uwaga
Aby uzyskać bardziej skalowalny i niezawodny proces pozyskiwania plików, Databricks zaleca użytkownikom SQL używanie tabel przesyłania strumieniowego. Aby uzyskać więcej informacji, zobacz Tabele przesyłania strumieniowego.
Ostrzeżenie
COPY INTO uwzględnia ustawienia obszaru roboczego dotyczące wektorów usuwania. Jeśli to ustawienie jest włączone, wektory usuwania są włączone w tabeli docelowej, gdy COPY INTO działa na magazynie SQL lub zasobach obliczeniowych z uruchomionym środowiskiem Databricks Runtime 14.0 lub nowszym. Po włączeniu wektorów usuwania blokują zapytania względem tabeli w środowisku Databricks Runtime 11.3 LTS i poniżej. pl-PL: Zobacz Wektory usuwania w usłudze Databricks i Automatyczne włączanie wektorów usuwania.
Zanim rozpoczniesz
Administrator konta musi wykonać kroki opisane w temacie Konfigurowanie dostępu do danych na potrzeby pozyskiwania w celu skonfigurowania dostępu do danych w magazynie obiektów w chmurze, zanim użytkownicy będą mogli ładować dane przy użyciu polecenia COPY INTO.
Ładowanie danych do tabeli usługi Delta Lake bez schematu
W środowisku Databricks Runtime 11.3 LTS i nowszym można utworzyć puste tabele Delta jako symbole zastępcze, aby schemat był wnioskowany w trakcie polecenia COPY INTO, ustawiając mergeSchema na true w COPY_OPTIONS. W poniższym przykładzie użyto zestawu danych Wanderbricks. Zastąp <catalog>, <schema>, i <volume> katalogiem, schematem i woluminem, do których masz CREATE TABLE uprawnienia.
SQL
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Python
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
Skala
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
Ta instrukcja SQL jest idempotentna. Oznacza to, że można zaplanować jego uruchamianie wielokrotnie, a będzie ładować tylko nowe dane do tabeli Delta.
Uwaga
Pusta tabela delty nie może być używana poza COPY INTO.
INSERT INTO i MERGE INTO nie są obsługiwane do zapisywania danych w tabelach delta bez schematu. Po wstawieniu danych do tabeli z COPY INTO, tabela staje się możliwa do zapytania.
Zobacz Tworzenie tabel docelowych dla elementu COPY INTO.
Ustawianie schematu i ładowanie danych do tabeli usługi Delta Lake
Poniższy przykład tworzy tabelę delty i używa COPY INTO polecenia SQL, aby załadować przykładowe dane z zestawu danych usługi Wanderbricks do tabeli. Pliki źródłowe to pliki JSON przechowywane w woluminie katalogu Unity. Możesz uruchomić przykładowy kod Python, R, Scala lub SQL z notesu dołączonego do klastra Azure Databricks. Możesz również uruchomić kod SQL z zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL. Zastąp <catalog>, <schema> i <volume> odpowiednio katalogiem, schematem i woluminem, do których masz CREATE TABLE uprawnienia.
SQL
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
Python
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
R
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
Skala
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
Aby wyczyścić, uruchom następujący kod, aby usunąć przykładową tabelę.
SQL
DROP TABLE <catalog>.<schema>.booking_updates_upload
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Skala
spark.sql("DROP TABLE " + table_name)
Czyszczenie plików metadanych
Możesz uruchomić VACUUM, aby wyczyścić nieużywane pliki metadanych utworzone przez COPY INTO w środowisku Databricks Runtime 15.2 lub nowszym.
Dodatkowe zasoby
- Databricks Runtime 7.x oraz nowsze:
COPY INTO