Поделиться через


Обработчик канала изменений в Azure Cosmos DB

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

Обработчик канала изменений входит в SDK Azure Cosmos DB .NET версии 3 и Java версии 4. Он упрощает процесс чтения канала изменений и эффективно распределяет обработку событий между несколькими потребителями.

Основное преимущество использования обработчика канала изменений — это его отказоустойчивый дизайн, который обеспечивает доставку всех событий в канале изменений по крайней мере один раз.

Поддерживаемые пакеты SDK

.Net V3 Java Node.JS Python

Компоненты обработчика канала изменений

Обработчик канала изменений имеет четыре основных компонента:

  • Отслеживаемый контейнер — это контейнер с данными, из которых формируется канал изменений. Все операции вставки и обновлений в отслеживаемом контейнере отражаются в канале изменений контейнера.

  • Контейнер аренды: контейнер аренды выступает в качестве хранилища состояний и координирует обработку канала изменений между несколькими рабочими рабочая роль. Контейнер аренды может храниться в той же учетной записи, что и отслеживаемый контейнер, или в отдельной учетной записи.

  • Вычислительный экземпляр. В вычислительном экземпляре размещается обработчик канала изменений для прослушивания изменений. В зависимости от платформы она может представляться виртуальной машиной, pod Kubernetes, экземпляром службы приложение Azure или фактическим физическим компьютером. В вычислительном экземпляре есть уникальный идентификатор, который называется именем экземпляра в этой статье.

  • Делегат — это код, в котором разработчик описывает требуемые действия для каждого пакета изменений, считанного обработчиком канала изменений.

Дополнительные сведения о том, как эти четыре элемента обработчика канала изменений работают вместе, давайте рассмотрим пример на следующей схеме. Отслеживаемый контейнер хранит элементы и использует "City" в качестве ключа секции. Значения ключа секции распределяются в диапазонах (каждый диапазон представляет физический раздел), содержащий элементы.

На схеме показаны два вычислительных экземпляра, а обработчик канала изменений назначает разные диапазоны каждому экземпляру для максимального распределения вычислений. Каждый экземпляр имеет другое уникальное имя.

Каждый диапазон считывается параллельно. Ход выполнения диапазона сохраняется отдельно от других диапазонов в контейнере аренды с помощью документа аренды . Сочетание аренды представляет текущее состояние обработчика канала изменений.

Пример обработчика канала изменений

Реализация обработчика канала изменений

Обработчик канала изменений в .NET доступен для последней версии , а также для всех версий и режима удаления. Все версии и удаленные режимы доступны в предварительной версии и поддерживаются для обработчика канала изменений, начиная с версии 3.40.0-preview.0. Точка входа для обоих режимов всегда является отслеживаемой контейнерой.

Чтобы прочитать последнюю версию в экземпляре Container , необходимо вызвать GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Чтобы прочитать все версии и удалить режим, вызов из GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes экземпляра Container :

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Для обоих режимов первый параметр — это отдельное имя, описывающее цель этого процессора. Второе имя — это реализация делегата, которая обрабатывает изменения.

Ниже приведен пример делегата для последней версии:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Ниже приведен пример делегата для всех версий и удаляет режим:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

После этого вы определите имя вычислительного экземпляра или уникальный идентификатор с помощью WithInstanceName. Имя вычислительного экземпляра должно быть уникальным и разными для каждого развернутого вычислительного экземпляра. Контейнер задается для поддержания состояния аренды с помощью WithLeaseContainer.

Вызов Build предоставляет экземпляр процессора, который можно запустить с помощью вызова StartAsync.

Примечание.

Приведенные выше фрагменты кода взяты из примеров в GitHub. Вы можете получить пример для последней версии или всех версий и удалить режим.

Жизненный цикл обработки

Обычный жизненный цикл экземпляра узла:

  1. Чтение канала изменений.
  2. Если изменений нет, спящий режим для предопределенного времени (настраиваемого с помощью WithPollInterval построителя) и перейдите в #1.
  3. При наличии изменений отправьте их в делегат.
  4. Когда делегат успешно завершит обработку изменений, добавьте в хранилище аренды последнюю обработанную точку во времени и перейдите к № 1.

Обработка ошибок

Обработчик канала изменений устойчив к ошибкам пользовательского кода. Если реализация делегата имеет необработанное исключение (шаг 4), поток, обрабатывающий конкретный пакет изменений, останавливается, а новый поток в конечном итоге создается. Новый поток проверяет последнюю точку во времени, когда хранилище аренды сохранило для этого диапазона значений ключа секции. Новый поток перезапускается оттуда, эффективно отправляя тот же пакет изменений делегату. Это поведение продолжается до тех пор, пока делегат не обрабатывает изменения правильно, и это причина, по которой обработчик канала изменений имеет гарантию "по крайней мере один раз".

