Introdução ao uso do COPY INTO para carregar dados
O COPY INTO
comando SQL permite carregar dados de um local de arquivo em uma tabela Delta. Esta é uma operação recuperável e idempotente; Os arquivos no local de origem que já foram carregados são ignorados.
COPY INTO
oferece os seguintes recursos:
- Filtros de arquivos ou diretórios facilmente configuráveis a partir do armazenamento em nuvem, incluindo volumes S3, ADLS Gen2, ABFS, GCS e Unity Catalog.
- Suporte para vários formatos de arquivo de origem: CSV, JSON, XML, Avro, ORC, Parquet, texto e arquivos binários
- Processamento de arquivo exatamente uma vez (idempotente) por padrão
- Inferência, mapeamento, fusão e evolução do esquema da tabela de destino
Nota
Para uma experiência de ingestão de arquivos mais escalável e robusta, o Databricks recomenda que os usuários do SQL aproveitem as tabelas de streaming. Consulte Carregar dados usando tabelas de streaming no Databricks SQL.
Aviso
COPY INTO
Respeita a configuração do espaço de trabalho para vetores de exclusão. Se habilitado, os vetores de exclusão são habilitados na tabela de destino quando COPY INTO
executados em um SQL warehouse ou computação executando o Databricks Runtime 14.0 ou superior. Uma vez ativados, os vetores de exclusão bloqueiam consultas em uma tabela no Databricks Runtime 11.3 LTS e inferior. Consulte O que são vetores de exclusão? e Vetores de exclusão de ativação automática.
Requisitos
Um administrador de conta deve seguir as etapas em Configurar o acesso a dados para ingestão para configurar o acesso aos dados no armazenamento de objetos na nuvem antes que os usuários possam carregar dados usando COPY INTO
o .
Exemplo: Carregar dados em uma tabela Delta Lake sem esquema
Nota
Esse recurso está disponível no Databricks Runtime 11.3 LTS e superior.
Você pode criar tabelas Delta de espaço reservado vazias para que o esquema seja posteriormente inferido durante um COPY INTO
comando definindo mergeSchema
como true
em COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
A instrução SQL acima é idempotente e pode ser programada para ser executada para ingerir dados exatamente uma vez em uma tabela Delta.
Nota
A tabela Delta vazia não é utilizável fora do COPY INTO
. INSERT INTO
e MERGE INTO
não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO
, a tabela se torna consultável.
Consulte Criar tabelas de destino para COPY INTO.
Exemplo: Definir esquema e carregar dados em uma tabela Delta Lake
O exemplo a seguir mostra como criar uma tabela Delta e, em seguida, usar o COPY INTO
comando SQL para carregar dados de exemplo de conjuntos de dados Databricks na tabela. Você pode executar o exemplo de código Python, R, Scala ou SQL de um bloco de anotações anexado a um cluster do Azure Databricks. Você também pode executar o código SQL a partir de uma consulta associada a um armazém SQL no Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Para limpar, execute o seguinte código, que exclui a tabela:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Limpar arquivos de metadados
Você pode executar o VACUUM para limpar arquivos de metadados não referenciados criados pelo COPY INTO
Databricks Runtime 15.2 e superior.
Referência
- Databricks Runtime 7.x e superior: COPY INTO
Recursos adicionais
Carregue dados usando COPY INTO com volumes do Catálogo Unity ou locais externos
Para padrões de uso comuns, incluindo exemplos de várias
COPY INTO
operações na mesma tabela Delta, consulte Padrões comuns de carregamento de dados usando COPY INTO.