Kom igång med COPY INTO för att läsa in data
Med COPY INTO
SQL-kommandot kan du läsa in data från en filplats till en Delta-tabell. Det här är en omtribar och idempotent åtgärd. filer på källplatsen som redan har lästs in hoppas över.
COPY INTO
erbjuder följande funktioner:
- Konfigurera enkelt fil- eller katalogfilter från molnlagring, inklusive volymer för S3, ADLS Gen2, ABFS, GCS och Unity Catalog.
- Stöd för flera källfilformat: CSV, JSON, XML, Avro, ORC, Parquet, text och binära filer
- Filbearbetning exakt en gång (idempotent) som standard
- Måltabellschemainferens, mappning, sammanslagning och utveckling
Kommentar
För en mer skalbar och robust filinmatning rekommenderar Databricks att SQL-användare använder strömningstabeller. Se Läsa in data med hjälp av strömmande tabeller i Databricks SQL.
Varning
COPY INTO
respekterar arbetsyteinställningen för borttagningsvektorer. Om det är aktiverat aktiveras borttagningsvektorer i måltabellen när COPY INTO
de körs på ett SQL-lager eller en beräkning som kör Databricks Runtime 14.0 eller senare. När det är aktiverat blockerar borttagningsvektorer frågor mot en tabell i Databricks Runtime 11.3 LTS och nedan. Se Vad är borttagningsvektorer? och Automatisk aktivering av borttagningsvektorer.
Krav
En kontoadministratör måste följa stegen i Konfigurera dataåtkomst för inmatning för att konfigurera åtkomst till data i molnobjektlagring innan användarna kan läsa in data med .COPY INTO
Exempel: Läsa in data i en schemalös Delta Lake-tabell
Kommentar
Den här funktionen är tillgänglig i Databricks Runtime 11.3 LTS och senare.
Du kan skapa tomma deltatabeller för platshållare så att schemat senare härleds under ett COPY INTO
kommando genom att ange mergeSchema
i true
COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
SQL-instruktionen ovan är idempotent och kan schemaläggas att köras för att mata in data exakt en gång i en Delta-tabell.
Kommentar
Den tomma Delta-tabellen kan inte användas utanför COPY INTO
. INSERT INTO
och MERGE INTO
stöds inte för att skriva data till schemalösa Delta-tabeller. När data har infogats i tabellen med COPY INTO
blir tabellen frågebar.
Se Skapa måltabeller för KOPIERA TILL.
Exempel: Ange schema och läs in data i en Delta Lake-tabell
I följande exempel visas hur du skapar en Delta-tabell och sedan använder COPY INTO
SQL-kommandot för att läsa in exempeldata från Databricks-datamängder i tabellen. Du kan köra python-, R-, Scala- eller SQL-exempelkoden från en notebook-fil som är kopplad till ett Azure Databricks-kluster. Du kan också köra SQL-koden från en fråga som är associerad med ett SQL-lager i Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Om du vill rensa kör du följande kod, som tar bort tabellen:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Rensa metadatafiler
Du kan köra VACUUM för att rensa ouppnåeliga metadatafiler som skapats av COPY INTO
i Databricks Runtime 15.2 och senare.
Referens
- Databricks Runtime 7.x och senare: KOPIERA TILL
Ytterligare resurser
Läsa in data med COPY INTO med Unity Catalog-volymer eller externa platser
Vanliga användningsmönster, inklusive exempel på flera
COPY INTO
åtgärder mot samma Delta-tabell, finns i Vanliga datainläsningsmönster med COPY INTO.