Tömeges műveletek végrehajtása az Azure Cosmos DB-ben a tömeges végrehajtó .NET-kódtár használatával

A KÖVETKEZŐRE VONATKOZIK: NoSQL

Megjegyzés:

A cikkben ismertetett tömeges végrehajtói kódtár a .NET SDK 2.x verzióját használó alkalmazásokhoz van fenntartva. Új alkalmazások esetén használhatja a .NET SDK 3.x verziójával közvetlenül elérhető tömeges támogatást, és nem igényel külső kódtárat.

Ha jelenleg a tömeges végrehajtói kódtárat használja, és tömeges támogatásra szeretné migrálni az újabb SDK-t, az áttelepítési útmutató lépéseit követve migrálhatja az alkalmazást.

Ez az oktatóanyag bemutatja, hogyan importálhat és frissíthet dokumentumokat egy Azure Cosmos DB-tárolóba a tömeges végrehajtó .NET-kódtár használatával. Ha többet szeretne megtudni a tömeges végrehajtói kódtárról, valamint arról, hogyan segít a nagy átviteli sebesség és a tárolás használatában, tekintse meg az Azure Cosmos DB tömeges végrehajtói kódtárának áttekintését. Ebben az oktatóanyagban egy minta .NET-alkalmazást láthat, amely véletlenszerűen létrehozott dokumentumokat importál egy Azure Cosmos DB-tárolóba. Az adatok importálása után a tár bemutatja, hogyan frissítheti tömegesen az importált adatokat úgy, hogy adott dokumentummezőkön végrehajtandó műveletekként javításokat ad meg.

A tömeges végrehajtói kódtárat jelenleg csak a NoSQL-hez készült Azure Cosmos DB és a Gremlin-fiókokhoz készült API támogatja. Ez a cikk bemutatja, hogyan használhatja a tömeges végrehajtó .NET-kódtárat a NoSQL-fiókok api-jával. Ha meg szeretné tudni, hogyan használhatja a tömeges végrehajtó .NET-kódtárat a Gremlin-fiókokhoz készült API-val, tekintse meg az adatok tömeges betöltését a Gremlinhez készült Azure Cosmos DB-ben egy tömeges végrehajtói kódtár használatával.

Előfeltételek

  • A legújabb Visual Studio az Azure fejlesztési számítási feladatával. Első lépésként használhatja az ingyenesVisual Studio Community IDE-t. Engedélyezze az Azure fejlesztési számítási feladatát a Visual Studio beállítása során.

  • Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.

  • Azure-előfizetés nélkül ingyenesen kipróbálhatja az Azure Cosmos DB-t. Az Azure Cosmos DB Emulatort helyi fejlesztéshez és teszteléshez is telepítheti és használhatja a https://localhost:8081 végponttal. Az elsődleges kulcs a Kérelmek hitelesítése című részben található.

  • Hozzon létre egy Azure Cosmos DB for NoSQL-fiókot a .NET-hez készült Azure Cosmos DB-ügyfélkódtár azure Cosmos DB-fiók létrehozása című szakaszában leírt lépésekkel.

A mintaalkalmazás klónozása

Most váltsunk a kód használatára úgy, hogy letöltünk egy minta .NET-alkalmazást a GitHubról. Ez az alkalmazás tömeges műveleteket hajt végre az Azure Cosmos DB-fiókban tárolt adatokon. Az alkalmazás klónozásához nyisson meg egy parancssort, keresse meg azt a könyvtárat, ahová másolni szeretné, és futtassa a következő parancsot:

git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git

A klónozott adattár két mintát tartalmaz: BulkImportSample és BulkUpdateSample. Megnyithatja bármelyik mintaalkalmazást, frissítheti az App.config fájlban lévő kapcsolati sztring az Azure Cosmos DB-fiók kapcsolati sztring, elkészítheti és futtathatja a megoldást.

A BulkImportSample alkalmazás véletlenszerű dokumentumokat hoz létre, és tömegesen importálja őket az Azure Cosmos DB-fiókjába. A BulkUpdateSample alkalmazás tömegesen frissíti az importált dokumentumokat úgy, hogy adott dokumentummezőkön végrehajtandó műveletekként javításokat ad meg. A következő szakaszokban áttekintheti az egyes mintaalkalmazások kódját.

Adatok tömeges importálása Azure Cosmos DB-fiókba

  1. Lépjen a BulkImportSample mappába, és nyissa meg a BulkImportSample.sln fájlt.

  2. Az Azure Cosmos DB kapcsolati sztring az App.config fájlból lesznek lekérve az alábbi kódban látható módon:

    private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"];
    private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"];
    private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"];
    private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
    

    A tömeges importáló létrehoz egy új adatbázist és egy tárolót az adatbázis nevével, a tároló nevével és az App.config fájlban megadott átviteli sebességértékekkel.

  3. Ezután a DocumentClient objektum inicializálva lesz a Közvetlen TCP kapcsolati móddal:

    ConnectionPolicy connectionPolicy = new ConnectionPolicy
    {
       ConnectionMode = ConnectionMode.Direct,
       ConnectionProtocol = Protocol.Tcp
    };
    DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey,
    connectionPolicy)
    
  4. A BulkExecutor objektum inicializálása magas újrapróbálkozási értékkel történik a várakozási idő és a szabályozott kérelmek esetében. Ezután a 0 értékre vannak állítva, hogy a bulkExecutor teljes élettartama alatt átengedje a torlódás-vezérlést.

    // Set retry options high during initialization (default values).
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
    
    IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
    await bulkExecutor.InitializeAsync();
    
    // Set retries to 0 to pass complete control to bulk executor.
    client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
    client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
    
  5. Az alkalmazás meghívja a BulkImportAsync API-t. A .NET-kódtár a tömeges importálási API két túlterhelését biztosítja– az egyik a szerializált JSON-dokumentumok listáját fogadja el, a másik pedig a deszerializált POCO-dokumentumok listáját. Az egyes túlterhelt metódusok definícióival kapcsolatos további információkért tekintse meg az API dokumentációját.

    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
      documents: documentsToImportInBatch,
      enableUpsert: true,
      disableAutomaticIdGeneration: true,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    A BulkImportAsync metódus a következő paramétereket fogadja el:

    Paraméter Ismertetés
    enableUpsert Jelölő a dokumentumokon végzett upsert műveletek engedélyezéséhez. Ha már létezik a megadott azonosítóval rendelkező dokumentum, az frissül. Alapértelmezés szerint hamis értékre van állítva.
    disableAutomaticIdGeneration Az azonosító automatikus generálásának letiltását jelölő. Alapértelmezés szerint igaz értékre van állítva.
    maxConcurrencyPerPartitionKeyRange A partíciókulcs-tartományonkénti egyidejűség maximális mértéke. A null értékre állítás miatt a kódtár alapértelmezett értéke 20.
    maxInMemorySortingBatchSize A dokumentum-enumerátorból lekért dokumentumok maximális száma, amelyet a rendszer minden fázisban átad az API-hívásnak. A memóriabeli rendezési fázishoz, amely a tömeges importálás előtt történik. Ha ezt a paramétert null értékre állítja, a kódtár az alapértelmezett minimális értéket használja (documents.count, 10000000).
    cancellationToken A lemondási jogkivonat, amely elegánsan kilép a tömeges importálási műveletből.

Válaszobjektumok tömeges importálása
A tömeges importálási API-hívás eredménye a következő attribútumokat tartalmazza:

Paraméter Ismertetés
NumberOfDocumentsImported (hosszú) A tömeges importálási API-híváshoz megadott összes dokumentumból sikeresen importált dokumentumok teljes száma.
TotalRequestUnitsConsumed (dupla) A tömeges importálási API-hívás által felhasznált összes kérelemegység (RU).
TotalTimeTaken (TimeSpan) A tömeges importálási API-hívás által a végrehajtás befejezéséhez szükséges teljes idő.
BadInputDocuments (Listaobjektum><) Azoknak a rossz formátumú dokumentumoknak a listája, amelyeket nem sikerült importálni a tömeges importálási API-hívásban. Javítsa ki a visszaadott dokumentumokat, és próbálkozzon újra az importálással. A hibás formátumú dokumentumok közé tartoznak azok a dokumentumok, amelyek azonosítóértéke nem sztring (null vagy bármely más adattípus érvénytelennek minősül).

Adatok tömeges frissítése az Azure Cosmos DB-fiókban

