Dela via


Utföra massåtgärder på Azure Cosmos DB-data

GÄLLER FÖR: NoSQL

Den här självstudien innehåller instruktioner om hur du utför massåtgärder i Azure Cosmos DB Java V4 SDK. Den här versionen av SDK levereras med massexekutorbiblioteket inbyggt. Om du använder en äldre version av Java SDK rekommenderar vi att du migrerar till den senaste versionen. Azure Cosmos DB Java V4 SDK är den aktuella rekommenderade lösningen för Masssupport för Java.

För närvarande stöds massexekutorbiblioteket endast av Azure Cosmos DB för NoSQL och API för Gremlin-konton. Mer information om hur du använder .NET-massexekutorbibliotek med API för Gremlin finns i utföra massåtgärder i Azure Cosmos DB för Gremlin.

Förutsättningar

  • Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.

  • Du kan prova Azure Cosmos DB kostnadsfritt utan en Azure-prenumeration, kostnadsfritt och åtaganden. Du kan också använda Azure Cosmos DB-emulatorn med https://localhost:8081 slutpunkten. Primärnyckeln finns i Autentisera begäranden.

  • Java Development Kit (JDK) 1.8+

    • I Ubuntu kör du apt-get install default-jdk för att installera JDK-paketet.

    • Tänk på att ställa in miljövariabeln JAVA_HOME så att den pekar på den mapp där JDK-paketet är installerat.

  • Ladda ned och installera ett Maven-binärarkiv

    • I Ubuntu kan du köra apt-get install maven för att installera Maven.
  • Skapa ett Azure Cosmos DB för NoSQL-konto med hjälp av stegen som beskrivs i avsnittet Skapa databaskonto i java-snabbstartsartikeln.

Klona exempelprogrammet

Nu ska vi växla till att arbeta med kod genom att ladda ned en generisk exempellagringsplats för Java V4 SDK för Azure Cosmos DB från GitHub. Dessa exempelprogram utför CRUD-åtgärder och andra vanliga åtgärder i Azure Cosmos DB. Om du vill klona lagringsplatsen öppnar du en kommandotolk, navigerar till katalogen där du vill kopiera programmet och kör följande kommando:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

Den klonade lagringsplatsen innehåller ett exempel SampleBulkQuickStartAsync.java i /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async mappen. Programmet genererar dokument och kör åtgärder för att massskapa, upsert, ersätta och ta bort objekt i Azure Cosmos DB. I nästa avsnitt granskar vi koden i exempelappen.

Masskörning i Azure Cosmos DB

  1. Azure Cosmos DB:s niska veze läse som argument och tilldelas till variabler som definierats i /examples/common/AccountSettings.java-filen. Dessa miljövariabler måste anges
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Om du vill köra massexemplet anger du huvudklassen:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. Objektet CosmosAsyncClient initieras med hjälp av följande instruktioner:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. Exemplet skapar en asynkron databas och container. Sedan skapas flera dokument där massåtgärder ska köras. De här dokumenten läggs till i ett Flux<Family> reaktivt strömobjekt:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. Exemplet innehåller metoder för massskapande, upsert, replace och delete. I varje metod mappar vi familjedokumenten i BulkWriter-strömmen Flux<Family> till flera metodanrop i CosmosBulkOperations. Dessa åtgärder läggs till i ett annat reaktivt strömobjekt Flux<CosmosItemOperation>. Dataströmmen skickas sedan till metoden för executeBulkOperations den asynkronisering container som vi skapade i början och åtgärderna körs i bulk. Se massskapandemetoden nedan som ett exempel:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Det finns också en klass BulkWriter.java i samma katalog som exempelprogrammet. Den här klassen visar hur du hanterar hastighetsbegränsningsfel (429) och timeoutfel (408) som kan inträffa under masskörning och försöker utföra dessa åtgärder på ett effektivt sätt. Den implementeras i metoderna nedan och visar även hur du implementerar lokal och global dataflödeskontroll.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Dessutom finns det massskapandemetoder i exemplet som visar hur du lägger till svarsbearbetning och anger körningsalternativ:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

Prestandatips

Överväg följande punkter för bättre prestanda när du använder massexekutorbiblioteket:

  • För bästa prestanda kör du ditt program från en virtuell Azure-dator i samma region som skrivregionen för ditt Azure Cosmos DB-konto.

  • För att uppnå högre dataflöde:

    • Ange JVM:s heapstorlek till ett tillräckligt stort antal för att undvika minnesproblem vid hantering av ett stort antal dokument. Föreslagen heapstorlek: max(3 GB, 3 * sizeof(alla dokument som skickas till massimport-API:et i en batch)).
    • Det finns en förbearbetningstid på grund av vilket du får högre dataflöde när du utför massåtgärder med ett stort antal dokument. Om du vill importera 10 000 000 dokument är det därför bättre att köra massimport 10 gånger på 10 massdokument var och en av storlek 1 000 000 än att köra massimport 100 gånger på 100 massdokument med varje storlek på 100 000 dokument.
  • Vi rekommenderar att du instansierar ett enda CosmosAsyncClient-objekt för hela programmet på en enda virtuell dator som motsvarar en specifik Azure Cosmos DB-container.

  • Eftersom en enda massåtgärds-API-körning förbrukar en stor del av klientdatorns CPU och nätverks-I/O. Detta inträffar genom att skapa flera uppgifter internt, undvik att skapa flera samtidiga uppgifter i programprocessen där varje körning av massåtgärds-API-anrop körs. Om ett ENDA MASSåtgärds-API-anrop som körs på en enda virtuell dator inte kan förbruka hela containerns dataflöde (om containerns dataflöde > är 1 miljon RU/s) är det bättre att skapa separata virtuella datorer för att samtidigt köra API-anrop för massåtgärder.

Nästa steg

  • En översikt över massexekutorfunktioner finns i översikt över massexekutorer.