Serverless Throttling?

MikeM-8643 20 Reputation points
2024-12-02T12:52:10.26+00:00

Hi. I have written a method using the guidance at https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/tutorial-dotnet-bulk-import. To shorten the code I have removed most logging. It looks something like this (questions follow):

public async Task<List<SomeObject>> UpdateAll(List<SomeObject> SomeObjectList,
    CancellationToken cancellationToken = default)
{
    int successfulRequests = 0;
    int failedRequests = 0;
    var concurrentTasks = new List<Task>();

    // Patterns for using Task.WhenAll
    // https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.whenall?view=net-8.0
    // https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/tutorial-dotnet-bulk-import
    if (SomeObjectList != null && SomeObjectList.Count > 0)
    {
        foreach (var item in SomeObjectList)
        {
            var updatedSomeObject = _mapper.Map<SomeObjectDocument>(item);
            concurrentTasks.Add(_container
                .ReplaceItemAsync<SomeObjectDocument>(updatedSomeObject,
                updatedSomeObject.Id, new PartitionKey(updatedSomeObject.Id),
                options, cancellationToken)
                    .ContinueWith(itemResponse =>
                    {
                        if (!itemResponse.IsCompletedSuccessfully)
                        {
                            AggregateException innerExceptions = itemResponse.Exception.Flatten();
                            if (innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException)
                                is CosmosException cosmosException)
                            {
                                Interlocked.Increment(ref failedRequests);
                                _logger.LogError(cosmosException, "SomeObjectRepository:UpdateAll - CosmosException while updating" +
                                    " object with id: '{id}', error details: {error}", updatedSomeObject.Id, cosmosException.ToString());
                                Console.WriteLine($"SomeObjectRepository:UpdateAll - CosmosException while updating" +
                                    $" object with id: '{updatedSomeObject.Id}', error details: {cosmosException.ToString()}");
                            }
                            else
                            {
                                Interlocked.Increment(ref failedRequests);
                                _logger.LogError(innerExceptions, "SomeObjectRepository:UpdateAll - Unexpected exception while updating" +
                                    " object with id: '{id}', error details: {error}", updatedSomeObject.Id, innerExceptions.ToString());
                                Console.WriteLine($"SomeObjectRepository:UpdateAll - Unexpected exception while updating" +
                                    $" object with id: '{updatedSomeObject.Id}', error details: {innerExceptions.ToString()}");
                            }
                        }
                        else
                        {
                            Interlocked.Increment(ref successfulRequests);
                        }
                    }, cancellationToken, TaskContinuationOptions.None, TaskScheduler.Default));
        }

        Task completedTask = Task.WhenAll(concurrentTasks);

        try
        {
            await completedTask;
        }
        catch (Exception ex)
        {
            // All error messages are logged in the ContinueWith.
            // Log a warning here so we aren't eating the exception.
            logging...
        }

        if (completedTask.Status == TaskStatus.Faulted)
        {
            logging...
        }
    }

    return SomeObjectList;
}

Additional Notes:

  • Cosmos DB is serverless.
  • This class runs in an isolated-process Azure Function using .NET 8.
  • The code works when updating a smaller number of documents, but throws 429 exceptions on large updates. This is likely expected since Cosmos DB is serverless and supports 5,000 RU/sec.
  • I have tried throttling requests by using SemaphoreSlim, ConcurrentExclusiveSchedulerPair limited to one concurrent process, and others.

Questions:

  • _logger is a private class property created by an injected ILoggerFactory. The logging statements in the ContinueWith do not appear in AppInsights. The Console.WriteLine statements do appear. Should the logger work in a ContinueWith?
  • Should I be using something other than TaskContinuationOptions.None?
  • I could not find a Cosmos DB RU rate limiter, did I miss something?
  • The documentation states that 429 errors have automatic retry and will often complete. How do I trap what eventually completed and what did not?
  • Code like this is used in many repositories. Do I have to write custom code to determine the average RU/write depending on the repository, then send tasks in batches that will not exceed 5,000 RU/sec?

Thanks!

Mike

Azure Cosmos DB
Developer technologies | C#

Accepted answer
  1. Mahesh Kurva 5,210 Reputation points Microsoft External Staff Moderator
    2024-12-03T07:05:41.5466667+00:00

    Hi @MikeM-8643,

    Welcome to the Microsoft Q&A and thank you for posting your questions here.

    I'm glad to help you with your questions.

    _logger is a private class property created by an injected ILoggerFactory. The logging statements in the ContinueWith do not appear in AppInsights. The Console.WriteLine statements do appear. Should the logger work in a ContinueWith?

    The logger should work in a ContinueWith block, but there are a few things to check:

    • Ensure the logger is properly initialized and accessible within the ContinueWith scope.
    • Verify that the logging level is set appropriately to capture the logs.
    • Check if there are any exceptions or issues within the ContinueWith block that might prevent logging.

    Should I be using something other than TaskContinuationOptions.None?

    Using TaskContinuationOptions.None is generally fine. Consider TaskContinuationOptions.ExecuteSynchronously only if you want the continuation to run on the thread that completes the antecedent task instead of being queued to the scheduler.

    I could not find a Cosmos DB RU rate limiter, did I miss something?

    The Cosmos DB SDK does not include a built-in client-side RU rate limiter; the service enforces its limit by returning 429 responses. You can implement your own rate limiting mechanism. This involves profiling your application, tuning the indexing policy, and spreading requests over time to avoid exceeding the available throughput.
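
    As an illustration of "spreading requests over time", here is a minimal token-bucket sketch that uses only the .NET base class library. RuTokenBucket and AcquireAsync are hypothetical names, not part of the Cosmos DB SDK, and the RU estimate passed in is something you would have to measure yourself.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not an SDK type): hands out request units (RUs)
// at a fixed rate, allowing at most one second's worth of burst.
public sealed class RuTokenBucket
{
    private readonly double _ruPerSecond;
    private readonly object _gate = new object();
    private readonly Stopwatch _clock = Stopwatch.StartNew();
    private double _available;
    private double _lastRefillSeconds;

    public RuTokenBucket(double ruPerSecond)
    {
        if (ruPerSecond <= 0) throw new ArgumentOutOfRangeException(nameof(ruPerSecond));
        _ruPerSecond = ruPerSecond;
        _available = ruPerSecond; // start with a full bucket
    }

    // Waits until estimatedRu units are available, then consumes them.
    public async Task AcquireAsync(double estimatedRu, CancellationToken ct = default)
    {
        if (estimatedRu <= 0 || estimatedRu > _ruPerSecond)
            throw new ArgumentOutOfRangeException(nameof(estimatedRu));

        while (true)
        {
            double waitMs;
            lock (_gate)
            {
                // Refill in proportion to the time elapsed since the last call.
                double now = _clock.Elapsed.TotalSeconds;
                _available = Math.Min(_ruPerSecond,
                    _available + (now - _lastRefillSeconds) * _ruPerSecond);
                _lastRefillSeconds = now;

                if (_available >= estimatedRu)
                {
                    _available -= estimatedRu;
                    return;
                }

                // Time needed for the missing units to accumulate.
                waitMs = (estimatedRu - _available) / _ruPerSecond * 1000.0;
            }

            // Delay outside the lock; wait at least 1 ms to avoid a tight spin.
            await Task.Delay(TimeSpan.FromMilliseconds(Math.Max(1.0, waitMs)), ct);
        }
    }
}
```

    A caller would create one bucket per container with a budget somewhat below the limit, for example new RuTokenBucket(4000), and await bucket.AcquireAsync(estimatedRu, cancellationToken) before each ReplaceItemAsync call. This only paces the client; the service can still return 429 if the estimate is too low or other clients share the container.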

    The documentation states that 429 errors have automatic retry and will often complete. How do I trap what eventually completed and what did not?

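    To trap what eventually completed and what did not, inspect the outcome of each task. The SDK retries 429 responses automatically, so a task that completes successfully was written (possibly after retries), while a task that faults with a CosmosException whose StatusCode is 429 exhausted its retries. The Diagnostics property (of type CosmosDiagnostics) on each response and on CosmosException contains detailed information about the request, including retries.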

    You can also implement custom retry logic to track and log the status of each request.
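
    One way to record the final outcome of each request is to await the call inside a try/catch instead of using ContinueWith. The sketch below assumes the Microsoft.Azure.Cosmos v3 SDK; ReplaceTracking, ReplaceOutcome, and ReplaceAndTrackAsync are hypothetical names, and the id is assumed to also be the partition key, as in the question.

```csharp
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class ReplaceTracking
{
    // Hypothetical result record; the shape is illustrative only.
    public sealed record ReplaceOutcome(
        string Id, bool Succeeded, HttpStatusCode StatusCode, double RequestCharge);

    public static async Task<ReplaceOutcome> ReplaceAndTrackAsync<T>(
        Container container, T document, string id, CancellationToken ct = default)
    {
        try
        {
            ItemResponse<T> response = await container.ReplaceItemAsync(
                document, id, new PartitionKey(id), cancellationToken: ct);

            // Reaching this line means the write completed, possibly after
            // the SDK retried one or more 429 responses internally.
            return new ReplaceOutcome(id, true, response.StatusCode, response.RequestCharge);
        }
        catch (CosmosException ex)
        {
            // The SDK gave up. A StatusCode of HttpStatusCode.TooManyRequests (429)
            // means the automatic retries were exhausted. ex.Diagnostics.ToString()
            // can be logged here for the retry details.
            return new ReplaceOutcome(id, false, ex.StatusCode, ex.RequestCharge);
        }
    }
}
```

    Passing the resulting tasks to Task.WhenAll yields a ReplaceOutcome[] that can be split into succeeded and failed items. How long the SDK keeps retrying 429 responses is controlled by MaxRetryAttemptsOnRateLimitedRequests and MaxRetryWaitTimeOnRateLimitedRequests on CosmosClientOptions.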

    Code like this is used in many repositories. Do I have to write custom code to determine the average RU/write depending on the repository, then send tasks in batches that will not exceed 5,000 RU/sec?

    Yes, you might need to write custom code to determine the average RU per write operation. This involves profiling your application and monitoring the RU consumption for different operations; each response reports its cost in the RequestCharge property.

    Sending tasks in batches that do not exceed 5,000 RU/sec can be managed by implementing a rate limiting mechanism.
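
    As a rough sketch of the batching idea, the batch size can be derived from a measured average RU per write, and each batch can be given at least one second before the next one starts. RuBatching, BatchSize, and RunInBatchesAsync are hypothetical names, and writeAsync stands in for whatever performs the actual Cosmos DB call.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class RuBatching
{
    // Number of writes that fit into one second's RU budget (at least 1).
    public static int BatchSize(double ruBudgetPerSecond, double averageRuPerWrite) =>
        Math.Max(1, (int)Math.Floor(ruBudgetPerSecond / averageRuPerWrite));

    // Runs the writes batch by batch, starting at most one batch per second.
    public static async Task RunInBatchesAsync<T>(
        IReadOnlyList<T> items, int batchSize,
        Func<T, Task> writeAsync, CancellationToken ct = default)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        for (int start = 0; start < items.Count; start += batchSize)
        {
            var stopwatch = Stopwatch.StartNew();

            // Start every write in this batch, then wait for all of them.
            List<Task> batch = items.Skip(start).Take(batchSize).Select(writeAsync).ToList();
            await Task.WhenAll(batch);

            // If the batch finished early and more items remain,
            // wait out the rest of the one-second window.
            TimeSpan remaining = TimeSpan.FromSeconds(1) - stopwatch.Elapsed;
            bool moreItems = start + batchSize < items.Count;
            if (moreItems && remaining > TimeSpan.Zero)
            {
                await Task.Delay(remaining, ct);
            }
        }
    }
}
```

    For example, with a budget of 4,000 RU/sec and a measured average of 12 RU per write, BatchSize(4000, 12) gives 333 writes per batch. If writeAsync throws, Task.WhenAll rethrows and stops the loop, so a wrapper that records failures without throwing keeps later batches running.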

    For more information, please refer to the following documents:

    https://learn.microsoft.com/en-us/azure/cosmos-db/rate-limiting-requests

    https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/troubleshoot-request-rate-too-large?source=recommendations&tabs=resource-specific

    Hope this helps. Do let us know if you have any further queries.

    If this answers your query, please click Accept Answer and Yes for "Was this answer helpful".
