Introduzione all'uso di COPY INTO per caricare i dati
Il COPY INTO
comando SQL consente di caricare dati da un percorso di file in una tabella Delta. Si tratta di un'operazione riabilitabile e idempotente; i file nel percorso di origine che sono già stati caricati vengono ignorati.
COPY INTO
offre le funzionalità seguenti:
- Filtri di file o directory facilmente configurabili dall'archiviazione cloud, inclusi volumi S3, ADLS Gen2, ABFS, GCS e Unity Catalog.
- Supporto per più formati di file di origine: CSV, JSON, XML, Avro, ORC, Parquet, text e file binari
- Elaborazione di file di tipo exactly-once (idempotente) per impostazione predefinita
- Inferenza dello schema della tabella di destinazione, mapping, unione ed evoluzione
Nota
Per un'esperienza di inserimento di file più scalabile e affidabile, Databricks consiglia agli utenti SQL di sfruttare le tabelle di streaming. Vedere Caricare dati usando tabelle di streaming in Databricks SQL.
Avviso
COPY INTO
rispetta l'impostazione dell'area di lavoro per i vettori di eliminazione. Se abilitata, i vettori di eliminazione vengono abilitati nella tabella di destinazione quando COPY INTO
vengono eseguiti in un'istanza di SQL Warehouse o nell'ambiente di calcolo che esegue Databricks Runtime 14.0 o versione successiva. Dopo l'abilitazione, i vettori di eliminazione bloccano le query su una tabella in Databricks Runtime 11.3 LTS e versioni successive. Vedere Che cosa sono i vettori di eliminazione? e Abilitare automaticamente i vettori di eliminazione.
Requisiti
Un amministratore dell'account deve seguire la procedura descritta in Configurare l'accesso ai dati per l'inserimento per configurare l'accesso ai dati nell'archiviazione di oggetti cloud prima che gli utenti possano caricare i dati usando COPY INTO
.
Esempio: Caricare dati in una tabella Delta Lake senza schema
Nota
Questa funzionalità è disponibile in Databricks Runtime 11.3 LTS e versioni successive.
È possibile creare tabelle Delta segnaposto vuote in modo che lo schema venga dedotto in un secondo momento durante un COPY INTO
comando impostando su mergeSchema
true
in COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
L'istruzione SQL precedente è idempotente e può essere pianificata per l'esecuzione per inserire i dati esattamente una volta in una tabella Delta.
Nota
La tabella Delta vuota non è utilizzabile all'esterno di COPY INTO
. INSERT INTO
e MERGE INTO
non sono supportati per scrivere dati in tabelle Delta senza schema. Dopo l'inserimento dei dati nella tabella con COPY INTO
, la tabella diventa queryabile.
Vedere Creare tabelle di destinazione per COPY INTO.
Esempio: Impostare lo schema e caricare i dati in una tabella Delta Lake
L'esempio seguente illustra come creare una tabella Delta e quindi usare il COPY INTO
comando SQL per caricare i dati di esempio dai set di dati di Databricks nella tabella. È possibile eseguire il codice Python, R, Scala o SQL di esempio da un notebook collegato a un cluster Azure Databricks. È anche possibile eseguire il codice SQL da una query associata a un'istanza di SQL Warehouse in Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Per eseguire la pulizia, eseguire il codice seguente, che elimina la tabella:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Pulire i file di metadati
È possibile eseguire VACUUM per pulire i file di metadati non referenziati creati da COPY INTO
in Databricks Runtime 15.2 e versioni successive.
Riferimento
- Databricks Runtime 7.x e versioni successive: COPY INTO
Risorse aggiuntive
Caricare dati usando COPY INTO con volumi del catalogo Unity o posizioni esterne
Per i modelli di utilizzo comuni, inclusi esempi di più
COPY INTO
operazioni sulla stessa tabella Delta, vedere Modelli comuni di caricamento dei dati con COPY INTO.