在 Azure Cosmos DB 中使用大量執行程式 .NET 程式庫來執行大量作業

適用於:NoSQL

注意

本文中所述的大量執行工具程式庫是針對使用 .NET SDK 2.x 版的應用程式所維護。 針對新的應用程式,您可以使用 .NET SDK 3.x 版直接提供的「大量支援」,而不需要任何外部程式庫。

如果您目前使用大量執行工具程式庫,並規劃移轉至較新 SDK 上的大量支援,則請使用移轉指南中的步驟來移轉您的應用程式。

本教學課程說明如何使用大量執行工具 .NET 程式庫以將文件匯入和更新至 Azure Cosmos DB 容器。 若要了解大量執行工具程式庫,以及其如何協助您使用大量輸送量和儲存體,請參閱 Azure Cosmos DB 大量執行工具程式庫概觀。 在本教學課程中,您會看到範例 .NET 應用程式如何將隨機產生的文件大量匯入至 Azure Cosmos DB 容器。 匯入資料之後,程式庫會顯示如何將修補程式指定為可在特定文件欄位上執行的作業,來大量更新所匯入的資料。

目前,只有 Azure Cosmos DB for NoSQL 和 API for Gremlin 帳戶才支援大量執行程式庫。 本文說明如何搭配使用大量執行程式 .NET 程式庫與 API for NoSQL 帳戶。 若要了解如何搭配使用大量執行工具 .NET 程式庫與 API for Gremlin 帳戶,請參閱使用大量執行工具程式庫在 Azure Cosmos DB for Gremlin 中大量內嵌資料

必要條件

複製範例應用程式

現在我們會從 GitHub 下載 .NET 應用程式範例,來轉換為使用程式碼。 此應用程式會對 Azure Cosmos DB 帳戶中儲存的資料執行大量作業。 若要複製應用程式,請開啟命令提示字元,並導覽至要從其中進行複製的目錄,然後執行下列命令:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

複製的存放庫會包含兩個範例:BulkImportSampleBulkUpdateSample。 您可以開啟其中一個應用程式範例、使用 Azure Cosmos DB 帳戶的連接字串來更新 App.config 檔案中的連接字串、建置解決方案,並予以執行。

BulkImportSample 應用程式會產生隨機文件,並將其大量匯入至 Azure Cosmos DB 帳戶。 BulkUpdateSample 應用程式會將修補程式指定為可在特定文件欄位上執行的作業,來大量更新匯入的文件。 在後面幾節中,您將檢閱這裡每一個範例應用程式中的程式碼。

將資料大量匯入至 Azure Cosmos DB 帳戶

  1. 導覽至 BulkImportSample 資料夾,然後開啟 BulkImportSample.sln 檔案。

  2. Azure Cosmos DB 的連接字串是從 App.config 檔案中擷取,如下列程式碼所示:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    大量匯入工具會使用資料庫名稱、容器名稱以及 App.config 檔案中所指定的輸送量值來建立新的資料庫和容器。

  3. 接下來,DocumentClient 物件會使用直接 TCP 連線模式進行初始化:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. 針對等候時間和節流的要求,BulkExecutor 物件會使用高 retry 值進行初始化。 接著,這些值會設定為 0,如此會將壅塞控制傳遞給 BulkExecutor 以決定其存留期。

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. 應用程式會叫用 BulkImportAsync API。 .NET 程式庫會提供大量匯入 API 的兩個多載:一個接受一份已序列化的 JSON 文件清單,而另一個接受一份已還原序列化的 POCO 文件清單。 若要深入了解所有這些多載方法的定義,請參閱 API 文件

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    BulkImportAsync 方法可接受下列參數:

    參數 說明
    enableUpsert 在文件上啟用 upsert 作業的旗標。 如果具有所指定識別碼的文件已經存在,則會對其進行更新。 預設為 false。
    disableAutomaticIdGeneration 停用自動產生識別碼的旗標。 預設為 true。
    maxConcurrencyPerPartitionKeyRange 每個分割區索引鍵範圍的最大並行程度。 設定為 null 會導致程式庫使用預設值 20。
    maxInMemorySortingBatchSize 從每個階段傳遞至 API 呼叫的文件列舉程式中所提取的最大文件數。 針對大量匯入之前發生的記憶體內排序階段。 將此參數設定為 null 會導致程式庫使用預設最小值 (documents.count, 1000000)。
    cancellationToken 用於依正常程序結束大量匯入作業的取消權杖。

