Eseguire operazioni bulk sui dati di Azure Cosmos DB

SI APPLICA A: NoSQL

Questa esercitazione fornisce istruzioni sull'esecuzione di operazioni bulk in Azure Cosmos DB Java V4 SDK. Questa versione dell'SDK viene fornita con la libreria di esecuzione bulk predefinita. Se si usa una versione precedente di Java SDK, è consigliabile eseguire la migrazione alla versione più recente. Azure Cosmos DB Java V4 SDK è la soluzione consigliata corrente per il supporto bulk java.

Attualmente, la libreria di esecuzione bulk è supportata solo dagli account Azure Cosmos DB for NoSQL e API for Gremlin. Per informazioni sull'uso della libreria .NET di esecuzione bulk con l'API for Gremlin, vedere Eseguire operazioni bulk in Azure Cosmos DB for Gremlin.

Prerequisiti

Clonare l'applicazione di esempio

Passare ora all'uso del codice scaricando un repository di esempi generici per Java V4 SDK per Azure Cosmos DB da GitHub. Queste applicazioni di esempio eseguono operazioni CRUD e altre operazioni comuni in Azure Cosmos DB. Per clonare il repository, aprire un prompt dei comandi, passare alla directory in cui si vuole copiare l'applicazione ed eseguire il comando seguente:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

Il repository clonato contiene un'applicazione SampleBulkQuickStartAsync.java di esempio nella cartella /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async. L'applicazione genera documenti ed esegue operazioni di creazione, upsert, sostituzione ed eliminazione bulk di elementi in Azure Cosmos DB. Nelle sezioni successive verrà esaminato il codice nell'app di esempio.

Esecuzione bulk in Azure Cosmos DB

  1. Le stringhe di connessione di Azure Cosmos DB vengono lette come argomenti e assegnate alle variabili definite nel file /examples/common/AccountSettings.java. Queste variabili di ambiente devono essere impostate
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

