Provádění hromadných operací s daty Azure Cosmos DB
PLATÍ PRO: NoSQL
Tento kurz obsahuje pokyny k provádění hromadných operací v sadě Azure Cosmos DB Java SDK verze 4. Tato verze sady SDK se dodává s integrovanou knihovnou bulk executoru. Pokud používáte starší verzi sady Java SDK, doporučujeme migrovat na nejnovější verzi. Azure Cosmos DB Java V4 SDK je aktuálně doporučené řešení pro hromadnou podporu Javy.
Knihovna bulk executoru je v současné době podporována pouze účty Azure Cosmos DB for NoSQL a API pro účty Gremlin. Další informace o použití knihovny .NET bulk Executor s rozhraním API pro Gremlin najdete v tématu Hromadné operace ve službě Azure Cosmos DB pro Gremlin.
Požadavky
Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.
Službu Azure Cosmos DB si můžete vyzkoušet zdarma bez předplatného Azure, bez poplatků a závazků. Nebo můžete použít emulátor služby Azure Cosmos DB s
https://localhost:8081
koncovým bodem. Primární klíč je uvedený v části Ověřování požadavků.Java Development Kit (JDK) 1.8+
Na Ubuntu nainstalujte sadu JDK spuštěním příkazu
apt-get install default-jdk
.Nezapomeňte nastavit proměnnou prostředí JAVA_HOME tak, aby odkazovala na složku, ve které je sada JDK nainstalovaná.
Stáhněte a nainstalujte binární archiv Maven.
- Na Ubuntu můžete Maven nainstalovat spuštěním příkazu
apt-get install maven
.
- Na Ubuntu můžete Maven nainstalovat spuštěním příkazu
Pomocí postupu popsaného v části Vytvoření účtu databáze v článku Rychlý start pro Javu vytvořte účet Služby Azure Cosmos DB for NoSQL.
Klonování ukázkové aplikace
Teď přejdeme na práci s kódem stažením obecného úložiště ukázek pro sadu Java SDK v4 pro Azure Cosmos DB z GitHubu. Tyto ukázkové aplikace provádějí operace CRUD a další běžné operace ve službě Azure Cosmos DB. Pokud chcete naklonovat úložiště, otevřete příkazový řádek, přejděte do adresáře, do kterého chcete aplikaci zkopírovat, a spusťte následující příkaz:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
Klonované úložiště obsahuje ve složce ukázku SampleBulkQuickStartAsync.java
/azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async
. Aplikace generuje dokumenty a provádí operace hromadného vytváření, upsertu, nahrazení a odstraňování položek ve službě Azure Cosmos DB. V dalších částech si projdeme kód v ukázkové aplikaci.
Hromadné spouštění ve službě Azure Cosmos DB
- Připojovací řetězec služby Azure Cosmos DB se čtou jako argumenty a přiřazují se proměnným definovaným v souboru nebo
examples/common/AccountSettings.java
souboru. Tyto proměnné prostředí musí být nastaveny.
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
Pokud chcete hromadnou ukázku spustit, zadejte její hlavní třídu:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
Objekt
CosmosAsyncClient
se inicializuje pomocí následujících příkazů:client = new CosmosClientBuilder() .endpoint(AccountSettings.HOST) .key(AccountSettings.MASTER_KEY) .preferredRegions(preferredRegions) .contentResponseOnWriteEnabled(true) .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
Ukázka vytvoří asynchronní databázi a kontejner. Potom vytvoří více dokumentů, na kterých se budou provádět hromadné operace. Tyto dokumenty se přidají do reaktivního objektu streamu
Flux<Family>
:Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); Family johnsonFamilyItem = Families.getJohnsonFamilyItem(); Family smithFamilyItem = Families.getSmithFamilyItem(); // Setup family items to create Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
Ukázka obsahuje metody hromadného vytváření, upsertu, nahrazení a odstranění. V každé metodě mapujeme řady dokumentů v bulkWriter
Flux<Family>
streamu na více volání metod vCosmosBulkOperations
. Tyto operace se přidají do jiného reaktivního objektu streamuFlux<CosmosItemOperation>
. Stream se pak předáexecuteBulkOperations
metodě asynchronní synchronizacecontainer
, kterou jsme vytvořili na začátku, a operace se provádějí hromadně. Jako příklad si prohlédněte metodu hromadného vytvoření:private void bulkCreateItems(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).blockLast(); }
Třída je také
BulkWriter.java
ve stejném adresáři jako ukázková aplikace. Tato třída ukazuje, jak zpracovat chyby omezování rychlosti (429) a časového limitu (408), ke kterým může dojít během hromadného spuštění, a efektivně tyto operace opakovat. Implementuje se v následujících metodách a také ukazuje, jak implementovat řízení místní a globální propustnosti.private void bulkUpsertItemsWithBulkWriterAbstraction() { Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() { ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group1") .setTargetThroughput(200) .build(); container.enableLocalThroughputControlGroup(groupConfig); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() { String controlContainerId = "throughputControlContainer"; CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId); database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block(); ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group-" + UUID.randomUUID()) .setTargetThroughput(200) .build(); GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) .setControlItemRenewInterval(Duration.ofSeconds(5)) .setControlItemExpireInterval(Duration.ofSeconds(20)) .build(); container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig); CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); }
Kromě toho v ukázce existují metody hromadného vytváření, které ilustrují, jak přidat zpracování odpovědí a nastavit možnosti spuštění:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> { CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation(); if (cosmosBulkOperationResponse.getException() != null) { logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); } else if (cosmosBulkItemResponse == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) { logger.error( "The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " + "successfully with " + "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a"); } else { logger.info( "Item ID: [{}] Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName()); logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode()); logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge()); } if (cosmosBulkItemResponse == null) { return Mono.error(new IllegalStateException("No response retrieved.")); } else { return Mono.just(cosmosBulkItemResponse); } }).blockLast(); } private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) { CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); // The default value for maxMicroBatchConcurrency is 1. // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage. // // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not. // When the RU has already been under saturation, increasing the concurrency will not help the situation, // rather it may cause more 429 and request timeout. bulkExecutionOptions.setMaxMicroBatchConcurrency(2); Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast(); }
Tipy týkající se výkonu
Při použití knihovny Bulk Executor zvažte následující body, které vám posudí lepší výkon:
Pokud chcete dosáhnout nejlepšího výkonu, spusťte aplikaci z virtuálního počítače Azure ve stejné oblasti jako oblast zápisu účtu Azure Cosmos DB.
Pro dosažení vyšší propustnosti:
- Nastavte velikost haldy JVM na dostatečně velké číslo, abyste se vyhnuli problémům s pamětí při zpracování velkého počtu dokumentů. Navrhovaná velikost haldy: max(3 GB, 3 × sizeof(všechny dokumenty předané rozhraní API hromadného importu v jedné dávce)).
- Existuje doba předběžného zpracování, kvůli které získáte vyšší propustnost při provádění hromadných operací s velkým počtem dokumentů. Takže pokud chcete importovat 10 000 000 dokumentů, spuštění hromadného importu 10krát u 10 velkého množství dokumentů každé z velikostí 1 000 000 je vhodnější než spuštění hromadného importu 100krát na 100 hromadných dokumentech každé z velikostí 100 000 dokumentů.
Doporučujeme vytvořit instanci jednoho objektu CosmosAsyncClient pro celou aplikaci v rámci jednoho virtuálního počítače, který odpovídá určitému kontejneru Azure Cosmos DB.
Vzhledem k tomu, že spouštění rozhraní API pro hromadnou operaci spotřebovává velký blok procesoru a vstupně-výstupních operací klientského počítače. K tomu dochází tak, že interně vytváříte více úloh, vyhněte se vytváření více souběžných úloh v rámci procesu aplikace při každém provádění volání rozhraní API hromadné operace. Pokud jedno volání rozhraní API hromadné operace spuštěné na jednom virtuálním počítači nemůže využívat propustnost celého kontejneru (pokud propustnost > kontejneru 1 milion RU/s), je vhodnější vytvořit samostatné virtuální počítače pro souběžné spouštění volání rozhraní API hromadné operace.