Delen via


De .NET-bibliotheek voor bulkexecutor gebruiken om bulkbewerkingen uit te voeren in Azure Cosmos DB

VAN TOEPASSING OP: NoSQL

Notitie

De bulkexecutorbibliotheek die in dit artikel wordt beschreven, wordt onderhouden voor toepassingen die gebruikmaken van de versie .NET SDK 2.x. Voor nieuwe toepassingen kunt u de bulkondersteuning gebruiken die rechtstreeks beschikbaar is met de .NET SDK versie 3.x en hiervoor is geen externe bibliotheek vereist.

Als u momenteel de bibliotheek voor bulkexecutor gebruikt en van plan bent om te migreren naar bulkondersteuning voor de nieuwere SDK, gebruikt u de stappen in de migratiehandleiding om uw toepassing te migreren.

Deze zelfstudie bevat instructies voor het gebruik van de .NET-bibliotheek voor bulkexecutor om documenten te importeren en bij te werken naar een Azure Cosmos DB-container. Zie het overzicht van de bulkexecutorbibliotheek van Azure Cosmos DB voor meer informatie over de bulkexecutorbibliotheek en hoe u hiermee enorme doorvoer en opslag kunt gebruiken. In deze zelfstudie ziet u een voorbeeld van een .NET-toepassing waarbij bulksgewijs gegenereerde documenten worden geïmporteerd in een Azure Cosmos DB-container. Nadat u de gegevens hebt geïmporteerd, ziet u in de bibliotheek hoe u de geïmporteerde gegevens bulksgewijs kunt bijwerken door patches op te geven als bewerkingen die moeten worden uitgevoerd op specifieke documentvelden.

Momenteel wordt de bulkexecutorbibliotheek alleen ondersteund door de Azure Cosmos DB for NoSQL- en API voor Gremlin-accounts. In dit artikel wordt beschreven hoe u de .NET-bibliotheek voor bulkexecutor gebruikt met API voor NoSQL-accounts. Zie Gegevens bulksgewijs opnemen in de Azure Cosmos DB voor Gremlin-bibliotheek met behulp van een bulkexecutorbibliotheek voor meer informatie over het gebruik van de .NET-bibliotheek voor bulkexecutor met API voor Gremlin-accounts.

Vereisten

De voorbeeldtoepassing klonen

Laten we nu overschakelen naar het werken met code door een .NET-voorbeeldtoepassing te downloaden vanuit GitHub. Deze toepassing voert bulkbewerkingen uit op de gegevens die zijn opgeslagen in uw Azure Cosmos DB-account. Als u de toepassing wilt klonen, opent u een opdrachtprompt, gaat u naar de map waar u deze wilt kopiëren en voert u de volgende opdracht uit:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

De gekloonde opslagplaats bevat twee voorbeelden: BulkImportSample en BulkUpdateSample. U kunt een van de voorbeeldtoepassingen openen, de verbindingsreeks s in app.config-bestand bijwerken met de verbindingsreeks s van uw Azure Cosmos DB-account, de oplossing bouwen en uitvoeren.

De toepassing BulkImportSample genereert willekeurige documenten en importeert ze bulksgewijs naar uw Azure Cosmos DB-account. De BulkUpdateSample-toepassing werkt de geïmporteerde documenten bulksgewijs bij door patches op te geven als bewerkingen die moeten worden uitgevoerd op specifieke documentvelden. In de volgende secties bekijkt u de code in elk van deze voorbeeld-apps.

