Wprowadzenie do ładowania danych przy użyciu funkcji COPY INTO
Polecenie COPY INTO
SQL umożliwia ładowanie danych z lokalizacji pliku do tabeli delty. Jest to operacja ponownego triable i idempotentnego; pliki w lokalizacji źródłowej, które zostały już załadowane, są pomijane.
COPY INTO
oferuje następujące możliwości:
- Łatwe konfigurowanie filtrów plików lub katalogów z magazynu w chmurze, w tym woluminów S3, ADLS Gen2, ABFS, GCS i Unity Catalog.
- Obsługa wielu formatów plików źródłowych: CSV, JSON, XML, Avro, ORC, Parquet, text i binarnych
- Przetwarzanie plików dokładnie raz (idempotentne) domyślnie
- Wnioskowanie, mapowanie, scalanie i ewolucja schematu tabeli docelowej
Uwaga
Aby uzyskać bardziej skalowalne i niezawodne środowisko pozyskiwania plików, usługa Databricks zaleca użytkownikom usługi SQL korzystanie z tabel przesyłania strumieniowego. Zobacz Ładowanie danych przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL.
Ostrzeżenie
COPY INTO
uwzględnia ustawienie obszaru roboczego dla wektorów usuwania. Jeśli to ustawienie jest włączone, wektory usuwania są włączone w tabeli docelowej w przypadku COPY INTO
uruchamiania w usłudze SQL Warehouse lub obliczeń z uruchomionym środowiskiem Databricks Runtime 14.0 lub nowszym. Po włączeniu wektory usuwania blokują zapytania względem tabeli w środowisku Databricks Runtime 11.3 LTS i poniżej. Zobacz Co to są wektory usuwania? i Automatyczne włączanie wektorów usuwania.
Wymagania
Administrator konta musi wykonać kroki opisane w temacie Konfigurowanie dostępu do danych na potrzeby pozyskiwania w celu skonfigurowania dostępu do danych w magazynie obiektów w chmurze, zanim użytkownicy będą mogli ładować dane przy użyciu polecenia COPY INTO
.
Przykład: ładowanie danych do tabeli usługi Delta Lake bez schematu
Uwaga
Ta funkcja jest dostępna w środowisku Databricks Runtime 11.3 LTS lub nowszym.
Możesz utworzyć puste tabele różnicowe symbolu zastępczego, aby schemat został później wywnioskowany podczas polecenia, ustawiając wartość w elemecie COPY INTO
true
mergeSchema
:COPY_OPTIONS
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Powyższa instrukcja SQL jest idempotentna i można zaplanować uruchamianie w celu pozyskiwania danych dokładnie raz w tabeli delty.
Uwaga
Pusta tabela delty nie może być dostępna poza elementem COPY INTO
. INSERT INTO
i MERGE INTO
nie są obsługiwane do zapisywania danych w tabelach delta bez schematu. Po wstawieniu danych do tabeli za pomocą COPY INTO
polecenia tabela stanie się możliwe do wykonywania zapytań.
Zobacz Tworzenie tabel docelowych dla funkcji COPY INTO.
Przykład: ustawianie schematu i ładowanie danych do tabeli usługi Delta Lake
W poniższym przykładzie pokazano, jak utworzyć tabelę delty, a następnie użyć COPY INTO
polecenia SQL, aby załadować przykładowe dane z zestawów danych usługi Databricks do tabeli. Możesz uruchomić przykładowy kod Python, R, Scala lub SQL z notesu dołączonego do klastra usługi Azure Databricks. Możesz również uruchomić kod SQL z zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Aby wyczyścić, uruchom następujący kod, który usuwa tabelę:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Czyszczenie plików metadanych
Możesz uruchomić program VACUUM , aby wyczyścić niereferencyjne pliki metadanych utworzone w COPY INTO
środowisku Databricks Runtime 15.2 lub nowszym.
Odwołanie
- Databricks Runtime 7.x i nowsze: COPY INTO
Dodatkowe zasoby
Ładowanie danych przy użyciu funkcji COPY INTO z jednostką usługi
Aby uzyskać typowe wzorce użycia, w tym przykłady wielu
COPY INTO
operacji względem tej samej tabeli delty, zobacz Typowe wzorce ładowania danych przy użyciu funkcji COPY INTO.