A meglévő dokumentumokat a BulkUpdateAsync API-val frissítheti. Ebben a példában új értékre állítja a Name mezőt, és eltávolítja a mezőt a Description meglévő dokumentumokból. A támogatott frissítési műveletek teljes készletét az API dokumentációjában találja.

  1. Lépjen a BulkUpdateSample mappába, és nyissa meg a BulkUpdateSample.sln fájlt.

  2. Adja meg a frissítési elemeket a megfelelő mezőfrissítési műveletekkel együtt. Ebben a példában a SetUpdateOperation használatával frissíti a Name mezőt, az UnsetUpdateOperation pedig eltávolítja a mezőt az Description összes dokumentumból. Egyéb műveleteket is végrehajthat, például a dokumentummezők adott érték szerinti növelését, adott értékek tömbmezőbe való leküldését vagy egy adott érték tömbmezőből való eltávolítását. A tömeges frissítési API által biztosított különböző módszerek megismeréséhez tekintse meg az API dokumentációját.

    SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
    UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");
    
    List<UpdateOperation> updateOperations = new List<UpdateOperation>();
    updateOperations.Add(nameUpdate);
    updateOperations.Add(descriptionUpdate);
    
    List<UpdateItem> updateItems = new List<UpdateItem>();
    for (int i = 0; i < 10; i++)
    {
     updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
    }
    
  3. Az alkalmazás meghívja a BulkUpdateAsync API-t. A BulkUpdateAsync metódus definíciójának megismeréséhez tekintse meg az API dokumentációját.

    BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
      updateItems: updateItems,
      maxConcurrencyPerPartitionKeyRange: null,
      maxInMemorySortingBatchSize: null,
      cancellationToken: token);
    

    A BulkUpdateAsync metódus a következő paramétereket fogadja el:

    Paraméter Ismertetés
    maxConcurrencyPerPartitionKeyRange A partíciókulcs-tartományonkénti egyidejűség maximális mértéke. Ha ezt a paramétert null értékre állítja, az erőforrástár az alapértelmezett értéket használja(20).
    maxInMemorySortingBatchSize Az egyes fázisokban az API-hívásnak átadott frissítési elemek számbavételének maximális száma. A memóriabeli rendezési fázishoz, amely a tömeges frissítés előtt történik. Ha ezt a paramétert null értékre állítja, az erőforrástár az alapértelmezett minimális értéket használja (updateItems.count, 10000000).
    cancellationToken A lemondási jogkivonat, amely elegánsan kilép a tömeges frissítési műveletből.

Tömeges frissítési válaszobjektum definíciója
A tömeges frissítési API-hívás eredménye a következő attribútumokat tartalmazza:

Paraméter Ismertetés
NumberOfDocumentsUpdated (hosszú) A tömeges frissítési API-híváshoz megadott összes dokumentumból sikeresen frissített dokumentumok száma.
TotalRequestUnitsConsumed (dupla) A tömeges frissítési API-hívás által felhasznált kérelemegységek (RU-k) teljes száma.
TotalTimeTaken (TimeSpan) A tömeges frissítési API-hívás által a végrehajtás befejezéséhez szükséges teljes idő.

Teljesítménnyel kapcsolatos tippek

A tömeges végrehajtói kódtár használatakor vegye figyelembe a következő pontokat a jobb teljesítmény érdekében:

  • A legjobb teljesítmény érdekében futtassa az alkalmazást egy Olyan Azure-beli virtuális gépről, amely ugyanabban a régióban található, mint az Azure Cosmos DB-fiók írási régiója.

  • Ajánlott egyetlen BulkExecutor objektumot példányosítani a teljes alkalmazáshoz egyetlen virtuális gépen belül, amely egy adott Azure Cosmos DB-tárolónak felel meg.

  • Egyetlen tömeges műveleti API-végrehajtás az ügyfélszámítógép processzor- és hálózati I/O-jának nagy részét használja fel több feladat belső ívásakor. Kerülje a tömeges műveleti API-hívások végrehajtására irányuló több egyidejű feladatot az alkalmazásfolyamaton belül. Ha egyetlen virtuális gépen futó egyetlen tömeges műveleti API-hívás nem tudja felhasználni a teljes tároló átviteli sebességét (ha a tároló átviteli sebessége > 1 millió RU/s), célszerű külön virtuális gépeket létrehozni a tömeges műveleti API-hívások egyidejű végrehajtásához.

  • Győződjön meg arról, hogy a metódust egy InitializeAsync() BulkExecutor objektum példányosítása után hívja meg a rendszer a cél Azure Cosmos DB-tároló partíciótérképének lekéréséhez.

  • Az alkalmazás App.Config-ban győződjön meg arról, hogy a gcServer engedélyezve van a jobb teljesítmény érdekében

    <runtime>
      <gcServer enabled="true" />
    </runtime>
    
  • A kódtár olyan nyomkövetéseket bocsát ki, amelyek egy naplófájlba vagy a konzolra gyűjthetők. Mindkettő engedélyezéséhez adja hozzá az alábbi kódot az alkalmazás App.Config fájljához:

    <system.diagnostics>
      <trace autoflush="false" indentsize="4">
        <listeners>
          <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
          <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
        </listeners>
      </trace>
    </system.diagnostics>
    

Következő lépések