Gegevens bulksgewijs importeren in een Azure Cosmos DB-account

  1. Navigeer naar de map BulkImportSample en open het bestand BulkImportSample.sln .

  2. De verbindingsreeks s van Azure Cosmos DB worden opgehaald uit het bestand App.config, zoals wordt weergegeven in de volgende code:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    De bulkimporteur maakt een nieuwe database en een container met de databasenaam, containernaam en de doorvoerwaarden die zijn opgegeven in het App.config-bestand.

  3. Vervolgens wordt het DocumentClient-object geïnitialiseerd met de directe TCP-verbindingsmodus:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. Het BulkExecutor-object wordt geïnitialiseerd met een hoge waarde voor opnieuw proberen voor wachttijden en vertraagde aanvragen. Vervolgens worden ze ingesteld op 0 om congestiecontrole gedurende zijn levensduur door te geven aan BulkExecutor .

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. De toepassing roept de BulkImportAsync-API aan. De .NET-bibliotheek biedt twee overbelastingen van de bulkimport-API, een die een lijst met geserialiseerde JSON-documenten accepteert en de andere die een lijst met gedeserialiseerde POCO-documenten accepteert. Raadpleeg de API-documentatie voor meer informatie over de definities van elk van deze overbelaste methoden.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    De methode BulkImportAsync accepteert de volgende parameters:

    Parameter Beschrijving
    enableUpsert Een vlag voor het inschakelen van upsert-bewerkingen op de documenten. Als er al een document met de opgegeven id bestaat, wordt het bijgewerkt. Deze is standaard ingesteld op false.
    disableAutomaticIdGeneration Een vlag om het automatisch genereren van id uit te schakelen. Deze is standaard ingesteld op true.
    maxConcurrencyPerPartitionKeyRange De maximale mate van gelijktijdigheid per partitiesleutelbereik. Als u de instelling op null instelt, wordt voor de bibliotheek een standaardwaarde van 20 gebruikt.
    maxInMemorySortingBatchSize Het maximum aantal documenten dat wordt opgehaald uit de enumerator van het document, dat in elke fase wordt doorgegeven aan de API-aanroep. Voor de in-memory sorteerfase die plaatsvindt voordat bulksgewijs wordt geïmporteerd. Als u deze parameter instelt op null, wordt de standaard minimumwaarde gebruikt (documents.count, 1000000).
    cancellationToken Het annuleringstoken om de bulkimportbewerking correct af te sluiten.

Definitie van antwoordobject bulksgewijs importeren
Het resultaat van de API-aanroep voor bulkimport bevat de volgende kenmerken:

Parameter Beschrijving
NumberOfDocumentsImported (lang) Het totale aantal documenten dat is geïmporteerd uit het totale aantal documenten dat is geleverd aan de API-aanroep voor bulkimport.
TotalRequestUnitsConsumed (dubbel) De totale aanvraageenheden (RU) die worden gebruikt door de API-aanroep voor bulksgewijs importeren.
TotalTimeTaken (TimeSpan) De totale tijd die nodig is voor de api-aanroep voor bulksgewijs importeren om de uitvoering te voltooien.
BadInputDocuments (lijstobject><) De lijst met documenten met onjuiste indeling die niet zijn geïmporteerd in de API-aanroep voor bulksgewijs importeren. Corrigeer de geretourneerde documenten en voer het importeren opnieuw uit. Documenten met ongeldige indeling bevatten documenten waarvan de id-waarde geen tekenreeks is (null of een ander gegevenstype wordt als ongeldig beschouwd).

Gegevens bulksgewijs bijwerken in uw Azure Cosmos DB-account

