Migrálás a tömeges végrehajtói kódtárból a tömeges támogatásba az Azure Cosmos DB .NET V3 SDK-ban

A KÖVETKEZŐKRE VONATKOZIK: NoSQL

Ez a cikk a .NET tömeges végrehajtói kódtárat használó meglévő alkalmazás kódjának áttelepítéséhez szükséges lépéseket ismerteti a .NET SDK legújabb verziójában található tömeges támogatási szolgáltatásba.

Tömeges támogatás engedélyezése

Engedélyezze a tömeges támogatást a CosmosClient példányon az AllowBulkExecution konfigurációval:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Feladatok létrehozása minden művelethez

A .NET SDK tömeges támogatása a párhuzamos feladattár és az egyidejűleg végrehajtott csoportosítási műveletek használatával működik.

Az SDK-ban nincs olyan metódus, amely bemeneti paraméterként felveni a dokumentumok vagy műveletek listáját, hanem minden tömegesen végrehajtani kívánt művelethez létre kell hoznia egy feladatot, majd egyszerűen meg kell várnia, amíg befejeződnek.

Ha például a kezdeti bemenet azoknak az elemeknek a listája, amelyek mindegyikéhez a következő séma tartozik:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Ha tömeges importálást szeretne végezni (hasonlóan a BulkExecutor.BulkImportAsync parancshoz), akkor egyidejű hívásokkal kell rendelkeznie a felé CreateItemAsync. Például:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Ha tömeges frissítést szeretne végezni (hasonlóan a BulkExecutor.BulkUpdateAsync használatához), az elemérték frissítése után egyidejű hívásokat kell indítania a metódushoz ReplaceItemAsync . Például:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Ha pedig tömeges törlést szeretne végezni (hasonlóan a BulkExecutor.BulkDeleteAsync használatához), akkor egyidejű hívásokat kell indítania a parancshoz DeleteItemAsyncaz id egyes elemek és partíciókulcsok használatával. Például:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Feladat eredményállapotának rögzítése

Az előző kód példákban létrehoztunk egy párhuzamos feladatlistát, és meghívtuk a CaptureOperationResponse metódust az egyes tevékenységekhez. Ez a metódus egy olyan bővítmény, amely lehetővé teszi a BulkExecutorhoz hasonló válaszséma fenntartását a hibák rögzítésével és a kérelemegységek használatának nyomon követésével.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Ahol a OperationResponse deklarálva van:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Műveletek egyidejű végrehajtása

A feladatok teljes listájának hatókörének nyomon követéséhez ezt a segédosztályt használjuk:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

A ExecuteAsync metódus megvárja, amíg az összes művelet befejeződik, és a következőképpen használhatja:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Statisztikák rögzítése

Az előző kód megvárja, amíg az összes művelet befejeződik, és kiszámítja a szükséges statisztikákat. Ezek a statisztikák hasonlóak a tömeges végrehajtói kódtár BulkImportResponse eleméhez.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

A BulkOperationResponse tartalmazza a következőket:

  1. A műveletek listájának tömeges támogatással történő feldolgozásához szükséges teljes idő.
  2. A sikeres műveletek száma.
  3. A felhasznált kérelemegységek teljes száma.
  4. Hibák esetén megjeleníti azoknak a rekordoknak a listáját, amelyek a kivételt és a kapcsolódó elemet tartalmazzák naplózási és azonosítási célokra.

Újrapróbálkozás konfigurálása

A tömeges végrehajtói kódtár útmutatást adott a és a MaxRetryWaitTimeInSecondsMaxRetryAttemptsOnThrottledRequestsretryOptions0 beállításához, hogy delegálja a vezérlést a tárba.

A .NET SDK tömeges támogatása esetén nincs rejtett viselkedés. Az újrapróbálkozási beállításokat közvetlenül a CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests és a CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests használatával konfigurálhatja.

Megjegyzés

Olyan esetekben, amikor a kiosztott kérelemegységek az adatok mennyisége alapján sokkal alacsonyabbak a vártnál, érdemes lehet ezeket magas értékekre beállítani. A tömeges művelet hosszabb időt vesz igénybe, de a nagyobb újrapróbálkozások miatt nagyobb az esélye a teljes sikerre.

Teljesítménnyel kapcsolatos fejlesztések

A .NET SDK-val végzett egyéb műveletekhez hasonlóan a stream API-k használata is jobb teljesítményt eredményez, és elkerüli a szükségtelen szerializálást.

Stream API-k használata csak akkor lehetséges, ha a használt adatok természete megegyezik egy bájtos adatfolyaméval (például fájlstreamekkel). Ilyen esetekben a CreateItemStreamAsync, ReplaceItemStreamAsyncvagy DeleteItemStreamAsync metódusok használata és használata ResponseMessage (helyett ItemResponse) növeli az elérhető átviteli sebességet.

Következő lépések