教學課程:COPY INTO with Spark SQL

Databricks 建議您針對包含數千個檔案的數據源,使用 COPY INTO 命令進行增量和大量數據載入。

在這個教學中,你會使用 COPY INTO 指令,將 Unity 目錄卷的 JSON 資料載入 Azure Databricks 工作區的 Delta 表格。 你使用 Wanderbricks 的範例資料集作為資料來源。 關於更進階的擷取應用,請參閱 「什麼是自動載入器?」。

要求

步驟 1:設定環境

這個教學中的程式碼使用 Unity 目錄卷來儲存 JSON 原始碼檔案。 請用具有<catalog>CREATE SCHEMA權限的目錄取代CREATE VOLUME。 如果你無法執行程式碼,請聯絡你的工作區管理員。

建立一個筆記本 ,並連接到運算資源。 接著執行以下程式碼,為這個教學設定架構和磁碟區。

Python

# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

SQL

-- Reset demo environment

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;

步驟 2:將範例資料寫入磁碟區,格式為 JSON

COPY INTO 指令會從檔案來源載入資料。 從 Wanderbricksbookings 範例表讀取,並以 JSON 檔案寫入一批紀錄到你的磁碟區,模擬來自外部系統的資料。

Python

# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

SQL

將檔案寫入磁碟區需要 Python。 在現實世界的工作流程中,這些資料會來自外部系統。

%python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

步驟 3:使用 COPY INTO 以等冪方式載入 JSON 數據

在使用 COPY INTO之前,先建立一個目標 Delta 表。 在你的 CREATE TABLE 陳述中,您只需提供一個表格名稱,不需要提供其他額外資訊。 由於此動作是冪等位,Databricks 即使執行多次程式碼,也只會載入一次資料。

Python

# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
  COPY INTO {catalog}.{schema}.bookings_target
  FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
""")

SQL

-- Create target table and load data

CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;

COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

步驟 4:預覽數據表的內容

確認表格包含從第一批取得的 20 筆 Wanderbricks 預訂資料,並檢查結構是否正確地從 JSON 原始檔案中推斷出來。

Python

# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

SQL

-- Review loaded data

SELECT * FROM <catalog>.copy_into_tutorial.bookings_target

步驟 5:載入更多數據和預覽結果

你可以透過寫入另一批記錄並重新執行 COPY INTO ,模擬來自外部系統的額外資料。 執行以下程式碼來寫入第二批資料。

Python

# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

SQL

將檔案寫入磁碟區需要 Python。 在現實世界的工作流程中,這些資料會來自外部系統。

%python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")

接著再執行 COPY INTO 步驟 3 的指令,預覽表格確認新紀錄。 只有新檔案會被載入。

Python

# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

SQL

-- Confirm new data was loaded

SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target

步驟 6:整理教學課程

當您完成本教學課程時,如果您不想再保留相關聯的資源,則可以清除這些資源。 刪除綱要、資料表和磁碟區,並清除所有資料。

Python

# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

SQL

-- Drop schema and all associated objects

DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;

其他資源