Примечание.

Только в одном сценарии пакет изменений не выполняется. Если сбой происходит при первом выполнении делегата, хранилище аренды не имеет предыдущего сохраненного состояния, которое будет использоваться для повторных попыток. В таких случаях повтор использует начальную начальную конфигурацию, которая может или не включать последний пакет.

Чтобы предотвратить непрерывное повторение одного и того же пакета изменений обработчиком канала изменений, необходимо добавить логику в код делегата для записи документов, за исключением исключения, в очередь сообщений с ошибками. Такая схема гарантирует, что вы сможете отследить необработанные изменения, сохранив возможность продолжать обработку будущих изменений. Очередь сообщений об ошибках может быть другим контейнером Azure Cosmos DB. Точное хранилище данных не имеет значения. Вы просто хотите сохранить необработанные изменения.

Вы также можете использовать оценщик канала изменений для отслеживания хода выполнения экземпляров обработчика канала изменений при чтении канала изменений или с помощью уведомлений жизненного цикла для обнаружения базовых сбоев.

Уведомления жизненного цикла

Обработчик канала изменений можно подключить к любому соответствующему событию в жизненном цикле. Вы можете получать уведомления одному или всем из них. Мы рекомендуем зарегистрировать по меньшей мере получение уведомлений об ошибках.

  • Зарегистрируйте обработчик для WithLeaseAcquireNotification, чтобы получать уведомления о том, что текущий узел зарегистрировал аренду и начал ее обрабатывать.
  • Зарегистрируйте обработчик для WithLeaseReleaseNotification, чтобы получать уведомления о том, что текущий узел освободил аренду и прекратил ее обрабатывать.
  • Зарегистрируйте обработчик, чтобы WithErrorNotification получать уведомления, когда текущий узел обнаруживает исключение во время обработки. Необходимо иметь возможность различать, является ли источник делегатом пользователя (необработанным исключением) или ошибкой, с которой обработчик сталкивается при попытке получить доступ к отслеживаемой контейнере (например, сетевым проблемам).

Уведомления жизненного цикла доступны в обоих режимах канала изменений. Ниже приведен пример уведомлений жизненного цикла в режиме последней версии:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Единица развертывания

Одна единица развертывания обработчика канала изменений состоит из одного или нескольких вычислительных экземпляров, имеющих одинаковое значение processorName и одну конфигурацию контейнера аренды, но разные имена экземпляров. Вы можете иметь множество единиц развертывания, в которых каждый блок имеет другой бизнес-поток для изменений, и каждая единица развертывания состоит из одного или нескольких экземпляров.

Например, у вас может быть одна единица развертывания, которая активирует внешний API при каждом изменении контейнера. Другая единица развертывания может перемещать данные в режиме реального времени при каждом изменении. Когда изменение происходит в отслеживаемом контейнере, все единицы развертывания уведомляются.

Динамическое масштабирование

Как упоминалось ранее, в единице развертывания можно использовать один или несколько вычислительных экземпляров. Чтобы воспользоваться преимуществами распределения вычислений в единице развертывания, единственными ключевыми требованиями являются следующие:

  • Все экземпляры должны иметь одинаковую конфигурацию контейнеров аренды.
  • Для всех экземпляров должно быть одинаковое значение processorName.
  • Каждый экземпляр должен иметь другое имя (WithInstanceName).

Если применяются эти три условия, обработчик канала изменений распределяет все аренды, которые находятся в контейнере аренды во всех запущенных экземплярах этой единицы развертывания, и он параллелизирует вычисления с помощью алгоритма равного распределения. Аренда принадлежит одному экземпляру в любое время, поэтому количество экземпляров не должно превышать количество аренд.

Число экземпляров может увеличиваться и уменьшаться. Обработчик канала изменений динамически настраивает нагрузку, распространяя ее соответствующим образом.

Кроме того, обработчик канала изменений может динамически настраивать масштаб контейнера, если пропускная способность контейнера или хранилище увеличивается. Когда контейнер растет, обработчик канала изменений прозрачно обрабатывает сценарий путем динамического увеличения аренды и распределения новых аренд между существующими экземплярами.

Время запуска

По умолчанию при первом запуске обработчика канала изменений он инициализирует контейнер аренды и запускает жизненный цикл обработки. Любые изменения, которые произошли в отслеживаемом контейнере перед инициализацией обработчика канала изменений в первый раз, не обнаружены.

