Начало работы с COPY INTO для загрузки данных
Команда SQL COPY INTO
позволяет загружать данные из расположения файлов в разностную таблицу. Это идемпотентная операция с возможностью повторных попыток — файлы в исходном расположении, которые уже были загружены, пропускаются.
COPY INTO
предлагает следующие возможности:
- Легко настраиваемые фильтры файлов или каталогов из облачного хранилища, в том числе S3, ADLS 2-го поколения, ABFS, GCS и каталогов Unity.
- Поддержка нескольких форматов исходных файлов: CSV, JSON, XML, Avro, ORC, Parquet, text и двоичных файлов
- По умолчанию обработка файлов в точности один раз (идемпотентная)
- Вывод целевой схемы таблицы, сопоставление, слияние и эволюция
Примечание.
Для более масштабируемого и надежного приема файлов Databricks рекомендует пользователям SQL использовать потоковые таблицы. См. сведения о загрузке данных с помощью потоковых таблиц в Databricks SQL.
Предупреждение
COPY INTO
учитывает параметр рабочей области для векторов удаления. Если этот параметр включен, векторы удаления включены в целевую таблицу при COPY INTO
выполнении в хранилище SQL или вычислении под управлением Databricks Runtime 14.0 или более поздней версии. После включения векторы удаления блокируют запросы к таблице в Databricks Runtime 11.3 LTS и ниже. См. векторы удаления и векторы автоматического включения удаления.
Требования
Администратор учетной записи должен выполнить действия, описанные в разделе "Настройка доступа к данным для приема данных", чтобы настроить доступ к данным в облачном хранилище объектов, прежде чем пользователи смогут загружать данные с помощью COPY INTO
.
Пример. Загрузка данных в таблицу Delta Lake без схемы
Примечание.
Эта функция доступна в Databricks Runtime 11.3 LTS и выше.
Вы можете создать пустые таблицы разностных заполнителей, чтобы схема была выведена позже во время COPY INTO
команды, задав mergeSchema
значение true
in COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Приведенная выше инструкция SQL является идемпотентной, и выполнить ее для приема данных в разностную таблицу можно только один раз.
Примечание.
Пустая разностная таблица недоступна для использования за пределами COPY INTO
. INSERT INTO
и MERGE INTO
не поддерживаются для записи данных в разностные таблицы без схемы. После вставки данных в таблицу с помощью COPY INTO
к таблице можно выполнять запросы.
См. статью "Создание целевых таблиц" для COPY INTO.
Пример. Настройка схемы и загрузка данных в таблицу Delta Lake
В следующем примере показано, как создать таблицу Delta, а затем использовать COPY INTO
команду SQL для загрузки примеров данных из наборов данных Databricks в таблицу. Примеры кода на языках Python, R, Scala и SQL можно запустить из записной книжки, подключенной к кластеру Azure Databricks. Вы также можете выполнить код SQL из запроса, связанного с хранилищем SQL, в Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Чтобы выполнить очистку, выполните следующий код, который удаляет таблицу.
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Очистка файлов метаданных
Для очистки файлов метаданных, созданных COPY INTO
в Databricks Runtime 15.2 и более поздних версий, можно запустить VACUUM.
Справочные материалы
- Databricks Runtime 7.x и более поздних версий: COPY INTO
Дополнительные ресурсы
Загрузка данных с помощью COPY INTO с томами каталога Unity или внешними расположениями
Распространенные шаблоны использования, включая примеры нескольких
COPY INTO
операций с одной и той же таблицей Delta, см. в общих шаблонах загрузки данных с помощью COPY INTO.