大量匯入回應物件定義
大量匯入 API 呼叫的結果包含下列屬性:

參數 說明
NumberOfDocumentsImported (long) 在提供給大量匯入 API 呼叫的文件總計中,成功匯入的文件總數。
TotalRequestUnitsConsumed (double) 大量匯入 API 呼叫取用的要求單位 (RU) 總數。
TotalTimeTaken (TimeSpan) 大量匯入 API 呼叫完成執行的時間總計。
BadInputDocuments (List<object>) 格式錯誤而未成功匯入大量匯入 API 呼叫的文件清單。 修正傳回的文件,然後重試匯入。 格式錯誤的文件包含其識別碼值不是字串 (Null 或任何其他視為無效的資料類型) 的文件。

大量更新 Azure Cosmos DB 帳戶中的資料

您可以使用 BulkUpdateAsync API 來更新現有的文件。 在此範例中,您會將 Name 欄位設定為新的值,並移除現有文件中的 Description 欄位。 如需完整的支援更新作業集,請參閱 API 文件

  1. 導覽至 BulkUpdateSample 資料夾,然後開啟 BulkUpdateSample.sln 檔案。

  2. 與相對應的欄位更新作業一起定義和更新項目。 在此範例中,您將使用 SetUpdateOperation 更新 Name 欄位,以及使用 UnsetUpdateOperation 移除所有文件中的 Description 欄位。 您也可以執行其他作業,例如透過特定值來遞增文件欄位、將特定值推送至陣列欄位,或移除陣列欄位中的特定值。 若要了解大量更新 API 提供的不同方法,請參閱 API 文件

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. 應用程式會叫用 BulkUpdateAsync API。 若要了解 BulkUpdateAsync 方法的定義,請參閱 API 文件

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    BulkUpdateAsync 方法可接受下列參數:

    參數 說明
    maxConcurrencyPerPartitionKeyRange 每個分割區索引鍵範圍的最大並行程度。 將此參數設定為 null 可讓程式庫使用預設值 (20)。
    maxInMemorySortingBatchSize 從每個階段傳遞至 API 呼叫的更新項目列舉程式中所提取的最大更新項目數。 針對大量更新之前發生的記憶體內排序階段。 將此參數設定為 null 會導致程式庫使用預設最小值 (updateItems.count, 1000000)。
    cancellationToken 用於依正常程序結束大量更新作業的取消權杖。

大量更新回應物件定義
大量更新 API 呼叫的結果包含下列屬性:

參數 說明
NumberOfDocumentsUpdated (long) 在提供給大量更新 API 呼叫的文件總計中,成功更新的文件數目。
TotalRequestUnitsConsumed (double) 大量更新 API 呼叫取用的要求單位 (RU) 總數。
TotalTimeTaken (TimeSpan) 大量更新 API 呼叫完成執行的時間總計。

效能提示

當您使用大量執行工具程式庫時,請考量下列各點以提升效能:

  • 為達到最佳效能,請從與 Azure Cosmos DB 帳戶寫入區域位於相同區域的 Azure 虛擬機器中執行應用程式。

  • 在對應至特定 Azure Cosmos DB 容器的單一虛擬機器內,建議您為整個應用程式具現化單一 BulkExecutor 物件。

  • 在內部繁衍多個工作時,單一大量作業 API 執行會取用大量用戶端機器的 CPU 和網路 IO。 請避免在執行大量作業 API 呼叫的應用程式程序內繁衍多個並行工作。 如果單一虛擬機器上執行的單一大量作業 API 呼叫無法取用整個容器的輸送量 (如果您容器的輸送量 > 1 百萬 RU/秒),則建議您建立個別虛擬機器來同時執行大量作業 API 呼叫。

  • 請確定在具現化 BulkExecutor 物件之後叫用 InitializeAsync() 方法,以擷取目標 Azure Cosmos DB 容器的分割區對應。

  • 在應用程式的 App.Config 中,為擁有最佳效能,請確保已啟用 gcServer

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • 程式庫會發出可收集到記錄檔或在主控台的追蹤。 若要兩者都啟用,請將下列程式碼新增至應用程式的 App.Config 檔案:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

下一步