Migrálás a tömeges végrehajtói kódtárból a tömeges támogatásba az Azure Cosmos DB .NET V3 SDK-ban
A KÖVETKEZŐKRE VONATKOZIK: NoSQL
Ez a cikk a .NET tömeges végrehajtói kódtárat használó meglévő alkalmazás kódjának áttelepítéséhez szükséges lépéseket ismerteti a .NET SDK legújabb verziójában található tömeges támogatási szolgáltatásba.
Tömeges támogatás engedélyezése
Engedélyezze a tömeges támogatást a CosmosClient
példányon az AllowBulkExecution konfigurációval:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Feladatok létrehozása minden művelethez
A .NET SDK tömeges támogatása a párhuzamos feladattár és az egyidejűleg végrehajtott csoportosítási műveletek használatával működik.
Az SDK-ban nincs olyan metódus, amely bemeneti paraméterként felveni a dokumentumok vagy műveletek listáját, hanem minden tömegesen végrehajtani kívánt művelethez létre kell hoznia egy feladatot, majd egyszerűen meg kell várnia, amíg befejeződnek.
Ha például a kezdeti bemenet azoknak az elemeknek a listája, amelyek mindegyikéhez a következő séma tartozik:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Ha tömeges importálást szeretne végezni (hasonlóan a BulkExecutor.BulkImportAsync parancshoz), akkor egyidejű hívásokkal kell rendelkeznie a felé CreateItemAsync
. Például:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Ha tömeges frissítést szeretne végezni (hasonlóan a BulkExecutor.BulkUpdateAsync használatához), az elemérték frissítése után egyidejű hívásokat kell indítania a metódushoz ReplaceItemAsync
. Például:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
Ha pedig tömeges törlést szeretne végezni (hasonlóan a BulkExecutor.BulkDeleteAsync használatához), akkor egyidejű hívásokat kell indítania a parancshoz DeleteItemAsync
az id
egyes elemek és partíciókulcsok használatával. Például:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Feladat eredményállapotának rögzítése
Az előző kód példákban létrehoztunk egy párhuzamos feladatlistát, és meghívtuk a CaptureOperationResponse
metódust az egyes tevékenységekhez. Ez a metódus egy olyan bővítmény, amely lehetővé teszi a BulkExecutorhoz hasonló válaszséma fenntartását a hibák rögzítésével és a kérelemegységek használatának nyomon követésével.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
Ahol a OperationResponse
deklarálva van:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Műveletek egyidejű végrehajtása
A feladatok teljes listájának hatókörének nyomon követéséhez ezt a segédosztályt használjuk:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
A ExecuteAsync
metódus megvárja, amíg az összes művelet befejeződik, és a következőképpen használhatja:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Statisztikák rögzítése
Az előző kód megvárja, amíg az összes művelet befejeződik, és kiszámítja a szükséges statisztikákat. Ezek a statisztikák hasonlóak a tömeges végrehajtói kódtár BulkImportResponse eleméhez.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
A BulkOperationResponse
tartalmazza a következőket:
- A műveletek listájának tömeges támogatással történő feldolgozásához szükséges teljes idő.
- A sikeres műveletek száma.
- A felhasznált kérelemegységek teljes száma.
- Hibák esetén megjeleníti azoknak a rekordoknak a listáját, amelyek a kivételt és a kapcsolódó elemet tartalmazzák naplózási és azonosítási célokra.
Újrapróbálkozás konfigurálása
A tömeges végrehajtói kódtár útmutatást adott a és a MaxRetryWaitTimeInSeconds
MaxRetryAttemptsOnThrottledRequests
retryOptions0
beállításához, hogy delegálja a vezérlést a tárba.
A .NET SDK tömeges támogatása esetén nincs rejtett viselkedés. Az újrapróbálkozási beállításokat közvetlenül a CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests és a CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests használatával konfigurálhatja.
Megjegyzés
Olyan esetekben, amikor a kiosztott kérelemegységek az adatok mennyisége alapján sokkal alacsonyabbak a vártnál, érdemes lehet ezeket magas értékekre beállítani. A tömeges művelet hosszabb időt vesz igénybe, de a nagyobb újrapróbálkozások miatt nagyobb az esélye a teljes sikerre.
Teljesítménnyel kapcsolatos fejlesztések
A .NET SDK-val végzett egyéb műveletekhez hasonlóan a stream API-k használata is jobb teljesítményt eredményez, és elkerüli a szükségtelen szerializálást.
Stream API-k használata csak akkor lehetséges, ha a használt adatok természete megegyezik egy bájtos adatfolyaméval (például fájlstreamekkel). Ilyen esetekben a CreateItemStreamAsync
, ReplaceItemStreamAsync
vagy DeleteItemStreamAsync
metódusok használata és használata ResponseMessage
(helyett ItemResponse
) növeli az elérhető átviteli sebességet.
Következő lépések
- A .NET SDK-kiadásokról további információt az Azure Cosmos DB SDK-ról szóló cikkben talál.
- Szerezze be a teljes migrálási forráskódot a GitHubról.
- További tömeges minták a GitHubon
- Kapacitástervezést szeretne végezni az Azure Cosmos DB-be való migráláshoz?
- Ha csak a meglévő adatbázisfürtben lévő virtuális magok és kiszolgálók számát ismeri, olvassa el a kérelemegységek virtuális magok vagy vCPU-k használatával történő becslését ismertető cikket.
- Ha ismeri az aktuális adatbázis számítási feladatának jellemző kérési arányait, olvassa el a kérelemegységek becslését az Azure Cosmos DB Capacity Planner használatával