教學課程:實作資料湖擷取模式以更新 Databricks 差異資料表

本教學課程說明如何在具有階層命名空間的儲存體帳戶中處理事件。

您將建置一個小型解決方案,讓使用者藉由上傳說明銷售訂單的逗號分隔值 (csv) 檔案,來填入 Databricks Delta 資料表。 您將在 Azure Databricks 中將事件方格訂用帳戶、Azure 函式和作業連結在一起,以建置此解決方案。

在此教學課程中,您需要:

  • 建立會呼叫 Azure 函式的事件方格訂用帳戶。
  • 建立 Azure 函式,以接收來自事件的通知,然後在 Azure Databricks 中執行作業。
  • 建立 Databricks 作業,以將客戶訂單插入至位於儲存體帳戶中的 Databricks Delta 資料表。

我們會以反向順序建置此解決方案,從 Azure Databricks 工作區開始。

必要條件

建立銷售訂單

首先,建立說明銷售訂單的 csv 檔案,然後將該檔案上傳至儲存體帳戶。 稍後您將使用此檔案中的資料來填入 Databricks Delta 資料表中的第一個資料列。

  1. 在 Azure 入口網站中,瀏覽至您的新儲存體帳戶。

  2. 選取 [儲存體瀏覽器]-> [Blob 容器]-> [新增容器] 並建立名為 [資料] 的新容器。

    Screenshot of creating a folder in storage browser.

  3. 在 [資料] 容器中,建立名為 [輸入] 的資料夾。

  4. 將下列文字貼到文字編輯器中。

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
    
  5. 將此檔案儲存到您的本機電腦,並將其命名為 data.csv

  6. 在 [儲存體瀏覽器] 中,將此檔案上傳至 [輸入] 資料夾。

在 Azure Databricks 中建立作業

本節中,您將執行下列工作:

  • 建立 Azure Databricks 工作區。
  • 建立筆記本。
  • 建立並填入 Databricks Delta 資料表。
  • 新增將資料列插入 Databricks Delta 資料表中的程式碼。
  • 建立作業。

建立 Azure Databricks 工作區

在本節中,您會使用 Azure 入口網站建立 Azure Databricks 工作區。

  1. 建立 Azure Databricks 工作區。 將工作區命名為 contoso-orders。 請參閱建立 Azure Databricks 工作區 (部分機器翻譯)。

  2. 建立叢集。 將叢集命名為 customer-order-cluster。 請參閱建立叢集 (部分機器翻譯)。

  3. 建立筆記本。 建立筆記本 configure-customer-table,然後選擇 Python 作為筆記本的預設語言。 請參閱建立筆記本 (部分機器翻譯)。

建立並填入 Databricks Delta 資料表

  1. 在您建立的 Notebook 中,將下列程式碼區塊複製並貼到第一個資料格中,但先不要執行此程式碼。

    請將此程式碼區塊中的 appIdpasswordtenant 預留位置值取代為您在執行本教學課程的必要條件時所收集到的值。

    dbutils.widgets.text('source_file', "", "Source File")
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token")
    
    adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/'
    inputPath = adlsPath + dbutils.widgets.get('source_file')
    customerTablePath = adlsPath + 'delta-tables/customers'
    

    此程式碼會建立名為 source_file 的小工具。 稍後您將建立一個 Azure 函式,以呼叫此程式碼並將檔案路徑傳至該小工具。 此程式碼也會向儲存體帳戶驗證您的服務主體,並建立一些您將在其他資料格中使用的變數。

    注意

    在生產環境設定中,請考慮將驗證金鑰儲存在 Azure Databricks 中。 然後,將查閱索引鍵新增至程式碼區塊,而不是驗證金鑰。

    例如,您可以使用此程式碼:spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>")),而不是使用下列程式碼:spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")

    完成此教學課程之後,請參閱 Azure Databricks 網站上的 Azure Data Lake Storage Gen2 一文,以檢視此方法的範例。

  2. SHIFT + ENTER 鍵以執行此區塊中的程式碼。

  3. 將下列程式碼區塊複製並貼到不同的資料格中,然後按 SHIFT + ENTER 鍵以執行此區塊中的程式碼。

    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
    
    inputSchema = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Country", StringType(), True)
    ])
    
    rawDataDF = (spark.read
     .option("header", "true")
     .schema(inputSchema)
     .csv(adlsPath + 'input')
    )
    
    (rawDataDF.write
      .mode("overwrite")
      .format("delta")
      .saveAsTable("customer_data", path=customerTablePath))
    

    此程式碼會在您的儲存體帳戶中建立 Databricks Delta 資料表,然後從您先前上傳的 csv 檔案載入一些初始資料。

  4. 成功執行此程式碼區塊之後,請從您的 Notebook 中移除此程式碼區塊。

新增將資料列插入 Databricks Delta 資料表中的程式碼

  1. 將下列程式碼區塊複製並貼到不同的資料格中,但不要執行此資料格。

    upsertDataDF = (spark
      .read
      .option("header", "true")
      .csv(inputPath)
    )
    upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
    

    此程式碼會使用 csv 檔案中的資料,將資料插入暫存資料表檢視中。 該 csv 檔案的路徑來自於您在先前的步驟中建立的輸入小工具。

  2. 複製以下程式碼區塊並貼到不同資料格中。 此程式碼會合併暫存資料表檢視與 Databricks Delta 資料表的內容。

    %sql
    MERGE INTO customer_data cd
    USING customer_data_to_upsert cu
    ON cd.CustomerID = cu.CustomerID
    WHEN MATCHED THEN
      UPDATE SET
        cd.StockCode = cu.StockCode,
        cd.Description = cu.Description,
        cd.InvoiceNo = cu.InvoiceNo,
        cd.Quantity = cu.Quantity,
        cd.InvoiceDate = cu.InvoiceDate,
        cd.UnitPrice = cu.UnitPrice,
        cd.Country = cu.Country
    WHEN NOT MATCHED
      THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
      VALUES (
        cu.InvoiceNo,
        cu.StockCode,
        cu.Description,
        cu.Quantity,
        cu.InvoiceDate,
        cu.UnitPrice,
        cu.CustomerID,
        cu.Country)
    

建立作業

建立作業以執行您先前建立的 Notebook。 稍後您將建立會在事件引發時執行此作業的 Azure 函式。

  1. 選取 [新增]->[作業]

  2. 為作業指定名稱,然後選擇您建立的筆記本和叢集。 然後選取 [建立] 來建立作業。

建立 Azure 函式

建立執行作業的 Azure 函式。

  1. 在 Azure Databricks 工作區中,按一下頂端列中的 Azure Databricks 使用者名稱,然後從下拉式清單中選取 [使用者設定]

  2. 選取 [存取權杖] 索引標籤上的 [產生新權杖]

  3. 複製顯示的權杖,然後按一下 [完成]

  4. 在 [Databricks] 工作區的右上角選擇 [人員] 圖示,然後選擇 [使用者設定]

    Manage account

  5. 選取 [產生新權杖] 按鈕,然後選取 [產生] 按鈕。

    請確實將權杖複製到安全的位置。 您的 Azure 函式需要以此權杖向 Databricks 進行驗證,才能執行作業。

  6. 從 Azure 入口網站功能表或 [首頁] 頁面,選取 [建立資源]

  7. 在 [新增] 頁面中,選取 [計算]>[函數應用程式]

  8. 在 [建立函式應用程式] 頁面的 [基本] 索引標籤中,選擇資源群組,然後變更或確認下列設定:

    設定
    函數應用程式名稱 contosoorder
    執行階段堆疊 .NET
    發佈 代碼
    作業系統 Windows
    方案類型 使用量 (無伺服器)
  9. 選取 [檢閱 + 建立],然後選取 [建立]。

    部署完成後,選取 [移至資源] 以開啟函式應用程式的概觀頁面。

  10. 在 [設定] 群組中,選取 [設定]

  11. 在 [應用程式設定] 頁面中,選擇 [新增應用程式設定] 按鈕以新增每項設定。

    Add configuration setting

    新增下列設定:

    設定名稱
    DBX_INSTANCE Databricks 工作區的區域。 例如:westus2.azuredatabricks.net
    DBX_PAT 您先前產生的個人存取權杖。
    DBX_JOB_ID 執行中作業的識別碼。
  12. 選取 [儲存] 以認可這些設定。

  13. 在 [函式] 群組中,選取 [函式],然後再選取 [建立]

  14. 選擇 [Azure 事件方格觸發程序]

    如果系統提示您安裝 Microsoft.Azure.WebJobs.Extensions.EventGrid 延伸模組,請加以安裝。 如果需要安裝,您必須再次選擇 [Azure 事件方格觸發程序] 來建立函式。

    [新增函式] 窗格隨即出現。

  15. 在 [新增函式] 窗格中,將函式命名為 [UpsertOrder],然後選取 [建立] 按鈕。

  16. 將程式碼檔案的內容取代為此程式碼,然後選取 [儲存] 按鈕:

      #r "Azure.Messaging.EventGrid"
      #r "System.Memory.Data"
      #r "Newtonsoft.Json"
      #r "System.Text.Json"
      using Azure.Messaging.EventGrid;
      using Azure.Messaging.EventGrid.SystemEvents;
      using Newtonsoft.Json;
      using Newtonsoft.Json.Linq;
    
      private static HttpClient httpClient = new HttpClient();
    
      public static async Task Run(EventGridEvent eventGridEvent, ILogger log)
      {
         log.LogInformation("Event Subject: " + eventGridEvent.Subject);
         log.LogInformation("Event Topic: " + eventGridEvent.Topic);
         log.LogInformation("Event Type: " + eventGridEvent.EventType);
         log.LogInformation(eventGridEvent.Data.ToString());
    
         if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") {
            StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>();
            if (fileData.Api == "FlushWithClose") {
                  log.LogInformation("Triggering Databricks Job for file: " + fileData.Url);
                  var fileUrl = new Uri(fileData.Url);
                  var httpRequestMessage = new HttpRequestMessage {
                     Method = HttpMethod.Post,
                     RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))),
                     Headers = { 
                        { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)},
                        { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" }
                     },
                     Content = new StringContent(JsonConvert.SerializeObject(new {
                        job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process),
                        notebook_params = new {
                              source_file = String.Join("", fileUrl.Segments.Skip(2))
                        }
                     }))
                  };
                  var response = await httpClient.SendAsync(httpRequestMessage);
                  response.EnsureSuccessStatusCode();
            }
         }
      }
    

此程式碼會剖析所引發之儲存體事件的相關資訊,然後使用觸發事件之檔案的 URL 來建立要求訊息。 函式會將訊息中的值傳至您先前建立的 source_file 小工具。 函式程式碼會將訊息傳送至 Databricks 作業,並使用您先前取得的權杖作為驗證。

建立事件方格訂閱

在本節中,您將建立事件方格訂用帳戶,以在檔案上傳至儲存體帳戶時呼叫 Azure 函式。

  1. 選取 [整合],然後在 [整合] 頁面中,選取 [事件方格觸發程式]

  2. 在 [編輯觸發程式] 窗格中,將事件命名為 eventGridEvent,然後選取 [建立事件訂用帳戶]

    注意

    名稱 eventGridEvent 符合傳遞至 Azure 函式的參數。

  3. 在 [建立事件訂用帳戶] 頁面的 [基本] 索引標籤中,變更或確認下列設定:

    設定
    名稱 contoso-order-event-subscription
    主題類型 儲存體帳戶
    來源資源 contosoorders
    系統主題名稱 <create any name>
    篩選至事件類型 已建立 Blob 和已刪除 Blob
  4. 選取建立按鈕。

測試事件方格訂用帳戶

  1. 建立名為 customer-order.csv 的檔案,將下列資訊貼到該檔案中,並將檔案儲存到您的本機電腦。

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
    
  2. 在儲存體總管中,將此檔案上傳至儲存體帳戶的 input 資料夾。

    上傳檔案會引發 Microsoft.Storage.BlobCreated 事件。 事件方格會通知所有訂閱者有該事件。 在此案例中,Azure 函式是唯一的訂閱者。 Azure 函式會剖析事件參數,以判斷發生的事件。 然後,它會將檔案的 URL 傳至 Databricks 作業。 Databricks 作業會讀取檔案,並將資料列新增至位於您儲存體帳戶的 Databricks Delta 資料表。

  3. 若要檢查作業是否成功,請檢視作業的執行。 您會看到完成狀態。 如需如何檢視作業執行的詳細資訊,請參閱檢視作業的執行 (部分機器翻譯)

  4. 在新的活頁簿資料格中,在資料格中執行此查詢,以查看更新的差異資料表。

    %sql select * from customer_data
    

    傳回的資料表會顯示最新的記錄。

    Latest record appears in table

  5. 若要更新此記錄,請建立名為 customer-order-update.csv 的檔案,將下列資訊貼到該檔案中,並將檔案儲存到您的本機電腦。

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
    

    此 csv 檔案與前一個檔案幾乎完全相同,但訂單的數量已從 228 變更為 22

  6. 在儲存體總管中,將此檔案上傳至儲存體帳戶的 input 資料夾。

  7. 再次執行 select 查詢,以查看更新的差異資料表。

    %sql select * from customer_data
    

    傳回的資料表會顯示更新的記錄。

    Updated record appears in table

清除資源

當已不再需要資源時,請刪除資源群組及所有相關資源。 若要這麼做,請選取儲存體帳戶的資源群組,然後選取 [刪除]

下一步