教學課程:使用 Spark SQL 複製 INTO
Databricks 建議您針對包含數千個檔案的數據源,使用 COPY INTO 命令進行增量和大量數據載入。 Databricks 建議您針對進階使用案例使用 自動載入器 。
在本教學課程中,您會使用 COPY INTO
命令,將數據從雲端物件記憶體載入 Azure Databricks 工作區中的數據表。
需求
- Azure 訂用帳戶、該訂用帳戶中的 Azure Databricks 工作區,以及該工作區中的叢集。 若要建立這些專案,請參閱快速入門:使用 Azure 入口網站 在 Azure Databricks 工作區上執行 Spark 作業。 如果您遵循本快速入門,就不需要遵循執行 Spark SQL 作業一節中的指示。
- 您工作區中執行 Databricks Runtime 11.3 LTS 或更新版本的所有用途 叢集 。 若要建立所有用途的叢集,請參閱 計算組態參考。
- 熟悉 Azure Databricks 工作區用戶介面。 請參閱 流覽工作區。
- 熟悉 Databricks 筆記本。
- 您可以寫入數據的位置;此示範使用 DBFS 根目錄作為範例,但 Databricks 建議使用 Unity 目錄設定的外部儲存位置。
步驟 1: 設定您的環境並建立數據產生器
本教學課程假設您已熟悉 Azure Databricks 和預設工作區設定。 如果您無法執行所提供的程式代碼,請連絡工作區管理員,以確定您可以存取計算資源,以及您可以寫入數據的位置。
請注意,所提供的程式代碼會使用 source
參數來指定您要設定為 COPY INTO
數據源的位置。 撰寫時,此程式代碼會指向 DBFS 根目錄上的位置。 如果您有外部物件儲存位置的寫入許可權,請將來源字串的部分取代 dbfs:/
為物件記憶體的路徑。 由於此程式代碼區塊也會執行遞歸刪除來重設此示範,因此請確定您不會將此數據指向實際執行數據,而且您保留 /user/{username}/copy-into-demo
巢狀目錄以避免覆寫或刪除現有的數據。
建立新的 SQL 筆記本 ,並將其 連結至執行 Databricks Runtime 11.3 LTS 或更新版本之叢集 。
複製並執行下列程式代碼,以重設本教學課程中使用的記憶體位置和資料庫:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
複製並執行下列程式代碼,以設定將用來隨機產生數據的一些數據表和函式:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
步驟 2:將範例數據寫入雲端記憶體
在 Azure Databricks 上,寫入 Delta Lake 以外的數據格式並不罕見。 此處提供的程式代碼會寫入 JSON,模擬可能會將另一個系統的結果傾印到物件記憶體的外部系統。
複製並執行下列程式代碼,以撰寫一批原始 JSON 數據:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
步驟 3:使用 COPY INTO 以等冪方式載入 JSON 數據
您必須先建立目標 Delta Lake 資料表,才能使用 COPY INTO
。 在 Databricks Runtime 11.3 LTS 和更新版本中,您不需要在 語句中 CREATE TABLE
提供數據表名稱以外的任何專案。 針對舊版的 Databricks Runtime,您必須在建立空白數據表時提供架構。
複製並執行下列程式代碼,以建立您的目標 Delta 數據表,並從來源載入資料:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
因為此動作是等冪的,所以您可以多次執行,但數據只會載入一次。
步驟 4:預覽數據表的內容
您可以執行簡單的 SQL 查詢,以手動檢閱此資料表的內容。
複製並執行下列程式代碼來預覽您的資料表:
-- Review updated table SELECT * FROM user_ping_target
步驟 5:載入更多數據和預覽結果
您可以重新執行步驟 2-4 多次,以將新批次的隨機原始 JSON 數據放在來源中,使用 等冪方式將它們載入 Delta Lake COPY INTO
,並預覽結果。 請嘗試依序或多次執行這些步驟,以模擬寫入或執行 COPY INTO
多個批次的原始數據,而不需要到達新數據。
步驟 6:清除教學課程
當您完成本教學課程時,如果您不想再保留相關聯的資源,則可以清除這些資源。
複製並執行下列程式代碼來卸除資料庫、資料表和移除所有資料:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
若要停止計算資源,請移至 [ 叢集] 索引標籤 和 [終止 叢集]。
其他資源
- COPY INTO 參考文章