Effectuer des opérations en bloc sur des données Azure Cosmos DB
S’APPLIQUE À : NoSQL
Ce tutoriel fournit des instructions sur l’exécution d’opérations en bloc dans le Kit de développement logiciel (SDK) Java V4 Azure Cosmos DB. Cette version du Kit de développement logiciel (SDK) est fournie avec la bibliothèque d’exécuteurs en bloc intégrée. Si vous utilisez une version antérieure du SDK Java, il est recommandé de migrer vers la version la plus récente. Le kit de développement logiciel (SDK) Java Azure Cosmos DB V4 est la solution actuellement recommandée pour la prise en charge en bloc Java.
Actuellement, la bibliothèque de l’exécuteur en bloc est prise en charge uniquement par les comptes Azure Cosmos DB for NoSQL et API pour Gremlin. Pour en savoir plus sur l’utilisation de la bibliothèque .NET de l’exécuteur en bloc avec l’API pour Gremlin, consultez Effectuer des opérations en bloc dans Azure Cosmos DB pour Gremlin.
Prérequis
Si vous n’avez pas d’abonnement Azure, créez un compte gratuit avant de commencer.
Vous pouvez essayer Azure Cosmos DB gratuitement sans abonnement Azure, libre de tous frais et engagements. Vous pouvez également utiliser l’émulateur Azure Cosmos DB avec le point de terminaison
https://localhost:8081
. La clé primaire est fournie dans des requêtes d’authentification.Java Development Kit (JDK) 1.8+
Sur Ubuntu, exécutez
apt-get install default-jdk
pour installer le JDK.Veillez à définir la variable d’environnement JAVA_HOME pour qu’elle pointe vers le dossier dans lequel le JDK est installé.
Téléchargement et installation d’une archive binaire Maven
- Sur Ubuntu, vous pouvez exécuter
apt-get install maven
pour installer Maven.
- Sur Ubuntu, vous pouvez exécuter
Créez un compte Azure Cosmos DB for NoSQL en suivant les étapes décrites dans la section Créer un compte de base de données de l’article de démarrage rapide Java.
Clonage de l’exemple d’application
Nous allons maintenant passer à l’utilisation de code en téléchargeant un référentiel d’exemples génériques pour le Kit de développement logiciel (SDK) Java V4 pour Azure Cosmos DB à partir de GitHub. Ces exemples d’applications effectuent des opérations CRUD et d’autres opérations courantes sur Azure Cosmos DB. Pour cloner le répertoire, ouvrez une invite de commandes, accédez au répertoire dans lequel souhaitez copier l’application, puis exécutez la commande suivante :
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
Le référentiel cloné contient un exemple SampleBulkQuickStartAsync.java
dans le dossier /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async
. L’application génère des documents et exécute des opérations en bloc de création, d’upsert, de remplacement et de suppression d’éléments dans Azure Cosmos DB. Dans les sections suivantes, nous allons examiner le code de l’exemple d’application.
Exécution en bloc dans Azure Cosmos DB
- Les chaînes de connexion d’Azure Cosmos DB sont lues en tant qu’arguments, et affectées à des variables définies dans le fichier /
examples/common/AccountSettings.java
. Ces variables d’environnement doivent être définies
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
Pour exécuter l’exemple d’opération en bloc, spécifiez sa Classe principale :
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
L’objet
CosmosAsyncClient
est initialisé à l’aide des instructions suivantes :client = new CosmosClientBuilder() .endpoint(AccountSettings.HOST) .key(AccountSettings.MASTER_KEY) .preferredRegions(preferredRegions) .contentResponseOnWriteEnabled(true) .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
L’exemple crée une base de données et un conteneur asynchrones. Il crée ensuite plusieurs documents sur lesquels des opérations en bloc seront exécutées. Il ajoute ces documents à un objet flux réactif
Flux<Family>
:Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); Family johnsonFamilyItem = Families.getJohnsonFamilyItem(); Family smithFamilyItem = Families.getSmithFamilyItem(); // Setup family items to create Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
L’exemple contient des méthodes pour l’exécution en bloc d’opérations de création, d’upsert, de remplacement et de suppression. Dans chaque méthode, nous mappons les documents de familles dans le flux BulkWriter
Flux<Family>
à plusieurs appels de méthode dansCosmosBulkOperations
. Ces opérations sont ajoutées à un autre objet flux réactifFlux<CosmosItemOperation>
. Le flux est ensuite passé à la méthodeexecuteBulkOperations
ducontainer
asynchrone que nous avons créé au début, et les opérations sont exécutées en bloc. La méthode de création en bloc ci-dessous présente un exemple :private void bulkCreateItems(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).blockLast(); }
Il existe également une classe
BulkWriter.java
dans le même répertoire que l’exemple d’application. Cette classe montre comment gérer les erreurs de limitation de débit (429) et de délai d’expiration (408) qui peuvent se produire pendant une exécution en bloc, et réessayer efficacement d’effectuer ces opérations. Elle est implémentée dans les méthodes ci-dessous, montrant également comment implémenter le contrôle du débit local et global.private void bulkUpsertItemsWithBulkWriterAbstraction() { Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() { ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group1") .setTargetThroughput(200) .build(); container.enableLocalThroughputControlGroup(groupConfig); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() { String controlContainerId = "throughputControlContainer"; CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId); database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block(); ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group-" + UUID.randomUUID()) .setTargetThroughput(200) .build(); GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) .setControlItemRenewInterval(Duration.ofSeconds(5)) .setControlItemExpireInterval(Duration.ofSeconds(20)) .build(); container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig); CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); }
En outre, il existe des méthodes de création en bloc dans l’exemple, qui illustrent la manière d’ajouter un traitement de réponse et de définir des options d’exécution :
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> { CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation(); if (cosmosBulkOperationResponse.getException() != null) { logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); } else if (cosmosBulkItemResponse == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) { logger.error( "The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " + "successfully with " + "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a"); } else { logger.info( "Item ID: [{}] Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName()); logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode()); logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge()); } if (cosmosBulkItemResponse == null) { return Mono.error(new IllegalStateException("No response retrieved.")); } else { return Mono.just(cosmosBulkItemResponse); } }).blockLast(); } private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) { CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); // The default value for maxMicroBatchConcurrency is 1. // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage. // // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not. // When the RU has already been under saturation, increasing the concurrency will not help the situation, // rather it may cause more 429 and request timeout. bulkExecutionOptions.setMaxMicroBatchConcurrency(2); Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast(); }
Conseils sur les performances
Pour bénéficier de meilleures performances lors de l’utilisation de la bibliothèque de l’exécuteur en bloc, considérez les points suivants :
Pour de meilleures performances, exécutez votre application à partir d’une machine virtuelle Azure qui se trouve dans la région d’écriture du compte Azure Cosmos DB.
Pour atteindre un débit plus élevé :
- Affectez à la taille du tas de la machine virtuelle Java une valeur suffisamment élevée pour éviter tout problème de mémoire lors du traitement d’un grand nombre de documents. Suggestion de taille de tas : max(3 GB, 3 * sizeof(tous les documents transmis à l’API d’importation en bloc dans un lot)).
- Il y a un temps de prétraitement, grâce auquel vous obtiendrez un débit supérieur lors de l’exécution d’opérations en bloc avec un grand nombre de documents. Si vous souhaitez importer 10 000 000 documents, il est préférable d’exécuter une importation en bloc 10 fois sur 10 lots de documents en contenant chacun 1 000 000, plutôt que d’exécuter une importation en bloc 100 fois sur 100 lots de documents en contenant chacun 100 000.
Nous vous recommandons d’instancier un objet CosmosAsyncClient unique pour l’ensemble de l’application au sein d’une seule machine virtuelle, qui correspond à un conteneur Azure Cosmos DB spécifique.
L’exécution d’une API d’opération en bloc consomme une grande partie des E/S réseau et du processeur de l’ordinateur client. Cela est dû à la génération automatique de plusieurs tâches en interne. Évitez de générer plusieurs tâches simultanées dans votre processus d’application, exécutant chacune des appels d’API d’opérations en bloc. Si un appel d’API d’opération en bloc en cours d’exécution sur une seule machine virtuelle ne peut pas consommer le débit complet de votre conteneur (si le débit de votre conteneur est supérieur à 1 million RU/s), il est préférable de créer des machines virtuelles distinctes pour exécuter simultanément les appels d’API d’opérations en bloc.
Étapes suivantes
- Pour obtenir une vue d’ensemble de la fonctionnalité d’exécuteur en bloc, consultez la vue d’ensemble de l’exécuteur en bloc.