Migrowanie z biblioteki funkcji wykonawczej operacji zbiorczych do obsługi zbiorczej w zestawie SDK platformy .NET platformy .NET w wersji 3
DOTYCZY: NoSQL
W tym artykule opisano kroki wymagane do przeprowadzenia migracji kodu istniejącej aplikacji, który używa biblioteki funkcji wykonawczej zbiorczej platformy .NET do funkcji obsługi zbiorczej w najnowszej wersji zestawu .NET SDK.
Włączanie obsługi zbiorczej
Włącz obsługę zbiorczą w wystąpieniu CosmosClient
za pomocą konfiguracji AllowBulkExecution :
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Tworzenie zadań dla każdej operacji
Obsługa zbiorcza zestawu .NET SDK działa przy użyciu biblioteki równoległej zadań i operacji grupowania wykonywanych współbieżnie.
W zestawie SDK nie ma jednej metody, która będzie pobierać listę dokumentów lub operacji jako parametr wejściowy, ale zamiast tego należy utworzyć zadanie dla każdej operacji, którą chcesz wykonać zbiorczo, a następnie po prostu poczekać na ich ukończenie.
Jeśli na przykład początkowe dane wejściowe są listą elementów, w których każdy element ma następujący schemat:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Jeśli chcesz wykonać importowanie zbiorcze (podobnie jak w przypadku funkcji BulkExecutor.BulkImportAsync), musisz mieć współbieżne wywołania metody .CreateItemAsync
Na przykład:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Jeśli chcesz przeprowadzić aktualizację zbiorczą (podobną do metody BulkExecutor.BulkUpdateAsync), należy mieć współbieżne wywołania metody ReplaceItemAsync
po zaktualizowaniu wartości elementu. Na przykład:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
Jeśli chcesz usunąć zbiorczo (podobnie jak w przypadku funkcji BulkExecutor.BulkDeleteAsync), musisz mieć równoczesne wywołania elementu DeleteItemAsync
z kluczem id
partycji i . Na przykład:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Przechwyć stan wyniku zadania
W poprzednich przykładach kodu utworzyliśmy współbieżną listę zadań i wywołaliśmy metodę CaptureOperationResponse
dla każdego z tych zadań. Ta metoda jest rozszerzeniem, które pozwala nam zachować podobny schemat odpowiedzi jako BulkExecutor, przechwytując wszelkie błędy i śledząc użycie jednostek żądania.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
OperationResponse
Gdzie element jest zadeklarowany jako:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Wykonywanie operacji współbieżnie
Aby śledzić zakres całej listy zadań, użyjemy tej klasy pomocniczej:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Metoda ExecuteAsync
będzie czekać na zakończenie wszystkich operacji i można jej użyć w następujący sposób:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Przechwytywanie statystyk
Poprzedni kod czeka na ukończenie wszystkich operacji i oblicza wymagane statystyki. Te statystyki są podobne do tych biblioteki funkcji wykonawczej zbiorczej BulkImportResponse.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
Zawiera BulkOperationResponse
:
- Łączny czas potrzebny na przetworzenie listy operacji za pośrednictwem obsługi zbiorczej.
- Liczba pomyślnych operacji.
- Łączna liczba wykorzystanych jednostek żądania.
- Jeśli wystąpią błędy, zostanie wyświetlona lista krotki, które zawierają wyjątek i skojarzony element na potrzeby rejestrowania i identyfikacji.
Ponów próbę konfiguracji
Biblioteka funkcji wykonawczej zbiorczej zawierała wskazówki dotyczące ustawiania MaxRetryWaitTimeInSeconds
poleceń i MaxRetryAttemptsOnThrottledRequests
RetryOptions w celu 0
delegowania kontroli do biblioteki.
W przypadku zbiorczej obsługi zestawu .NET SDK nie ma ukrytego zachowania. Opcje ponawiania można skonfigurować bezpośrednio za pomocą poleceń CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests i CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Uwaga
W przypadkach, gdy aprowidowane jednostki żądania są znacznie niższe niż oczekiwano na podstawie ilości danych, warto rozważyć ustawienie tych jednostek na wysokie wartości. Operacja zbiorcza potrwa dłużej, ale ma większe szanse na całkowite powodzenie z powodu wyższych ponownych prób.
usprawnienia dotyczące wydajności
Podobnie jak w przypadku innych operacji z zestawem .NET SDK, korzystanie z interfejsów API strumienia zapewnia lepszą wydajność i pozwala uniknąć niepotrzebnej serializacji.
Korzystanie z interfejsów API strumienia jest możliwe tylko wtedy, gdy charakter używanych danych jest zgodny ze strumieniem bajtów (na przykład strumieniami plików). W takich przypadkach użycie CreateItemStreamAsync
metod , ReplaceItemStreamAsync
lub DeleteItemStreamAsync
i praca z ResponseMessage
(zamiast ItemResponse
) zwiększa przepływność, którą można osiągnąć.
Następne kroki
- Aby dowiedzieć się więcej na temat wersji zestawu .NET SDK, zobacz artykuł Dotyczący zestawu SDK usługi Azure Cosmos DB.
- Pobierz pełny kod źródłowy migracji z usługi GitHub.
- Dodatkowe przykłady zbiorcze w usłudze GitHub
- Próbujesz zaplanować pojemność migracji do usługi Azure Cosmos DB?
- Jeśli wiesz, ile rdzeni wirtualnych i serwerów znajduje się w istniejącym klastrze bazy danych, przeczytaj o szacowaniu jednostek żądań przy użyciu rdzeni wirtualnych lub procesorów wirtualnych
- Jeśli znasz typowe stawki żądań dla bieżącego obciążenia bazy danych, przeczytaj o szacowaniu jednostek żądań przy użyciu planisty pojemności usługi Azure Cosmos DB