Delen via


Bulkbewerkingen uitvoeren op Azure Cosmos DB-gegevens

VAN TOEPASSING OP: NoSQL

Deze zelfstudie bevat instructies voor het uitvoeren van bulkbewerkingen in de Azure Cosmos DB Java V4 SDK. Deze versie van de SDK wordt geleverd met de ingebouwde bulkexecutorbibliotheek. Als u een oudere versie van Java SDK gebruikt, is het raadzaam om te migreren naar de nieuwste versie. Azure Cosmos DB Java V4 SDK is de huidige aanbevolen oplossing voor bulkondersteuning voor Java.

Momenteel wordt de bibliotheek voor bulkexecutor alleen ondersteund door Azure Cosmos DB for NoSQL en API voor Gremlin-accounts. Zie Bulkbewerkingen uitvoeren in Azure Cosmos DB voor Gremlin voor Gremlin voor meer informatie over het gebruik van de .NET-bibliotheek voor bulkexecutor met API voor Gremlin.

Vereisten

  • Als u geen Azure-abonnement hebt, maakt u een gratis account voordat u begint.

  • U kunt Azure Cosmos DB gratis proberen zonder een Azure-abonnement, gratis en toezeggingen. U kunt ook de Azure Cosmos DB Emulator gebruiken met het https://localhost:8081 eindpunt. De primaire sleutel wordt gegeven in Aanvragen verifiëren.

  • Java Development Kit (JDK) 1.8+

    • Voer op Ubuntu apt-get install default-jdk uit om de JDK te installeren.

    • Zorg dat de omgevingsvariabele JAVA_HOME verwijst naar de map waarin de JDK is geïnstalleerd.

  • Download en installeer een binair Maven-archief

    • Op Ubuntu kunt u apt-get install maven uitvoeren om Maven te installeren.
  • Maak een Azure Cosmos DB for NoSQL-account met behulp van de stappen die worden beschreven in de sectie Databaseaccount maken van het java-quickstart-artikel.

De voorbeeldtoepassing klonen

We gaan nu over naar het werken met code door een algemene opslagplaats met voorbeelden voor Java V4 SDK voor Azure Cosmos DB te downloaden vanuit GitHub. Deze voorbeeldtoepassingen voeren CRUD-bewerkingen en andere algemene bewerkingen uit in Azure Cosmos DB. Als u de opslagplaats wilt klonen, opent u een opdrachtprompt, gaat u naar de map waar u de toepassing wilt kopiëren en voert u de volgende opdracht uit:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

De gekloonde opslagplaats bevat een voorbeeld SampleBulkQuickStartAsync.java in de /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async map. De toepassing genereert documenten en voert bewerkingen uit om bulksgewijs items te maken, upsert, te vervangen en te verwijderen in Azure Cosmos DB. In de volgende secties bekijken we de code in de voorbeeld-app.

Bulkuitvoering in Azure Cosmos DB

  1. De verbindingsreeks s van Azure Cosmos DB worden gelezen als argumenten en toegewezen aan variabelen die zijn gedefinieerd in/examples/common/AccountSettings.javabestand. Deze omgevingsvariabelen moeten worden ingesteld
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Als u het bulkvoorbeeld wilt uitvoeren, geeft u de hoofdklasse op:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. Het CosmosAsyncClient object wordt geïnitialiseerd met behulp van de volgende instructies:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. In het voorbeeld worden een asynchrone database en container gemaakt. Vervolgens worden meerdere documenten gemaakt waarop bulkbewerkingen worden uitgevoerd. Deze documenten worden toegevoegd aan een Flux<Family> reactief streamobject:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. Het voorbeeld bevat methoden voor bulksgewijs maken, upsert, vervangen en verwijderen. In elke methode wijzen we de familiesdocumenten in de BulkWriter-stroom Flux<Family> toe aan meerdere methode-aanroepen.CosmosBulkOperations Deze bewerkingen worden toegevoegd aan een ander reactief streamobject Flux<CosmosItemOperation>. De stream wordt vervolgens doorgegeven aan de executeBulkOperations methode van de asynchrone container die we aan het begin hebben gemaakt en bewerkingen worden bulksgewijs uitgevoerd. Zie de onderstaande methode voor bulksgewijs maken als voorbeeld:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Er is ook een klasse BulkWriter.java in dezelfde map als de voorbeeldtoepassing. Deze klasse laat zien hoe u frequentielimieten (429) en time-outfouten (408) verwerkt die kunnen optreden tijdens bulkuitvoering en het effectief opnieuw proberen van deze bewerkingen. Het wordt geïmplementeerd in de onderstaande methoden en laat ook zien hoe u lokaal en globaal doorvoerbeheer implementeert.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Daarnaast zijn er bulksgewijs methoden voor het maken van het voorbeeld, die laten zien hoe u antwoordverwerking toevoegt en uitvoeringsopties instelt:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

Prestatietips

Houd rekening met de volgende punten voor betere prestaties bij het gebruik van bulkexecutorbibliotheek:

  • Voor de beste prestaties voert u uw toepassing uit vanaf een Azure-VM in dezelfde regio als uw Azure Cosmos DB-accountschrijfregio.

  • Voor een hogere doorvoer:

    • Stel de heapgrootte van de JVM in op een groot genoeg getal om geheugenproblemen te voorkomen bij het verwerken van een groot aantal documenten. Voorgestelde heapgrootte: max(3 GB, 3 * groottevan(alle documenten die zijn doorgegeven aan bulkimport-API in één batch)).
    • Er is een voorverwerkingstijd, waardoor u een hogere doorvoer krijgt bij het uitvoeren van bulkbewerkingen met een groot aantal documenten. Dus als u 10.000.000 documenten wilt importeren, is het uitvoeren van bulkimport 10 keer op 10 bulk documenten die elk van de grootte 100.000.000 documenten groot zijn dan het uitvoeren van bulkimport 100 keer op 100 bulk documenten elk van de grootte 100.000 documenten.
  • Het wordt aanbevolen om één CosmosAsyncClient-object te instantiëren voor de hele toepassing binnen één virtuele machine die overeenkomt met een specifieke Azure Cosmos DB-container.

  • Omdat de API-uitvoering van één bulkbewerking een groot deel van de CPU en netwerk-IO van de clientcomputer verbruikt. Dit gebeurt door meerdere taken intern uit te voeren, om te voorkomen dat meerdere gelijktijdige taken binnen uw toepassingsproces worden uitgevoerd, waarbij API-aanroepen voor bulkbewerkingen worden uitgevoerd. Als een API-aanroep voor één bulkbewerking die op één virtuele machine wordt uitgevoerd, de doorvoer van de hele container niet kan verbruiken (als de doorvoer > van uw container 1 miljoen RU/s is), is het raadzaam om afzonderlijke virtuele machines te maken om gelijktijdig API-aanroepen voor bulkbewerkingen uit te voeren.

Volgende stappen