共用方式為


教學課程:使用 Spark SQL 複製 INTO

Databricks 建議您針對包含數千個檔案的數據源,使用 COPY INTO 命令進行增量和大量數據載入。 Databricks 建議您針對進階使用案例使用 自動載入器

在本教學課程中,您會使用 COPY INTO 命令,將數據從雲端物件記憶體載入 Azure Databricks 工作區中的數據表。

需求

  1. Azure 訂用帳戶、該訂用帳戶中的 Azure Databricks 工作區,以及該工作區中的叢集。 若要建立這些專案,請參閱快速入門:使用 Azure 入口網站 在 Azure Databricks 工作區上執行 Spark 作業。 如果您遵循本快速入門,就不需要遵循執行 Spark SQL 作業一節中的指示。
  2. 您工作區中執行 Databricks Runtime 11.3 LTS 或更新版本的所有用途 叢集 。 若要建立所有用途的叢集,請參閱 計算組態參考
  3. 熟悉 Azure Databricks 工作區用戶介面。 請參閱 流覽工作區
  4. 熟悉 Databricks 筆記本
  5. 您可以寫入數據的位置;此示範使用 DBFS 根目錄作為範例,但 Databricks 建議使用 Unity 目錄設定的外部儲存位置。

步驟 1: 設定您的環境並建立數據產生器

本教學課程假設您已熟悉 Azure Databricks 和預設工作區設定。 如果您無法執行所提供的程式代碼,請連絡工作區管理員,以確定您可以存取計算資源,以及您可以寫入數據的位置。

請注意,所提供的程式代碼會使用 source 參數來指定您要設定為 COPY INTO 數據源的位置。 撰寫時,此程式代碼會指向 DBFS 根目錄上的位置。 如果您有外部物件儲存位置的寫入許可權,請將來源字串的部分取代 dbfs:/ 為物件記憶體的路徑。 由於此程式代碼區塊也會執行遞歸刪除來重設此示範,因此請確定您不會將此數據指向實際執行數據,而且您保留 /user/{username}/copy-into-demo 巢狀目錄以避免覆寫或刪除現有的數據。

  1. 建立新的 SQL 筆記本 ,並將其 連結至執行 Databricks Runtime 11.3 LTS 或更新版本之叢集

  2. 複製並執行下列程式代碼,以重設本教學課程中使用的記憶體位置和資料庫:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 複製並執行下列程式代碼,以設定將用來隨機產生數據的一些數據表和函式:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

步驟 2:將範例數據寫入雲端記憶體

在 Azure Databricks 上,寫入 Delta Lake 以外的數據格式並不罕見。 此處提供的程式代碼會寫入 JSON,模擬可能會將另一個系統的結果傾印到物件記憶體的外部系統。

  1. 複製並執行下列程式代碼,以撰寫一批原始 JSON 數據:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

步驟 3:使用 COPY INTO 以等冪方式載入 JSON 數據

您必須先建立目標 Delta Lake 資料表,才能使用 COPY INTO。 在 Databricks Runtime 11.3 LTS 和更新版本中,您不需要在 語句中 CREATE TABLE 提供數據表名稱以外的任何專案。 針對舊版的 Databricks Runtime,您必須在建立空白數據表時提供架構。

  1. 複製並執行下列程式代碼,以建立您的目標 Delta 數據表,並從來源載入資料:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

因為此動作是等冪的,所以您可以多次執行,但數據只會載入一次。

步驟 4:預覽數據表的內容

您可以執行簡單的 SQL 查詢,以手動檢閱此資料表的內容。

  1. 複製並執行下列程式代碼來預覽您的資料表:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

步驟 5:載入更多數據和預覽結果

您可以重新執行步驟 2-4 多次,以將新批次的隨機原始 JSON 數據放在來源中,使用 等冪方式將它們載入 Delta Lake COPY INTO,並預覽結果。 請嘗試依序或多次執行這些步驟,以模擬寫入或執行 COPY INTO 多個批次的原始數據,而不需要到達新數據。

步驟 6:清除教學課程

當您完成本教學課程時,如果您不想再保留相關聯的資源,則可以清除這些資源。

  1. 複製並執行下列程式代碼來卸除資料庫、資料表和移除所有資料:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. 若要停止計算資源,請移至 [ 叢集] 索引標籤[終止 叢集]。

其他資源