Udostępnij za pośrednictwem


Migrowanie z biblioteki funkcji wykonawczej operacji zbiorczych do obsługi zbiorczej w zestawie SDK platformy .NET platformy .NET w wersji 3

DOTYCZY: NoSQL

W tym artykule opisano kroki wymagane do przeprowadzenia migracji kodu istniejącej aplikacji, który używa biblioteki funkcji wykonawczej zbiorczej platformy .NET do funkcji obsługi zbiorczej w najnowszej wersji zestawu .NET SDK.

Włączanie obsługi zbiorczej

Włącz obsługę zbiorczą w wystąpieniu CosmosClient za pomocą konfiguracji AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Tworzenie zadań dla każdej operacji

Obsługa zbiorcza zestawu .NET SDK działa przy użyciu biblioteki równoległej zadań i operacji grupowania wykonywanych współbieżnie.

W zestawie SDK nie ma jednej metody, która będzie pobierać listę dokumentów lub operacji jako parametr wejściowy, ale zamiast tego należy utworzyć zadanie dla każdej operacji, którą chcesz wykonać zbiorczo, a następnie po prostu poczekać na ich ukończenie.

Jeśli na przykład początkowe dane wejściowe są listą elementów, w których każdy element ma następujący schemat:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Jeśli chcesz wykonać importowanie zbiorcze (podobnie jak w przypadku funkcji BulkExecutor.BulkImportAsync), musisz mieć współbieżne wywołania metody .CreateItemAsync Na przykład:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Jeśli chcesz przeprowadzić aktualizację zbiorczą (podobną do metody BulkExecutor.BulkUpdateAsync), należy mieć współbieżne wywołania metody ReplaceItemAsync po zaktualizowaniu wartości elementu. Na przykład:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Jeśli chcesz usunąć zbiorczo (podobnie jak w przypadku funkcji BulkExecutor.BulkDeleteAsync), musisz mieć równoczesne wywołania elementu DeleteItemAsyncz kluczem id partycji i . Na przykład:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Przechwyć stan wyniku zadania

W poprzednich przykładach kodu utworzyliśmy współbieżną listę zadań i wywołaliśmy metodę CaptureOperationResponse dla każdego z tych zadań. Ta metoda jest rozszerzeniem, które pozwala nam zachować podobny schemat odpowiedzi jako BulkExecutor, przechwytując wszelkie błędy i śledząc użycie jednostek żądania.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

OperationResponse Gdzie element jest zadeklarowany jako:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Wykonywanie operacji współbieżnie

Aby śledzić zakres całej listy zadań, użyjemy tej klasy pomocniczej:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Metoda ExecuteAsync będzie czekać na zakończenie wszystkich operacji i można jej użyć w następujący sposób:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Przechwytywanie statystyk

Poprzedni kod czeka na ukończenie wszystkich operacji i oblicza wymagane statystyki. Te statystyki są podobne do tych biblioteki funkcji wykonawczej zbiorczej BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

Zawiera BulkOperationResponse :

  1. Łączny czas potrzebny na przetworzenie listy operacji za pośrednictwem obsługi zbiorczej.
  2. Liczba pomyślnych operacji.
  3. Łączna liczba wykorzystanych jednostek żądania.
  4. Jeśli wystąpią błędy, zostanie wyświetlona lista krotki, które zawierają wyjątek i skojarzony element na potrzeby rejestrowania i identyfikacji.

Ponów próbę konfiguracji

Biblioteka funkcji wykonawczej zbiorczej zawierała wskazówki dotyczące ustawiania MaxRetryWaitTimeInSeconds poleceń i MaxRetryAttemptsOnThrottledRequests RetryOptions w celu 0 delegowania kontroli do biblioteki.

W przypadku zbiorczej obsługi zestawu .NET SDK nie ma ukrytego zachowania. Opcje ponawiania można skonfigurować bezpośrednio za pomocą poleceń CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests i CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Uwaga

W przypadkach, gdy aprowidowane jednostki żądania są znacznie niższe niż oczekiwano na podstawie ilości danych, warto rozważyć ustawienie tych jednostek na wysokie wartości. Operacja zbiorcza potrwa dłużej, ale ma większe szanse na całkowite powodzenie z powodu wyższych ponownych prób.

usprawnienia dotyczące wydajności

Podobnie jak w przypadku innych operacji z zestawem .NET SDK, korzystanie z interfejsów API strumienia zapewnia lepszą wydajność i pozwala uniknąć niepotrzebnej serializacji.

Korzystanie z interfejsów API strumienia jest możliwe tylko wtedy, gdy charakter używanych danych jest zgodny ze strumieniem bajtów (na przykład strumieniami plików). W takich przypadkach użycie CreateItemStreamAsyncmetod , ReplaceItemStreamAsynclub DeleteItemStreamAsync i praca z ResponseMessage (zamiast ItemResponse) zwiększa przepływność, którą można osiągnąć.

Następne kroki