Compartilhar via


Comece a usar COPY INTO para carregar dados

O comando SQL COPY INTO permite carregar dados de um local de arquivo para uma tabela do Delta. Essa é uma operação repetível e idempotente. Os arquivos no local de origem que já foram carregados são ignorados.

COPY INTO oferece as seguintes funcionalidades:

  • Filtros de arquivos ou diretórios facilmente configuráveis do armazenamento em nuvem, incluindo volumes S3, ADLS, ABFS, GCS e Unity Catalog.
  • Suporte para vários formatos de arquivo de origem: CSV, JSON, XML, Avro, ORC, Parquet, texto e arquivos binários
  • Processamento de arquivos exatamente uma vez (idempotente) por padrão
  • Inferência do esquema da tabela de destino, mapeamento, mesclagem e evolução

Observação

Para obter uma experiência de ingestão de arquivos mais escalonável e robusta, a Databricks recomenda que os usuários de SQL aproveitem as tabelas de streaming. Consulte tabelas de streaming.

Aviso

COPY INTO respeita a configuração do workspace para vetores de exclusão. Se habilitado, os vetores de exclusão serão ativados na tabela de destino quando COPY INTO for executado em um warehouse SQL ou em uma computação que esteja rodando o Databricks Runtime 14.0 ou uma versão superior. Uma vez habilitados, os vetores de exclusão bloqueiam as consultas em uma tabela no Databricks Runtime 11.3 LTS e versões anteriores. Consulte O que são vetores de exclusão? e Habilitar automaticamente vetores de exclusão.

Requisitos

Um administrador de conta deve seguir as etapas em Configurar o acesso a dados para ingestão para configurar o acesso aos dados no armazenamento de objetos de nuvem antes que os usuários possam carregar dados usando COPY INTO.

Exemplo: carregar dados em uma tabela do Delta Lake sem esquema

Observação

Este recurso está disponível no Databricks Runtime 11.3 LTS e versões superiores.

Você pode criar tabelas Delta de espaço reservado vazias para que o esquema seja posteriormente inferido durante um comando COPY INTO definindo mergeSchema como true em COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

A instrução SQL acima é idempotente e pode ser agendada para ser executada para ingerir dados exatamente uma vez em uma tabela Delta.

Observação

A tabela Delta vazia não é utilizável fora de COPY INTO. INSERT INTO e MERGE INTO não há suporte para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela se torna consultável.

Veja Criar tabelas de destino para COPY INTO.

Exemplo: definir o esquema e carregar dados em uma tabela do Delta Lake

O exemplo a seguir mostra como criar uma tabela Delta e usar o comando SQL COPY INTO para carregar dados de exemplo dos conjuntos de dados do Databricks nela. Execute o código Python, R, Scala ou SQL de exemplo em um notebook anexado a um cluster do Azure Databricks. Também é possível executar o código SQL em uma consulta associada a um SQL warehouse no SQL do Databricks.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Pitão

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala (linguagem de programação)

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Para limpar, execute o seguinte código, que exclui a tabela:

Pitão

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala (linguagem de programação)

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Limpar arquivos de metadados

Você pode executar VACUUM para limpar arquivos de metadados não referenciados que foram criados pelo COPY INTO no Databricks Runtime 15.2 e superior.

Referência

  • Databricks Runtime 7.x e versões posteriores: COPY INTO

Recursos adicionais