Ismerkedés a COPY INTO használatával az adatok betöltéséhez
Az COPY INTO
SQL-paranccsal fájlhelyről tölthet be adatokat egy Delta-táblába. Ez egy újrabecsételhető és idempotens művelet; A program kihagyja a forráshelyen lévő, már betöltött fájlokat.
COPY INTO
a következő képességeket kínálja:
- Egyszerűen konfigurálható fájl- vagy címtárszűrők a felhőbeli tárolóból, beleértve az S3, az ADLS Gen2, az ABFS, a GCS és a Unity Catalog köteteket.
- Több forrásfájlformátum támogatása: CSV, JSON, XML, Avro, ORC, Parquet, text és bináris fájlok
- Pontosan egyszer (idempotens) fájlfeldolgozás alapértelmezés szerint
- Céltáblaséma következtetése, leképezése, egyesítése és fejlődése
Feljegyzés
A skálázhatóbb és robusztusabb fájlbetöltési élmény érdekében a Databricks azt javasolja, hogy az SQL-felhasználók használják a streamelési táblákat. Lásd: Adatok betöltése streamelési táblák használatával a Databricks SQL-ben.
Figyelmeztetés
COPY INTO
tiszteletben tartja a törlési vektorok munkaterület-beállítását. Ha engedélyezve van, a törlési vektorok engedélyezve vannak a céltáblán, amikor COPY INTO
SQL-raktáron fut, vagy a Databricks Runtime 14.0-s vagy újabb verzióját futtató számításokat futtatja. Ha engedélyezve van, a törlési vektorok blokkolják a lekérdezéseket a Databricks Runtime 11.3 LTS-ben és az alábbi táblázatban. Lásd: Mik azok a törlési vektorok? és a törlési vektorok automatikus engedélyezése.
Követelmények
A fiókadminisztrátornak követnie kell az adathozzáférés konfigurálása a felhőobjektum-tárolóban lévő adatokhoz való hozzáférés konfigurálásához, mielőtt a felhasználók betölthetik az adatokatCOPY INTO
.
Példa: Adatok betöltése séma nélküli Delta Lake-táblába
Feljegyzés
Ez a funkció a Databricks Runtime 11.3 LTS és újabb verziókban érhető el.
Létrehozhat üres helyőrző Delta-táblákat, hogy a séma később következtethető legyen a COPY INTO
parancs során a következő beállítással true
mergeSchema
COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
A fenti SQL-utasítás idempotens, és ütemezhető úgy, hogy az adatokat pontosan egyszer betöltse egy Delta-táblába.
Feljegyzés
Az üres Delta-tábla kívül nem használható COPY INTO
. INSERT INTO
és MERGE INTO
nem támogatott adatok séma nélküli Delta-táblákba való írása. Miután beszúrta az adatokat a táblába COPY INTO
, a tábla lekérdezhetővé válik.
Lásd: Céltáblák létrehozása a COPY INTO fájlhoz.
Példa: Séma beállítása és adatok betöltése Delta Lake-táblába
Az alábbi példa bemutatja, hogyan hozhat létre Delta-táblát, majd hogyan tölthet be mintaadatokat a Databricks-adathalmazokból a táblába az COPY INTO
SQL-paranccsal. Egy Azure Databricks-fürthöz csatolt jegyzetfüzetből futtathatja a Python, R, Scala vagy SQL-példakódot. Az SQL-kódot egy SQL-raktárhoz társított lekérdezésből is futtathatja a Databricks SQL-ben.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
A törléshez futtassa a következő kódot, amely törli a táblát:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Metaadatfájlok törlése
A VACUUM futtatásával törölheti a Databricks Runtime 15.2-ben és újabb verziókban létrehozott COPY INTO
nem hivatkozott metaadatfájlokat.
Referencia
- Databricks Runtime 7.x és újabb verziók: COPY INTO
További erőforrások
Adatok betöltése a COPY INTO használatával Unity Catalog-kötetekkel vagy külső helyekkel
A gyakori használati minták, köztük az ugyanazon a Delta-táblán végzett több
COPY INTO
művelet példáiért lásd : Gyakori adatbetöltési minták a COPY INTO használatával.