Чтение с предыдущей даты и времени

Обработчик канала изменений можно инициализировать для чтения изменений, начиная с определенной даты и времени , передав экземпляр DateTime WithStartTime расширения построителя:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

Обработчик канала изменений инициализируется для указанной даты и времени, и начинает считывать изменения, которые произошли после этого.

Чтение с начала

В других сценариях, например при миграции данных или при анализе всей истории контейнера, необходимо считывать канал изменений с начала существования этого контейнера. Вы можете использовать WithStartTime расширение построителя, но передать DateTime.MinValue.ToUniversalTime(), что создает представление в формате UTC минимального DateTime значения, как в следующем примере:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Обработчик канала изменений инициализирован и начинает считывать изменения с начала времени существования контейнера.

Примечание.

Эти параметры настройки работают только для настройки начальной точки во время обработчика канала изменений. После инициализации контейнера аренды в первый раз изменение этих параметров не влияет.

Настройка начальной точки доступна только для режима канала изменений последней версии. При использовании всех версий и режима удаления необходимо начать чтение с момента запуска процессора или возобновить работу с предыдущего состояния аренды, которое находится в течение непрерывного периода хранения резервных копий вашей учетной записи.

Канал изменений и подготовленная пропускная способность

Операции чтения канала изменений в отслеживаемом контейнере используют единицы запросов. Убедитесь, что отслеживаемый контейнер не регулируется. Регулирование добавляет задержки в получении событий канала изменений на процессорах.

Операции с контейнером аренды (обновление и поддержание состояния) потребляют единицы запроса. Чем выше число экземпляров, использующих один и тот же контейнер аренды, тем выше потенциальное потребление единиц запросов. Убедитесь, что контейнер аренды не испытывает регулирования. Регулирование добавляет задержки при получении событий канала изменений. Регулирование может даже полностью завершить обработку.

Предоставление общего доступа к контейнеру аренды

Контейнер аренды можно совместно использовать в нескольких единицах развертывания. В контейнере общей аренды каждая единица развертывания прослушивает другой отслеживаемый контейнер или имеет другое значение.processorName В этой конфигурации каждая единица развертывания поддерживает независимое состояние контейнера аренды. Проверьте потребление единиц запроса в контейнере аренды, чтобы убедиться, что подготовленная пропускная способность достаточно для всех единиц развертывания.

Расширенная конфигурация аренды

Три ключевых конфигурации могут повлиять на работу обработчика канала изменений. Каждая конфигурация влияет на потребление единиц запроса в контейнере аренды. Вы можете задать одну из этих конфигураций при создании обработчика канала изменений, но используйте их тщательно.

  • Приобретение аренды: по умолчанию каждые 17 секунд. Узел периодически проверяет состояние арендного магазина и рассмотрите возможность получения аренды в рамках динамического процесса масштабирования . Этот процесс выполняется путем выполнения запроса к контейнеру аренды. Сокращение этого значения упрощает перебалансирование и получение аренды, но увеличивает потребление единиц запросов в контейнере аренды.
  • Срок действия аренды: по умолчанию 60 секунд. Определяет максимальный период времени, когда аренда может существовать без каких-либо действий продления до получения другим узлом. Когда узел завершает работу, аренды, принадлежащие ему, собираются другими узлами после этого периода времени плюс настроенный интервал продления. Уменьшение этого значения делает восстановление после сбоя узла быстрее, но срок действия никогда не должен быть ниже интервала продления.
  • Продление аренды: по умолчанию каждые 13 секунд. Узел, принадлежащий аренде, периодически продлевает аренду, даже если новые изменения для использования отсутствуют. Этот процесс выполняется путем выполнения замены аренды. Уменьшение этого значения снижает время, необходимое для обнаружения аренды, потерянного сбоем узла, но увеличивает потребление единиц запросов в контейнере аренды.

Место размещения обработчика канала изменений

Обработчик канала изменений можно разместить на любой платформе, поддерживающей длительные процессы или задачи. Далее приводятся некоторые примеры.

Хотя обработчик канала изменений может работать в коротких средах, так как контейнер аренды сохраняет состояние, цикл запуска этих сред добавляет задержки в время получения уведомлений (из-за затрат на запуск процессора при каждом запуске среды).

Требования к доступу на основе ролей

При использовании идентификатора Microsoft Entra в качестве механизма проверки подлинности убедитесь, что удостоверение имеет соответствующие разрешения:

  • В отслеживаемом контейнере:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • В контейнере аренды:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Дополнительные ресурсы

Следующие шаги

Дополнительные сведения о обработчике канала изменений см. в следующих статьях: