Introduzione all'uso di COPY INTO per caricare i dati

Il COPY INTO comando SQL consente di caricare dati da un percorso di file in una tabella Delta. Si tratta di un'operazione riabilitabile e idempotente; i file nel percorso di origine che sono già stati caricati vengono ignorati.

COPY INTO offre le funzionalità seguenti:

  • Filtri di file o directory facilmente configurabili dall'archiviazione cloud, inclusi volumi S3, ADLS Gen2, ABFS, GCS e Unity Catalog.
  • Supporto per più formati di file di origine: CSV, JSON, XML, Avro, ORC, Parquet, text e file binari
  • Elaborazione di file di tipo exactly-once (idempotente) per impostazione predefinita
  • Inferenza dello schema della tabella di destinazione, mapping, unione ed evoluzione

Nota

Per un'esperienza di inserimento di file più scalabile e affidabile, Databricks consiglia agli utenti SQL di sfruttare le tabelle di streaming. Vedere Caricare dati usando tabelle di streaming in Databricks SQL.

Avviso

COPY INTO rispetta l'impostazione dell'area di lavoro per i vettori di eliminazione. Se abilitata, i vettori di eliminazione vengono abilitati nella tabella di destinazione quando COPY INTO vengono eseguiti in un'istanza di SQL Warehouse o nell'ambiente di calcolo che esegue Databricks Runtime 14.0 o versione successiva. Dopo l'abilitazione, i vettori di eliminazione bloccano le query su una tabella in Databricks Runtime 11.3 LTS e versioni successive. Vedere Che cosa sono i vettori di eliminazione? e Abilitare automaticamente i vettori di eliminazione.

Requisiti

Un amministratore dell'account deve seguire la procedura descritta in Configurare l'accesso ai dati per l'inserimento per configurare l'accesso ai dati nell'archiviazione di oggetti cloud prima che gli utenti possano caricare i dati usando COPY INTO.

Esempio: Caricare dati in una tabella Delta Lake senza schema

Nota

Questa funzionalità è disponibile in Databricks Runtime 11.3 LTS e versioni successive.

È possibile creare tabelle Delta segnaposto vuote in modo che lo schema venga dedotto in un secondo momento durante un COPY INTO comando impostando su mergeSchematrue in COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

L'istruzione SQL precedente è idempotente e può essere pianificata per l'esecuzione per inserire i dati esattamente una volta in una tabella Delta.

Nota

La tabella Delta vuota non è utilizzabile all'esterno di COPY INTO. INSERT INTO e MERGE INTO non sono supportati per scrivere dati in tabelle Delta senza schema. Dopo l'inserimento dei dati nella tabella con COPY INTO, la tabella diventa queryabile.

Vedere Creare tabelle di destinazione per COPY INTO.

Esempio: Impostare lo schema e caricare i dati in una tabella Delta Lake

L'esempio seguente illustra come creare una tabella Delta e quindi usare il COPY INTO comando SQL per caricare i dati di esempio dai set di dati di Databricks nella tabella. È possibile eseguire il codice Python, R, Scala o SQL di esempio da un notebook collegato a un cluster Azure Databricks. È anche possibile eseguire il codice SQL da una query associata a un'istanza di SQL Warehouse in Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Per eseguire la pulizia, eseguire il codice seguente, che elimina la tabella:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Riferimento

  • Databricks Runtime 7.x e versioni successive: COPY INTO

Risorse aggiuntive