U kunt bestaande documenten bijwerken met behulp van de BulkUpdateAsync-API . In dit voorbeeld stelt u het Name veld in op een nieuwe waarde en verwijdert u het Description veld uit de bestaande documenten. Raadpleeg de API-documentatie voor de volledige set ondersteunde updatebewerkingen.

  1. Navigeer naar de map BulkUpdateSample en open het bestand BulkUpdateSample.sln .

  2. Definieer de update-items samen met de bijbehorende veldupdatebewerkingen. In dit voorbeeld gebruikt u SetUpdateOperation om het Name veld en UnsetUpdateOperation bij te werken om het Description veld uit alle documenten te verwijderen. U kunt ook andere bewerkingen uitvoeren, zoals het verhogen van een documentveld door een specifieke waarde, het pushen van specifieke waarden naar een matrixveld of het verwijderen van een specifieke waarde uit een matrixveld. Raadpleeg de API-documentatie voor meer informatie over verschillende methoden die door de API voor bulksgewijs worden bijgewerkt.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. De toepassing roept de BulkUpdateAsync-API aan. Raadpleeg de API-documentatie voor meer informatie over de definitie van de methode BulkUpdateAsync.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    De methode BulkUpdateAsync accepteert de volgende parameters:

    Parameter Beschrijving
    maxConcurrencyPerPartitionKeyRange De maximale mate van gelijktijdigheid per partitiesleutelbereik. Als u deze parameter instelt op null, wordt in de bibliotheek de standaardwaarde (20) gebruikt.
    maxInMemorySortingBatchSize Het maximum aantal update-items dat is opgehaald uit de enumerator voor update-items die in elke fase zijn doorgegeven aan de API-aanroep. Voor de in-memory sorteerfase die plaatsvindt voordat bulksgewijs wordt bijgewerkt. Als u deze parameter instelt op null, gebruikt de bibliotheek de standaard minimumwaarde (updateItems.count, 1000000).
    cancellationToken Het annuleringstoken om de bulksgewijs bijwerkbewerking af te sluiten.

Definitie van antwoordobject bulksgewijs bijwerken
Het resultaat van de API-aanroep voor bulksgewijs bijwerken bevat de volgende kenmerken:

Parameter Beschrijving
NumberOfDocumentsUpdated (lang) Het aantal documenten dat is bijgewerkt uit het totale aantal documenten dat is geleverd aan de API-aanroep voor bulkupdates.
TotalRequestUnitsConsumed (dubbel) De totale aanvraageenheden (RU's) die worden gebruikt door de API-aanroep voor bulkupdates.
TotalTimeTaken (TimeSpan) De totale tijd die nodig is voor de API-aanroep voor bulksgewijs bijwerken om de uitvoering te voltooien.

Prestatietips

Houd rekening met de volgende punten voor betere prestaties wanneer u de bulkexecutorbibliotheek gebruikt:

  • Voor de beste prestaties voert u uw toepassing uit vanaf een virtuele Azure-machine die zich in dezelfde regio bevindt als de schrijfregio van uw Azure Cosmos DB-account.

  • Het is raadzaam om één BulkExecutor-object te instantiëren voor de hele toepassing binnen één virtuele machine die overeenkomt met een specifieke Azure Cosmos DB-container.

  • Een API-uitvoering voor één bulkbewerking verbruikt een groot deel van de CPU en netwerk-IO van de clientcomputer bij het intern instellen van meerdere taken. Vermijd het instellen van meerdere gelijktijdige taken binnen uw toepassingsproces waarmee API-aanroepen voor bulkbewerkingen worden uitgevoerd. Als een API-aanroep voor één bulkbewerking die op één virtuele machine wordt uitgevoerd, de doorvoer van de hele container niet kan verbruiken (als de doorvoer > van uw container 1 miljoen RU/s is), is het de voorkeur om afzonderlijke virtuele machines te maken om tegelijkertijd de API-aanroepen voor bulkbewerkingen uit te voeren.

  • Zorg ervoor dat de InitializeAsync() methode wordt aangeroepen nadat u een BulkExecutor-object hebt geïnstueerd om de partitietoewijzing van de Azure Cosmos DB-doelcontainer op te halen.

  • Zorg ervoor dat gcServer is ingeschakeld in de App.Config van uw toepassing voor betere prestaties

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • De bibliotheek verzendt traceringen die kunnen worden verzameld in een logboekbestand of in de console. Als u beide wilt inschakelen, voegt u de volgende code toe aan het App.Config-bestand van uw toepassing:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Volgende stappen