Databricks 建議您針對包含數千個檔案的數據源,使用 COPY INTO 命令進行增量和大量數據載入。
在這個教學中,你會使用 COPY INTO 指令,將 Unity 目錄卷的 JSON 資料載入 Azure Databricks 工作區的 Delta 表格。 你使用 Wanderbricks 的範例資料集作為資料來源。 關於更進階的擷取應用,請參閱 「什麼是自動載入器?」。
要求
- 存取運算資源。 請參閱 計算。
- 一個支援 Unity 目錄的工作空間,擁有在目錄中建立結構與卷的權限。 請參閱 使用 Unity Catalog 連接到雲端物件存儲。
步驟 1:設定環境
這個教學中的程式碼使用 Unity 目錄卷來儲存 JSON 原始碼檔案。 請用具有<catalog>和CREATE SCHEMA權限的目錄取代CREATE VOLUME。 如果你無法執行程式碼,請聯絡你的工作區管理員。
建立一個筆記本 ,並連接到運算資源。 接著執行以下程式碼,為這個教學設定架構和磁碟區。
Python
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
SQL
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
步驟 2:將範例資料寫入磁碟區,格式為 JSON
此 COPY INTO 指令會從檔案來源載入資料。 從 Wanderbricksbookings 範例表讀取,並以 JSON 檔案寫入一批紀錄到你的磁碟區,模擬來自外部系統的資料。
Python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
SQL
將檔案寫入磁碟區需要 Python。 在現實世界的工作流程中,這些資料會來自外部系統。
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
步驟 3:使用 COPY INTO 以等冪方式載入 JSON 數據
在使用 COPY INTO之前,先建立一個目標 Delta 表。 在你的 CREATE TABLE 陳述中,您只需提供一個表格名稱,不需要提供其他額外資訊。 由於此動作是冪等位,Databricks 即使執行多次程式碼,也只會載入一次資料。
Python
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
SQL
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
步驟 4:預覽數據表的內容
確認表格包含從第一批取得的 20 筆 Wanderbricks 預訂資料,並檢查結構是否正確地從 JSON 原始檔案中推斷出來。
Python
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
SQL
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
步驟 5:載入更多數據和預覽結果
你可以透過寫入另一批記錄並重新執行 COPY INTO ,模擬來自外部系統的額外資料。 執行以下程式碼來寫入第二批資料。
Python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
SQL
將檔案寫入磁碟區需要 Python。 在現實世界的工作流程中,這些資料會來自外部系統。
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
接著再執行 COPY INTO 步驟 3 的指令,預覽表格確認新紀錄。 只有新檔案會被載入。
Python
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
SQL
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
步驟 6:整理教學課程
當您完成本教學課程時,如果您不想再保留相關聯的資源,則可以清除這些資源。 刪除綱要、資料表和磁碟區,並清除所有資料。
Python
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
SQL
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;