Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Poznámka:
Knihovna Bulk Executor zmíněná v tomto článku je udržována pro aplikace, které používají verzi .NET SDK 2.x. Pro nové aplikace můžete použít hromadnou podporu , která je přímo dostupná se sadou .NET SDK verze 3.x a nevyžaduje žádnou externí knihovnu.
Pokud aktuálně používáte knihovnu Bulk Executor a plánujete migrovat na hromadnou podporu v novější sadě SDK, postupujte podle kroků v průvodci migrací k migraci aplikace.
Tento kurz obsahuje pokyny k použití knihovny Bulk Executor .NET k importu a aktualizaci dokumentů do kontejneru Azure Cosmos DB. Informace o knihovně Bulk Executor a o tom, jak vám pomůže používat obrovskou propustnost a úložiště, najdete v přehledu knihovny bulk Executor služby Azure Cosmos DB. V tomto kurzu se zobrazí ukázková aplikace .NET, ve které se hromadně importují náhodně generované dokumenty do kontejneru Azure Cosmos DB. Po importu dat vám knihovna ukáže, jak můžete importovaná data hromadně aktualizovat zadáním oprav jako operací, které se mají provést u konkrétních polí dokumentu.
Knihovna hromadného vykonavatele je v současné době podporována pouze pro účty Azure Cosmos DB for NoSQL a pro API Gremlin. Tento článek popisuje, jak používat knihovnu Bulk Executor .NET pro API účty v NoSQL. Pokud chcete zjistit, jak používat knihovnu Bulk Executor .NET s rozhraním API pro účty Gremlin, přečtěte si, jak hromadně přijímat data ve službě Azure Cosmos DB pro Gremlin pomocí knihovny Bulk Executor.
Požadavky
Nejnovější sada Visual Studio s úlohou vývoje pro Azure Můžete začít s bezplatným integrovaným vývojovým prostředím sady Visual Studio Community. Povolte pracovní zátěž pro vývoj Azure během instalace sady Visual Studio.
Pokud nemáte předplatné Azure, vytvořte si bezplatný účet před zahájením. Můžete také nainstalovat a použít emulátor služby Azure Cosmos DB pro místní vývoj a testování pomocí koncového
https://localhost:8081bodu. Primární klíč je uvedený v části Ověřování požadavků.Pomocí kroků popsaných v části Vytvoření účtu Azure Cosmos DB v Rychlý start: Knihovna klientů Azure Cosmos DB pro NoSQL pro .NET vytvořte účet Azure Cosmos DB pro NoSQL.
Klonování ukázkové aplikace
Teď přejdeme na práci s kódem stažením ukázkové aplikace .NET z GitHubu. Tato aplikace provádí hromadné operace s daty uloženými v účtu služby Azure Cosmos DB. Pokud chcete aplikaci naklonovat, otevřete příkazový řádek, přejděte do adresáře, do kterého ji chcete zkopírovat, a spusťte následující příkaz:
git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git
Klonované úložiště obsahuje dvě ukázky: BulkImportSample a BulkUpdateSample. Můžete otevřít kteroukoliv z ukázkových aplikací, aktualizovat připojovací řetězce v souboru App.config pomocí připojovacích řetězců vašeho účtu služby Azure Cosmos DB, sestavit řešení a spustit ho.
Aplikace BulkImportSample generuje náhodné dokumenty a hromadně je naimportuje do účtu služby Azure Cosmos DB. Aplikace BulkUpdateSample hromadně aktualizuje importované dokumenty zadáním oprav jako operací, které se mají provést u konkrétních polí dokumentu. V dalších částech si projdete kód v každé z těchto ukázkových aplikací.
Hromadný import dat do účtu služby Azure Cosmos DB
Přejděte do složky BulkImportSample a otevřete soubor BulkImportSample.sln .
Připojovací řetězce služby Azure Cosmos DB jsou načteny ze souboru App.config, jak je znázorněno v následujícím kódu.
private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"]; private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"]; private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"]; private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"]; private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);Hromadný import vytvoří novou databázi a kontejner s názvem databáze, názvem kontejneru a hodnotami propustnosti zadanými v souboru App.config.
Dále se objekt DocumentClient inicializuje pomocí režimu přímého připojení TCP:
ConnectionPolicy connectionPolicy = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp }; DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey, connectionPolicy)Objekt BulkExecutor je inicializován s vysokou hodnotou opakování pro dobu čekání a omezené požadavky. Pak jsou nastaveny na hodnotu 0, aby po dobu jeho životnosti předaly řízení přetížení BulkExecutor.
// Set retry options high during initialization (default values). client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9; IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection); await bulkExecutor.InitializeAsync(); // Set retries to 0 to pass complete control to bulk executor. client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;Aplikace vyvolá BulkImportAsync API. Knihovna .NET poskytuje dvě verze rozhraní API hromadného importu – jednu, která přijímá seznam serializovaných dokumentů JSON, a druhou, která přijímá seznam deserializovaných dokumentů POCO. Další informace o definicích každé z těchto přetížených metod najdete v dokumentaci k rozhraní API.
BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync( documents: documentsToImportInBatch, enableUpsert: true, disableAutomaticIdGeneration: true, maxConcurrencyPerPartitionKeyRange: null, maxInMemorySortingBatchSize: null, cancellationToken: token);Metoda BulkImportAsync přijímá následující parametry:
Parameter Description enableUpsert Příznak umožňující provádění operací typu upsert na dokumentech. Pokud dokument s daným ID už existuje, aktualizuje se. Ve výchozím nastavení je nastavená na false. disableAutomaticIdGeneration Příznak, který zakáže automatické generování ID. Ve výchozím nastavení je nastavená na hodnotu true. maxConcurrencyPerPartitionKeyRange Maximální stupeň souběžnosti pro rozsah klíčů oddílu. Nastavení na hodnotu null způsobí, že knihovna použije výchozí hodnotu 20. maxInMemorySortingBatchSize Maximální počet dokumentů, které jsou načítány z enumerátoru, je předáván v každé fázi volání rozhraní API. Pro fázi řazení v paměti, ke které dochází před hromadným importem. Nastavení tohoto parametru na hodnotu null způsobí, že knihovna použije výchozí minimální hodnotu (documents.count, 1000000). token zrušení Token zrušení pro řádné ukončení operace hromadného importu.
Definice objektu odpovědi hromadného importu
Výsledek volání rozhraní API hromadného importu obsahuje následující atributy:
| Parameter | Description |
|---|---|
| PočetDokumentůImportováno (dlouhý) | Celkový počet dokumentů, které byly úspěšně importovány z celkového počtu dokumentů zadaných do volání rozhraní API hromadného importu. |
| TotalRequestUnitsConsumed (double) | Celkový počet jednotek žádostí, které byly spotřebovány při volání API pro hromadný import. |
| TotalTimeTaken (TimeSpan) | Celkový čas potřebný voláním rozhraní API hromadného importu k dokončení provádění. |
| BadInputDocuments (List<objekt>) | Seznam dokumentů ve špatném formátu, které nebyly úspěšně importovány ve volání rozhraní API hromadného importu. Opravte vrácené dokumenty a zkuste import zopakovat. Dokumenty s chybným formátováním zahrnují dokumenty, jejichž hodnota ID není řetězec (hodnota null nebo jiný datový typ je považován za neplatný). |
Hromadná aktualizace dat v účtu služby Azure Cosmos DB
Existující dokumenty můžete aktualizovat pomocí rozhraní BulkUpdateAsync API. V tomto příkladu Name nastavíte pole na novou hodnotu a odeberete Description pole z existujících dokumentů. Úplnou sadu podporovaných operací aktualizací najdete v dokumentaci k rozhraní API.
Přejděte do složky BulkUpdateSample a otevřete soubor BulkUpdateSample.sln .
Definujte položky aktualizace spolu s odpovídajícími operacemi aktualizace polí. V tomto příkladu pomocí SetUpdateOperation aktualizujete
Namepole a UnsetUpdateOperation odebereteDescriptionpole ze všech dokumentů. Můžete také provádět jiné operace, jako je zvýšení hodnoty v poli dokumentu o určitou hodnotu, přidání konkrétních hodnot do pole nebo odebrání konkrétní hodnoty z pole. Informace o různých metodách poskytovaných rozhraním API hromadné aktualizace najdete v dokumentaci k rozhraní API.SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc"); UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description"); List<UpdateOperation> updateOperations = new List<UpdateOperation>(); updateOperations.Add(nameUpdate); updateOperations.Add(descriptionUpdate); List<UpdateItem> updateItems = new List<UpdateItem>(); for (int i = 0; i < 10; i++) { updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations)); }Aplikace vyvolá API BulkUpdateAsync. Informace o definici metody BulkUpdateAsync najdete v dokumentaci k rozhraní API.
BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync( updateItems: updateItems, maxConcurrencyPerPartitionKeyRange: null, maxInMemorySortingBatchSize: null, cancellationToken: token);Metoda BulkUpdateAsync přijímá následující parametry:
Parameter Description maxConcurrencyPerPartitionKeyRange Maximální stupeň souběžnosti pro rozsah klíčů oddílu. Nastavením tohoto parametru na hodnotu null knihovna použije výchozí hodnotu(20). maxInMemorySortingBatchSize Maximální počet aktualizačních položek načítaných z enumerátoru aktualizačních položek při řízeném volání API v každé fázi. Pro fázi řazení v paměti, ke které dochází před hromadnou aktualizací. Nastavení tohoto parametru na hodnotu null způsobí, že knihovna použije výchozí minimální hodnotu (updateItems.count, 10000000). token zrušení Token zrušení pro řádné ukončení operace hromadné aktualizace.
Definice objektu odpovědi na hromadnou aktualizaci
Výsledek volání rozhraní API hromadné aktualizace obsahuje následující atributy:
| Parameter | Description |
|---|---|
| NumberOfDocumentsUpdated (dlouhý) | Počet dokumentů, které byly úspěšně aktualizovány z celkového počtu dokumentů poskytnutých pro hromadnou aktualizaci prostřednictvím API volání. |
| TotalRequestUnitsConsumed (double) | Celkový počet jednotek žádostí, které byly spotřebovány voláním hromadné aktualizace API. |
| TotalTimeTaken (TimeSpan) | Celková doba potřebná voláním rozhraní API hromadné aktualizace k dokončení provádění. |
Tipy týkající se výkonu
Pokud používáte knihovnu Bulk Executor, zvažte následující body pro lepší výkon:
Pokud chcete dosáhnout nejlepšího výkonu, spusťte aplikaci z virtuálního počítače Azure, který je ve stejné oblasti jako oblast zápisu účtu služby Azure Cosmos DB.
Doporučujeme vytvořit instanci jednoho objektu BulkExecutor pro celou aplikaci v rámci jednoho virtuálního počítače, který odpovídá určitému kontejneru Azure Cosmos DB.
Jeden výkon API pro hromadnou operaci výrazně zatěžuje výkon CPU a síťových vstupně-výstupních operací klientského počítače při spuštění více úloh interně. Vyhněte se vytváření více souběžných úloh v rámci procesu aplikace, které provádějí volání rozhraní API hromadné operace. Pokud jedno volání rozhraní API hromadné operace spuštěné na jednom virtuálním počítači nemůže spotřebovat propustnost celého kontejneru (pokud propustnost > kontejneru 1 milion RU/s), je vhodnější vytvořit samostatné virtuální počítače pro souběžné spouštění volání rozhraní API hromadné operace.
Zajistěte, aby byla metoda
InitializeAsync()vyvolána po vytvoření instance objektu BulkExecutor pro načtení mapování oddílů cílového kontejneru Azure Cosmos DB.V souboru App.Config vaší aplikace se ujistěte, že je pro lepší výkon povolený gcServer .
<runtime> <gcServer enabled="true" /> </runtime>Knihovna generuje stopy, které se dají shromažďovat buď do souboru protokolu, nebo na konzoli. Pokud chcete obojí povolit, přidejte do souboru App.Config vaší aplikace následující kód:
<system.diagnostics> <trace autoflush="false" indentsize="4"> <listeners> <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" /> <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" /> </listeners> </trace> </system.diagnostics>