Använda .NET-massexekutorbiblioteket för att utföra massåtgärder i Azure Cosmos DB

GÄLLER FÖR: NoSQL

Kommentar

Massexekutorbiblioteket som beskrivs i den här artikeln underhålls för program som använder .NET SDK 2.x-versionen. För nya program kan du använda massstöd som är direkt tillgängligt med .NET SDK version 3.x och det kräver inget externt bibliotek.

Om du för närvarande använder massexekutorbiblioteket och planerar att migrera till massstöd på den nyare SDK:n använder du stegen i migreringsguiden för att migrera ditt program.

Den här självstudien innehåller instruktioner om hur du använder .NET-biblioteket för masskörning för att importera och uppdatera dokument till en Azure Cosmos DB-container. Mer information om massexekutorbiblioteket och hur det hjälper dig att använda massivt dataflöde och lagring finns i översikten över massexekutorbiblioteket i Azure Cosmos DB. I den här självstudien visas ett .NET-exempelprogram där massimport av slumpmässigt genererade dokument till en Azure Cosmos DB-container. När du har importerat data visar biblioteket hur du kan massuppdatera importerade data genom att ange korrigeringar som åtgärder som ska utföras på specifika dokumentfält.

För närvarande stöds massexekutorbiblioteket endast av Azure Cosmos DB för NoSQL och API för Gremlin-konton. I den här artikeln beskrivs hur du använder .NET-massexekutorbiblioteket med API för NoSQL-konton. Information om hur du använder .NET-massexekutorbiblioteket med API för Gremlin-konton finns i Mata in data i grupp i Azure Cosmos DB för Gremlin med hjälp av ett massexekutorbibliotek.

Förutsättningar

Klona exempelprogrammet

Nu ska vi växla till att arbeta med kod genom att ladda ned ett .NET-exempelprogram från GitHub. Det här programmet utför massåtgärder på data som lagras i ditt Azure Cosmos DB-konto. Om du vill klona programmet öppnar du en kommandotolk, navigerar till katalogen där du vill kopiera det och kör följande kommando:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

Den klonade lagringsplatsen innehåller två exempel, BulkImportSample och BulkUpdateSample. Du kan öppna något av exempelprogrammen, uppdatera anslutningssträng i App.config-filen med ditt Azure Cosmos DB-kontos anslutningssträng, skapa lösningen och köra den.

BulkImportSample-programmet genererar slumpmässiga dokument och massimporter dem till ditt Azure Cosmos DB-konto. BulkUpdateSample-programmet uppdaterar de importerade dokumenten genom att ange korrigeringar som åtgärder som ska utföras på specifika dokumentfält. I nästa avsnitt granskar du koden i var och en av dessa exempelappar.

Massimportera data till ett Azure Cosmos DB-konto

  1. Gå till mappen BulkImportSample och öppna filen BulkImportSample.sln .

  2. Azure Cosmos DB:s anslutningssträng hämtas från App.config-filen enligt följande kod:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    Massimportören skapar en ny databas och en container med databasnamnet, containernamnet och dataflödesvärdena som anges i App.config-filen.

  3. Sedan initieras DocumentClient-objektet med Direkt TCP-anslutningsläge:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. BulkExecutor-objektet initieras med ett högt återförsöksvärde för väntetid och begränsade begäranden. Sedan är de inställda på 0 för att överföra överbelastningskontroll till BulkExecutor under dess livslängd.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. Programmet anropar BulkImportAsync-API:et. .NET-biblioteket innehåller två överlagringar av API:et för massimport – ett som accepterar en lista över serialiserade JSON-dokument och det andra som accepterar en lista över deserialiserade POCO-dokument. Mer information om definitionerna för var och en av dessa överlagrade metoder finns i API-dokumentationen.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    BulkImportAsync-metoden accepterar följande parametrar:

    Parameter Beskrivning
    enableUpsert En flagga för att aktivera upsert-åtgärder i dokumenten. Om det redan finns ett dokument med det angivna ID:t uppdateras det. Som standard är den inställd på false.
    disableAutomaticIdGeneration En flagga för att inaktivera automatisk generering av ID. Som standard är det inställt på sant.
    maxConcurrencyPerPartitionKeyRange Den maximala graden av samtidighet per partitionsnyckelintervall. Om du anger null används standardvärdet 20 i biblioteket.
    maxInMemorySortingBatchSize Det maximala antalet dokument som hämtas från dokumentuppräknaren, som skickas till API-anropet i varje steg. För den minnesinterna sorteringsfasen som sker innan massimporten. Om du anger den här parametern till null används standardvärdet för minimum (documents.count, 1000000).
    cancellationToken Annulleringstoken för att avsluta massimportåtgärden på ett korrekt sätt.

Definition av massimportsvarsobjekt
Resultatet av API-anropet för massimport innehåller följande attribut:

Parameter Beskrivning
NumberOfDocumentsImporterad (lång) Det totala antalet dokument som har importerats av det totala antalet dokument som skickats till API-anropet för massimport.
TotalRequestUnitsConsumed (dubbelt) Totalt antal enheter för begäran (RU) som förbrukas av API-anropet för massimport.
TotalTimeTaken (TimeSpan) Den totala tid det tar för API-anropet för massimport att slutföra körningen.
BadInputDocuments (listobjekt><) Listan över dokument i felaktigt format som inte har importerats i API-anropet för massimport. Åtgärda de dokument som returneras och försök importera igen. Ogiltigt formaterade dokument innehåller dokument vars ID-värde inte är en sträng (null eller någon annan datatyp anses vara ogiltig).

Massuppdatering av data i ditt Azure Cosmos DB-konto

Du kan uppdatera befintliga dokument med hjälp av BulkUpdateAsync-API:et. I det här exemplet anger Name du fältet till ett nytt värde och tar bort fältet Description från de befintliga dokumenten. Fullständig uppsättning uppdateringsåtgärder som stöds finns i API-dokumentationen.

  1. Gå till mappen BulkUpdateSample och öppna filen BulkUpdateSample.sln .

  2. Definiera uppdateringsobjekten tillsammans med motsvarande fältuppdateringsåtgärder. I det här exemplet använder du SetUpdateOperation för att uppdatera Name fältet och UnsetUpdateOperation för att ta bort fältet Description från alla dokument. Du kan också utföra andra åtgärder som att öka ett dokumentfält med ett specifikt värde, push-överföra specifika värden till ett matrisfält eller ta bort ett specifikt värde från ett matrisfält. Mer information om olika metoder som tillhandahålls av massuppdaterings-API:et finns i API-dokumentationen.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. Programmet anropar BulkUpdateAsync-API:et. Mer information om definitionen av metoden BulkUpdateAsync finns i API-dokumentationen.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    BulkUpdateAsync-metoden accepterar följande parametrar:

    Parameter Beskrivning
    maxConcurrencyPerPartitionKeyRange Den maximala graden av samtidighet per partitionsnyckelintervall. Om den här parametern anges till null använder biblioteket standardvärdet(20).
    maxInMemorySortingBatchSize Det maximala antalet uppdateringsobjekt som hämtas från uppräknaren för uppdateringsobjekt som skickas till API-anropet i varje steg. För den minnesinterna sorteringsfasen som sker innan massuppdateringen. Om den här parametern ställs in på null används standardvärdet för biblioteket (updateItems.count, 1000000).
    cancellationToken Annulleringstoken för att avsluta massuppdateringsåtgärden på ett korrekt sätt.

Definition av massuppdateringssvarsobjekt
Resultatet av API-anropet för massuppdatering innehåller följande attribut:

Parameter Beskrivning
NumberOfDocumentsUpdated (lång) Antalet dokument som har uppdaterats av det totala antalet dokument som har skickats till API-anropet för massuppdatering.
TotalRequestUnitsConsumed (dubbelt) Det totala antalet enheter för programbegäran (RU:er) som förbrukas av API-anropet för massuppdatering.
TotalTimeTaken (TimeSpan) Den totala tid det tar för API-anropet för massuppdatering att slutföra körningen.

Prestandatips

Överväg följande punkter för bättre prestanda när du använder massexekutorbiblioteket:

  • För bästa prestanda kör du ditt program från en virtuell Azure-dator som finns i samma region som ditt Azure Cosmos DB-kontos skrivregion.

  • Vi rekommenderar att du instansierar ett enda BulkExecutor-objekt för hela programmet på en enda virtuell dator som motsvarar en specifik Azure Cosmos DB-container.

  • En enda massåtgärds-API-körning förbrukar en stor del av klientdatorns CPU och nätverks-I/O när flera uppgifter skapas internt. Undvik att skapa flera samtidiga uppgifter i din programprocess som kör API-anrop för massåtgärder. Om ett API-anrop med en enda massåtgärd som körs på en enda virtuell dator inte kan förbruka hela containerns dataflöde (om containerns dataflöde > är 1 miljon RU/s) föredrar vi att skapa separata virtuella datorer för att köra API-anropen för massåtgärder samtidigt.

  • InitializeAsync() Kontrollera att metoden anropas efter instansiering av ett BulkExecutor-objekt för att hämta azure Cosmos DB-målcontainerns partitionskarta.

  • Kontrollera att gcServer är aktiverat för bättre prestanda i programmets App.Config

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • Biblioteket genererar spårningar som kan samlas in i en loggfil eller i -konsolen. Om du vill aktivera båda lägger du till följande kod i programmets App.Config-fil :

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Nästa steg