Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
PLATÍ PRO: NoSQL
Tento článek popisuje požadované kroky k migraci kódu existující aplikace, který používá knihovnu .NET bulk executor na funkci hromadné podpory v nejnovější verzi sady .NET SDK.
Umožnit hromadnou podporu
Povolení hromadné podpory instance CosmosClient prostřednictvím konfigurace AllowBulkExecution :
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Vytvoření úkolů pro každou operaci
Hromadná podpora v sadě .NET SDK funguje s využitím paralelní knihovny úloh a operací seskupování, ke kterým dochází souběžně.
Sada SDK neobsahuje jedinou metodu, která jako vstupní parametr vezme seznam dokumentů nebo operací, ale potřebujete vytvořit úlohu pro každou operaci, kterou chcete hromadně spustit, a pak jednoduše počkat, až se dokončí.
Pokud je například počáteční vstup seznamem položek, ve kterých má každá položka následující schéma:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Pokud chcete provést hromadný import (podobně jako BulkExecutor.BulkImportAsync), je nutné mít souběžná volání CreateItemAsync. Příklad:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Pokud chcete provést hromadnou aktualizaci (podobně jako pomocí BulkExecutor.BulkUpdateAsync), musíte mít souběžná volání metody ReplaceItemAsync po aktualizaci hodnoty položky. Příklad:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
A pokud chcete provést hromadné odstranění (podobně jako při použití BulkExecutor.BulkDeleteAsync), je třeba mít souběžná volání na DeleteItemAsync, spolu s klíčem oddílu id pro každou položku. Příklad:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Zaznamenat stav výsledku úkolu
V předchozích příkladech kódu jsme vytvořili souběžný seznam úkolů a pro každou z těchto úloh jsme volali metodu CaptureOperationResponse . Tato metoda je rozšíření, které nám umožňuje udržovat podobné schéma odpovědí jako BulkExecutor tím, že zachytává všechny chyby a sleduje využití jednotek žádostí.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
OperationResponse Kde je deklarován jako:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Souběžné spouštění operací
Ke sledování rozsahu celého seznamu úkolů používáme tuto pomocnou třídu:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Metoda ExecuteAsync počká, dokud se nedokončí všechny operace, a můžete ji použít takto:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Zachytávání statistik
Předchozí kód čeká na dokončení všech operací a vypočítá požadované statistiky. Tyto statistiky jsou podobné statistice knihovny hromadného exekutora BulkImportResponse.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
Obsahuje BulkOperationResponse :
- Celková doba potřebná ke zpracování seznamu operací prostřednictvím hromadné podpory.
- Počet úspěšných operací.
- Celkový počet využitých jednotek žádostí
- Pokud dojde k selhání, zobrazí se seznam n-tic obsahujících výjimku a přidruženou položku pro účely protokolování a identifikace.
Konfigurace opakování
Knihovna Bulk Executor obsahovala pokyny, které se zmínily o nastavení MaxRetryWaitTimeInSeconds a MaxRetryAttemptsOnThrottledRequests v RetryOptions na 0, aby se řízení delegovalo na knihovnu.
Pro hromadnou podporu v sadě .NET SDK neexistuje žádné skryté chování. Možnosti opakování pokusů můžete nakonfigurovat přímo prostřednictvím CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests a CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Poznámka:
V případech, kdy jsou zřízené jednotky žádostí mnohem nižší než očekávané na základě množství dat, můžete zvážit nastavení těchto jednotek na vysoké hodnoty. Hromadná operace bude trvat déle, ale má větší šanci na úspěch kvůli vyššímu počtu opakovaných pokusů.
Zlepšení výkonu
Stejně jako u jiných operací s .NET SDK, použití rozhraní API datových proudů zajišťuje lepší výkon a vyhýbá se zbytečné serializaci.
Použití rozhraní API streamu je možné pouze v případě, že povaha dat, která používáte, odpovídá datovému proudu bajtů (například datových proudů souborů). Vtakovýchch CreateItemStreamAsyncReplaceItemStreamAsyncDeleteItemStreamAsyncResponseMessageItemResponse
Další kroky
- Další informace o vydaných verzích sady .NET SDK najdete v článku o sadě SDK služby Azure Cosmos DB.
- Získejte úplný zdrojový kód migrace z GitHubu.
- Další hromadné ukázky na GitHubu
- Pokoušíte se naplánovat kapacitu migrace do služby Azure Cosmos DB?
- Pokud vše, co víte, je počet virtuálních jader a serverů ve vašem stávajícím databázovém clusteru, přečtěte si o odhadu jednotek požadavků pomocí virtuálních jader nebo virtuálních procesorů.
- Pokud znáte typické sazby požadavků pro vaši aktuální úlohu databáze, přečtěte si informace o odhadu jednotek žádostí pomocí plánovače kapacity služby Azure Cosmos DB.