Tömeges műveletek végrehajtása az Azure Cosmos DB-adatokon
A KÖVETKEZŐRE VONATKOZIK: NoSQL
Ez az oktatóanyag útmutatást nyújt a tömeges műveletek azure Cosmos DB Java V4 SDK-ban való végrehajtásához. Az SDK ezen verziója beépített tömeges végrehajtói kódtárat tartalmaz. Ha a Java SDK régebbi verzióját használja, javasoljuk, hogy a legújabb verzióra migráljon. Az Azure Cosmos DB Java V4 SDK a Java tömeges támogatásának jelenleg ajánlott megoldása.
A tömeges végrehajtói kódtárat jelenleg csak az Azure Cosmos DB for NoSQL és a Gremlin-fiókokhoz készült API támogatja. A tömeges végrehajtó .NET-kódtár gremlin API-val való használatáról a Gremlinhez készült Azure Cosmos DB tömeges műveleteinek végrehajtásával kapcsolatban olvashat.
Előfeltételek
Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.
Ingyenesen kipróbálhatja az Azure Cosmos DB-t Azure-előfizetés nélkül, ingyenesen és kötelezettségvállalásokkal. Vagy használhatja az Azure Cosmos DB Emulatort a
https://localhost:8081
végponttal. Az elsődleges kulcs a Kérelmek hitelesítése című részben található.Java fejlesztői készlet (JDK) 1.8+
Ubuntu rendszeren futtassa az
apt-get install default-jdk
parancsot a JDK telepítéséhez.Ügyeljen arra, hogy a JAVA_HOME környezeti változó arra a mappára mutasson, ahová a JDK telepítve lett.
Maven bináris archívum letöltése és telepítése
- Ubuntu rendszeren futtathatja az
apt-get install maven
parancsot a Maven telepítéséhez.
- Ubuntu rendszeren futtathatja az
Hozzon létre egy Azure Cosmos DB for NoSQL-fiókot a Java rövid útmutatójának adatbázisfiók létrehozása szakaszában leírt lépések végrehajtásával.
A mintaalkalmazás klónozása
Most váltsunk a kód használatára egy általános mintaadattár letöltésével az Azure Cosmos DB-hez készült Java V4 SDK-hoz a GitHubról. Ezek a mintaalkalmazások CRUD-műveleteket és más gyakori műveleteket hajtanak végre az Azure Cosmos DB-ben. Az adattár klónozásához nyisson meg egy parancssort, keresse meg azt a könyvtárat, ahová másolni szeretné az alkalmazást, és futtassa a következő parancsot:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
A klónozott adattár egy mintát SampleBulkQuickStartAsync.java
tartalmaz a /azure-cosmos-java-sql-api-samples/tree/main/src/main/java/com/azure/cosmos/examples/bulk/async
mappában. Az alkalmazás dokumentumokat hoz létre, és műveleteket hajt végre az Azure Cosmos DB-ben lévő elemek tömeges létrehozásához, frissítéséhez, cseréjéhez és törléséhez. A következő szakaszokban áttekintjük a mintaalkalmazás kódját.
Tömeges végrehajtás az Azure Cosmos DB-ben
- Az Azure Cosmos DB kapcsolati sztring argumentumként olvashatók, és a fájlban
examples/common/AccountSettings.java
meghatározott változókhoz vannak rendelve. Ezeket a környezeti változókat be kell állítani
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
A tömeges minta futtatásához adja meg annak fő osztályát:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
Az
CosmosAsyncClient
objektum inicializálása a következő utasítások használatával történik:client = new CosmosClientBuilder() .endpoint(AccountSettings.HOST) .key(AccountSettings.MASTER_KEY) .preferredRegions(preferredRegions) .contentResponseOnWriteEnabled(true) .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
A minta létrehoz egy aszinkron adatbázist és tárolót. Ezután több dokumentumot hoz létre, amelyeken tömeges műveleteket hajt végre. Ezeket a dokumentumokat hozzáadja egy
Flux<Family>
reaktív streamobjektumhoz:Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); Family johnsonFamilyItem = Families.getJohnsonFamilyItem(); Family smithFamilyItem = Families.getSmithFamilyItem(); // Setup family items to create Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
A minta metódusokat tartalmaz a tömeges létrehozáshoz, a frissítéshez, a csere és a törléshez. Minden metódusban a BulkWriter
Flux<Family>
streamben lévő családdokumentumokat több metódushívásra képezzük leCosmosBulkOperations
. Ezek a műveletek egy másik reaktív streamobjektumhozFlux<CosmosItemOperation>
lesznek hozzáadva. Ezt követően a rendszer átadja a streamet azexecuteBulkOperations
elején létrehozott aszinkroncontainer
metódusnak, és a műveletek tömegesen lesznek végrehajtva. Példaként lásd az alábbi tömeges létrehozási módszert:private void bulkCreateItems(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).blockLast(); }
Egy osztály
BulkWriter.java
is ugyanabban a könyvtárban található, mint a mintaalkalmazás. Ez az osztály bemutatja, hogyan kezelhetők a sebességkorlátozási (429) és időtúllépési (408) hibák, amelyek tömeges végrehajtás során fordulhatnak elő, és hogyan lehet hatékonyan újrapróbálkozni. Az alábbi módszerekben implementálva, a helyi és a globális átviteli sebesség szabályozásának implementálását is bemutatja.private void bulkUpsertItemsWithBulkWriterAbstraction() { Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() { ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group1") .setTargetThroughput(200) .build(); container.enableLocalThroughputControlGroup(groupConfig); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); } private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() { String controlContainerId = "throughputControlContainer"; CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId); database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block(); ThroughputControlGroupConfig groupConfig = new ThroughputControlGroupConfigBuilder() .setGroupName("group-" + UUID.randomUUID()) .setTargetThroughput(200) .build(); GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId) .setControlItemRenewInterval(Duration.ofSeconds(5)) .setControlItemExpireInterval(Duration.ofSeconds(20)) .build(); container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig); CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); requestOptions.setThroughputControlGroupName(groupConfig.getGroupName()); Family andersenFamilyItem = Families.getAndersenFamilyItem(); Family wakefieldFamilyItem = Families.getWakefieldFamilyItem(); CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName())); CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName())); BulkWriter bulkWriter = new BulkWriter(container); bulkWriter.scheduleWrites(andersonItemOperation); bulkWriter.scheduleWrites(wakeFieldItemOperation); bulkWriter.execute().subscribe(); }
Emellett a mintában tömeges létrehozási módszerek is találhatók, amelyek bemutatják a válaszfeldolgozás hozzáadását és a végrehajtási beállítások beállítását:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) { Flux<CosmosItemOperation> cosmosItemOperations = families.map( family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> { CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation(); if (cosmosBulkOperationResponse.getException() != null) { logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); } else if (cosmosBulkItemResponse == null || !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) { logger.error( "The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " + "successfully with " + "a" + " {} response code.", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName(), cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a"); } else { logger.info( "Item ID: [{}] Item PartitionKey Value: [{}]", cosmosItemOperation.<Family>getItem().getId(), cosmosItemOperation.<Family>getItem().getLastName()); logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode()); logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge()); } if (cosmosBulkItemResponse == null) { return Mono.error(new IllegalStateException("No response retrieved.")); } else { return Mono.just(cosmosBulkItemResponse); } }).blockLast(); } private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) { CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions(); // The default value for maxMicroBatchConcurrency is 1. // By increasing it, it means more concurrent requests will be allowed to be sent to the server, which leads to increased RU usage. // // Before you increase the value, please examine the RU usage of your container - whether it has been saturated or not. // When the RU has already been under saturation, increasing the concurrency will not help the situation, // rather it may cause more 429 and request timeout. bulkExecutionOptions.setMaxMicroBatchConcurrency(2); Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName()))); container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast(); }
Teljesítménnyel kapcsolatos tippek
Fontolja meg a következő szempontokat a nagyobb teljesítmény érdekében a tömeges végrehajtói kódtár használata esetén:
A legjobb teljesítmény érdekében futtassa az alkalmazást egy Azure-beli virtuális gépről ugyanabban a régióban, mint az Azure Cosmos DB-fiók írási régiója.
Magasabb átviteli sebesség elérése érdekében:
- Állítsa a JVM halomméretét elegendő számúra, hogy elkerülje a sok dokumentum kezelése során felmerülő memóriaproblémát. Javasolt halomméret: max(3 GB, 3 * sizeof(az összes dokumentum, amely egy kötegben a tömeges importálási API-nak lett átadva)).
- Van egy előfeldolgozási idő, amely miatt nagyobb átviteli sebességre lesz szüksége, amikor nagy számú dokumentummal végez tömeges műveleteket. Ha tehát 10 000 000 dokumentumot szeretne importálni, akkor a 10-szeres tömeges importálást 10 alkalommal, 10-szer 100 000 000-es méretben kell futtatni, mint a 100 000-es méretű dokumentumok 100-szeres tömeges importálását.
Ajánlott egyetlen CosmosAsyncClient objektumot példányosítani a teljes alkalmazáshoz egyetlen virtuális gépen belül, amely egy adott Azure Cosmos DB-tárolónak felel meg.
Mivel egyetlen tömeges műveleti API-végrehajtás az ügyfélszámítógép PROCESSZOR- és hálózati I/O-jának nagy részét használja fel. Ez több feladat belső ívásával történik, így elkerülheti, hogy az alkalmazásfolyamaton belül egyszerre több feladat is létrejönjön, és tömeges műveleti API-hívásokat hajt végre. Ha egyetlen virtuális gépen futó egyetlen tömeges műveleti API-hívás nem tudja felhasználni a teljes tároló átviteli sebességét (ha a tároló átviteli sebessége > 1 millió RU/s), célszerű külön virtuális gépeket létrehozni a tömeges műveleti API-hívások egyidejű végrehajtásához.
Következő lépések
- A tömeges végrehajtói funkciók áttekintését a tömeges végrehajtók áttekintésében tekintheti meg.