Per eseguire l'esempio bulk, specificare la relativa classe Main:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
  1. L'oggetto CosmosAsyncClient viene quindi inizializzato usando le istruzioni seguenti:

    client = new CosmosClientBuilder()
        .endpoint(AccountSettings.HOST)
        .key(AccountSettings.MASTER_KEY)
        .preferredRegions(preferredRegions)
        .contentResponseOnWriteEnabled(true)
        .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
    
    
  2. L'esempio crea un database e un contenitore asincroni. Crea quindi più documenti in cui verranno eseguite le operazioni bulk. Aggiunge questi documenti a un oggetto flusso reattivo Flux<Family>:

    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
    Family smithFamilyItem = Families.getSmithFamilyItem();
    
    //  Setup family items to create
    Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
    
  3. L'esempio contiene metodi per la creazione, l'upsert, la sostituzione e l'eliminazione bulk. In ogni metodo vengono mappati i documenti delle famiglie nel flusso Flux<Family> di BulkWriter a più chiamate a un metodo in CosmosBulkOperations. Queste operazioni vengono aggiunte a un altro oggetto flusso reattivo Flux<CosmosItemOperation>. Il flusso viene quindi passato al metodo executeBulkOperations dell'oggetto container asincrono creato all'inizio e le operazioni vengono eseguite in blocco. Per un esempio, vedere il metodo di creazione bulk seguente:

    private void bulkCreateItems(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).blockLast();
    }
    
  4. Nella stessa directory dell'applicazione di esempio esiste anche una classe BulkWriter.java. Questa classe illustra come gestire gli errori di limitazione delle richieste (429) e timeout (408) che possono verificarsi durante l'esecuzione bulk e come ripetere efficacemente tali operazioni. È implementata nei metodi seguenti, per mostrare anche come implementare il controllo della velocità effettiva locale e globale.

    private void bulkUpsertItemsWithBulkWriterAbstraction() {
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group1")
                        .setTargetThroughput(200)
                        .build();
        container.enableLocalThroughputControlGroup(groupConfig);
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
    private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
        String controlContainerId = "throughputControlContainer";
        CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
        database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    
        ThroughputControlGroupConfig groupConfig =
                new ThroughputControlGroupConfigBuilder()
                        .setGroupName("group-" + UUID.randomUUID())
                        .setTargetThroughput(200)
                        .build();
    
        GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
                .setControlItemRenewInterval(Duration.ofSeconds(5))
                .setControlItemExpireInterval(Duration.ofSeconds(20))
                .build();
    
        container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
        CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
        requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
        Family andersenFamilyItem = Families.getAndersenFamilyItem();
        Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
        CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
        CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
        BulkWriter bulkWriter = new BulkWriter(container);
        bulkWriter.scheduleWrites(andersonItemOperation);
        bulkWriter.scheduleWrites(wakeFieldItemOperation);
        bulkWriter.execute().subscribe();
    }
    
  5. Nell'esempio sono inoltre disponibili metodi di creazione bulk, che illustrano come aggiungere l'elaborazione della risposta e impostare le opzioni di esecuzione:

    private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(
            family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
    
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
    
            if (cosmosBulkOperationResponse.getException() != null) {
                logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
            } else if (cosmosBulkItemResponse == null ||
                !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
    
                logger.error(
                    "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                        "successfully with " + "a" + " {} response code.",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName(),
                    cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
            } else {
                logger.info(
                    "Item ID: [{}]  Item PartitionKey Value: [{}]",
                    cosmosItemOperation.<Family>getItem().getId(),
                    cosmosItemOperation.<Family>getItem().getLastName());
                logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
                logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
            }
            if (cosmosBulkItemResponse == null) {
                return Mono.error(new IllegalStateException("No response retrieved."));
            } else {
                return Mono.just(cosmosBulkItemResponse);
            }
        }).blockLast();
    }
    
    private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
        CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    
        // The default value for maxMicroBatchConcurrency is 1.
        // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage.
        //
        // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not.
        // When the RU has already been under saturation, increasing the concurrency will not help the situation,
        // rather it may cause more 429 and request timeout.
        bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
        Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
        container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
    }
    

    Suggerimenti per le prestazioni

    Per ottenere prestazioni migliori, quando si usa la libreria dell'executor bulk tenere presente quanto segue:

    • Per prestazioni ottimali eseguire l'applicazione da una macchina virtuale di Azure nella stessa area in cui si trova l'area di scrittura dell'account Azure Cosmos DB.

    • Per ottenere una velocità effettiva più elevata:

      • Impostare le dimensioni dell'heap della JVM su un numero sufficientemente elevato per evitare problemi di memoria quando si gestisce un numero elevato di documenti. Dimensioni heap suggeriti: max (3 GB, 3 * dimensioni di (tutti i documenti passati all'API di importazione bulk in un unico batch)).
      • Il tempo di pre-elaborazione previsto consente di ottenere una velocità effettiva maggiore quando si eseguono operazioni bulk con un numero elevato di documenti. Per importare 10.000.000 documenti, è preferibile eseguire l'importazione in blocco 10 volte di 10 blocchi di documenti, ognuno con dimensione pari a 1.000.000, anziché importare in blocco 100 volte 100 blocchi di documenti, ognuno con dimensione pari a 100.000.
    • È consigliabile creare l'istanza di un singolo oggetto CosmosAsyncClient per l'intera applicazione all'interno di una singola macchina virtuale corrispondente a un contenitore di Azure Cosmos DB specifico.

    • Dato che l'esecuzione di una singola API con un'operazione in blocco usa un blocco di grandi dimensioni della CPU e dell'I/O di rete del computer client generando internamente più attività, evitare di generare più attività simultanee all'interno del processo dell'applicazione, di cui ognuna esegue chiamate API di operazioni in blocco. Se una singola chiamata API di un'operazione bulk in esecuzione in una singola macchina virtuale non è in grado di usare la velocità effettiva dell'intero contenitore (se la velocità effettiva del contenitore > 1 milione di UR/s), è preferibile creare macchine virtuali separate per eseguire simultaneamente le chiamate API di operazione bulk.

    Passaggi successivi