Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
O comando SQL COPY INTO
permite carregar dados de um local de arquivo para uma tabela do Delta. Essa é uma operação repetível e idempotente. Os arquivos no local de origem que já foram carregados são ignorados.
COPY INTO
oferece as seguintes funcionalidades:
- Filtros de arquivos ou diretórios facilmente configuráveis do armazenamento em nuvem, incluindo volumes S3, ADLS, ABFS, GCS e Unity Catalog.
- Suporte para vários formatos de arquivo de origem: CSV, JSON, XML, Avro, ORC, Parquet, texto e arquivos binários
- Processamento de arquivos exatamente uma vez (idempotente) por padrão
- Inferência do esquema da tabela de destino, mapeamento, mesclagem e evolução
Observação
Para obter uma experiência de ingestão de arquivos mais escalonável e robusta, a Databricks recomenda que os usuários de SQL aproveitem as tabelas de streaming. Consulte tabelas de streaming.
Aviso
COPY INTO
respeita a configuração do workspace para vetores de exclusão. Se habilitado, os vetores de exclusão serão ativados na tabela de destino quando COPY INTO
for executado em um warehouse SQL ou em uma computação que esteja rodando o Databricks Runtime 14.0 ou uma versão superior. Uma vez habilitados, os vetores de exclusão bloqueiam as consultas em uma tabela no Databricks Runtime 11.3 LTS e versões anteriores. Consulte O que são vetores de exclusão? e Habilitar automaticamente vetores de exclusão.
Requisitos
Um administrador de conta deve seguir as etapas em Configurar o acesso a dados para ingestão para configurar o acesso aos dados no armazenamento de objetos de nuvem antes que os usuários possam carregar dados usando COPY INTO
.
Exemplo: carregar dados em uma tabela do Delta Lake sem esquema
Observação
Este recurso está disponível no Databricks Runtime 11.3 LTS e versões superiores.
Você pode criar tabelas Delta de espaço reservado vazias para que o esquema seja posteriormente inferido durante um comando COPY INTO
definindo mergeSchema
como true
em COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
A instrução SQL acima é idempotente e pode ser agendada para ser executada para ingerir dados exatamente uma vez em uma tabela Delta.
Observação
A tabela Delta vazia não é utilizável fora de COPY INTO
.
INSERT INTO
e MERGE INTO
não há suporte para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO
, a tabela se torna consultável.
Veja Criar tabelas de destino para COPY INTO
.
Exemplo: definir o esquema e carregar dados em uma tabela do Delta Lake
O exemplo a seguir mostra como criar uma tabela Delta e usar o comando SQL COPY INTO
para carregar dados de exemplo dos conjuntos de dados do Databricks nela. Execute o código Python, R, Scala ou SQL de exemplo em um notebook anexado a um cluster do Azure Databricks. Também é possível executar o código SQL em uma consulta associada a um SQL warehouse no SQL do Databricks.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Pitão
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala (linguagem de programação)
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Para limpar, execute o seguinte código, que exclui a tabela:
Pitão
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala (linguagem de programação)
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Limpar arquivos de metadados
Você pode executar VACUUM para limpar arquivos de metadados não referenciados que foram criados pelo COPY INTO
no Databricks Runtime 15.2 e superior.
Referência
- Databricks Runtime 7.x e versões posteriores:
COPY INTO
Recursos adicionais
- Para padrões de uso comuns, incluindo exemplos de várias operações
COPY INTO
na mesma tabela Delta, veja Padrões comuns de carregamento de dados usandoCOPY INTO
.