Partilhar via


Introdução ao uso do COPY INTO para carregar dados

O COPY INTO comando SQL permite carregar dados de um local de arquivo em uma tabela Delta. Esta é uma operação recuperável e idempotente; Os arquivos no local de origem que já foram carregados são ignorados.

COPY INTO oferece os seguintes recursos:

  • Filtros de arquivos ou diretórios facilmente configuráveis a partir do armazenamento em nuvem, incluindo volumes S3, ADLS Gen2, ABFS, GCS e Unity Catalog.
  • Suporte para vários formatos de arquivo de origem: CSV, JSON, XML, Avro, ORC, Parquet, texto e arquivos binários
  • Processamento de arquivo exatamente uma vez (idempotente) por padrão
  • Inferência, mapeamento, fusão e evolução do esquema da tabela de destino

Nota

Para uma experiência de ingestão de arquivos mais escalável e robusta, o Databricks recomenda que os usuários do SQL aproveitem as tabelas de streaming. Consulte Carregar dados usando tabelas de streaming no Databricks SQL.

Aviso

COPY INTO Respeita a configuração do espaço de trabalho para vetores de exclusão. Se habilitado, os vetores de exclusão são habilitados na tabela de destino quando COPY INTO executados em um SQL warehouse ou computação executando o Databricks Runtime 14.0 ou superior. Uma vez ativados, os vetores de exclusão bloqueiam consultas em uma tabela no Databricks Runtime 11.3 LTS e inferior. Consulte O que são vetores de exclusão? e Vetores de exclusão de ativação automática.

Requisitos

Um administrador de conta deve seguir as etapas em Configurar o acesso a dados para ingestão para configurar o acesso aos dados no armazenamento de objetos na nuvem antes que os usuários possam carregar dados usando COPY INTOo .

Exemplo: Carregar dados em uma tabela Delta Lake sem esquema

Nota

Esse recurso está disponível no Databricks Runtime 11.3 LTS e superior.

Você pode criar tabelas Delta de espaço reservado vazias para que o esquema seja posteriormente inferido durante um COPY INTO comando definindo mergeSchema como true em COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

A instrução SQL acima é idempotente e pode ser programada para ser executada para ingerir dados exatamente uma vez em uma tabela Delta.

Nota

A tabela Delta vazia não é utilizável fora do COPY INTO. INSERT INTO e MERGE INTO não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela se torna consultável.

Consulte Criar tabelas de destino para COPY INTO.

Exemplo: Definir esquema e carregar dados em uma tabela Delta Lake

O exemplo a seguir mostra como criar uma tabela Delta e, em seguida, usar o COPY INTO comando SQL para carregar dados de exemplo de conjuntos de dados Databricks na tabela. Você pode executar o exemplo de código Python, R, Scala ou SQL de um bloco de anotações anexado a um cluster do Azure Databricks. Você também pode executar o código SQL a partir de uma consulta associada a um armazém SQL no Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Para limpar, execute o seguinte código, que exclui a tabela:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Limpar arquivos de metadados

Você pode executar o VACUUM para limpar arquivos de metadados não referenciados criados pelo COPY INTO Databricks Runtime 15.2 e superior.

Referência

  • Databricks Runtime 7.x e superior: COPY INTO

Recursos adicionais