Udostępnij za pośrednictwem


Wprowadzenie do ładowania danych przy użyciu funkcji COPY INTO

Polecenie COPY INTO SQL umożliwia ładowanie danych z lokalizacji pliku do tabeli delty. Jest to operacja ponownego triable i idempotentnego; pliki w lokalizacji źródłowej, które zostały już załadowane, są pomijane.

COPY INTO oferuje następujące możliwości:

  • Łatwe konfigurowanie filtrów plików lub katalogów z magazynu w chmurze, w tym woluminów S3, ADLS Gen2, ABFS, GCS i Unity Catalog.
  • Obsługa wielu formatów plików źródłowych: CSV, JSON, XML, Avro, ORC, Parquet, text i binarnych
  • Przetwarzanie plików dokładnie raz (idempotentne) domyślnie
  • Wnioskowanie, mapowanie, scalanie i ewolucja schematu tabeli docelowej

Uwaga

Aby uzyskać bardziej skalowalne i niezawodne środowisko pozyskiwania plików, usługa Databricks zaleca użytkownikom usługi SQL korzystanie z tabel przesyłania strumieniowego. Zobacz Ładowanie danych przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL.

Ostrzeżenie

COPY INTO uwzględnia ustawienie obszaru roboczego dla wektorów usuwania. Jeśli to ustawienie jest włączone, wektory usuwania są włączone w tabeli docelowej w przypadku COPY INTO uruchamiania w usłudze SQL Warehouse lub obliczeń z uruchomionym środowiskiem Databricks Runtime 14.0 lub nowszym. Po włączeniu wektory usuwania blokują zapytania względem tabeli w środowisku Databricks Runtime 11.3 LTS i poniżej. Zobacz Co to są wektory usuwania? i Automatyczne włączanie wektorów usuwania.

Wymagania

Administrator konta musi wykonać kroki opisane w temacie Konfigurowanie dostępu do danych na potrzeby pozyskiwania w celu skonfigurowania dostępu do danych w magazynie obiektów w chmurze, zanim użytkownicy będą mogli ładować dane przy użyciu polecenia COPY INTO.

Przykład: ładowanie danych do tabeli usługi Delta Lake bez schematu

Uwaga

Ta funkcja jest dostępna w środowisku Databricks Runtime 11.3 LTS lub nowszym.

Możesz utworzyć puste tabele różnicowe symbolu zastępczego, aby schemat został później wywnioskowany podczas polecenia, ustawiając wartość w elemecie COPY INTO true mergeSchema :COPY_OPTIONS

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Powyższa instrukcja SQL jest idempotentna i można zaplanować uruchamianie w celu pozyskiwania danych dokładnie raz w tabeli delty.

Uwaga

Pusta tabela delty nie może być dostępna poza elementem COPY INTO. INSERT INTO i MERGE INTO nie są obsługiwane do zapisywania danych w tabelach delta bez schematu. Po wstawieniu danych do tabeli za pomocą COPY INTOpolecenia tabela stanie się możliwe do wykonywania zapytań.

Zobacz Tworzenie tabel docelowych dla funkcji COPY INTO.

Przykład: ustawianie schematu i ładowanie danych do tabeli usługi Delta Lake

W poniższym przykładzie pokazano, jak utworzyć tabelę delty, a następnie użyć COPY INTO polecenia SQL, aby załadować przykładowe dane z zestawów danych usługi Databricks do tabeli. Możesz uruchomić przykładowy kod Python, R, Scala lub SQL z notesu dołączonego do klastra usługi Azure Databricks. Możesz również uruchomić kod SQL z zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Aby wyczyścić, uruchom następujący kod, który usuwa tabelę:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Czyszczenie plików metadanych

Możesz uruchomić program VACUUM , aby wyczyścić niereferencyjne pliki metadanych utworzone w COPY INTO środowisku Databricks Runtime 15.2 lub nowszym.

Odwołanie

Dodatkowe zasoby