Поделиться через


Обработка канала изменений в Хранилище BLOB-объектов Azure

Канал изменений предназначен для предоставления журналов транзакций всех изменений, происходящих в больших двоичных объектах и метаданных больших двоичных объектов в вашей учетной записи хранения. В этой статье описано, как считывать записи канала изменений с помощью библиотеки обработчика канала изменений больших двоичных объектов.

Дополнительные сведения см. в статье о канале изменений в Хранилище BLOB-объектов Azure.

Настройка проекта

В этом разделе описывается подготовка проекта для работы с клиентской библиотекой веб-канала изменений BLOB-объектов для .NET.

Установка пакетов

В каталоге проекта установите пакет для клиентской библиотеки веб-канала изменений служба хранилища Azure BLOB-объектов для .NET с помощью dotnet add package команды. В этом примере мы добавим --prerelease флаг в команду, чтобы установить последнюю предварительную версию.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Примеры кода в этой статье также используют пакеты Хранилище BLOB-объектов Azure и удостоверений Azure.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Добавьте директивы using.

Добавьте следующие using директивы в файл кода:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Создание клиентского объекта

Чтобы подключить приложение к хранилищу BLOB-объектов, создайте экземпляр BlobServiceClient класса. В следующем примере показано, как создать клиентский объект с помощью DefaultAzureCredential авторизации. Дополнительные сведения см. в статье "Авторизация доступа и подключение к хранилищу BLOB-объектов". Чтобы работать с каналом изменений, требуется встроенное средство чтения данных BLOB-объектов хранилища BLOB-объектов Azure RBAC или более поздней версии.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

Клиентский объект передается в качестве параметра в некоторые методы, показанные в этой статье.

Чтение записей в канале изменений

Примечание.

Канал изменений является неизменяемой и доступной только для чтения сущностью в вашей учетной записи хранения. Любое количество приложений может одновременно и независимо друг от друга считывать и обрабатывать данные канала изменений. Записи не удаляются из канала изменений, когда приложение считывает их. Состояние чтения или итерации для каждого активного читателя является независимым и поддерживается только приложением.

Следующий пример кода выполняет итерацию всех записей в канале изменений, добавляет их в список, а затем возвращает список событий канала изменений:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

В следующем примере кода отображаются некоторые значения из списка событий канала изменений:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Возобновление чтения записей из сохраненного расположения

Вы можете сохранить свое расположение чтения в канале изменений, а затем позже возобновить итерацию записей. Чтобы сохранить расположение чтения, используйте курсор канала изменений. Курсор — это строка , и приложение может сохранить такую строку в любом случае, который имеет смысл для разработки приложения, например в файл или базу данных.

В указанном ниже примере выполняется итерация всех записей в канале изменений, записи добавляются в список и сохраняется курсор. Список и курсор возвращаются вызывающему объекту.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Потоковая обработка записей

Вы можете обработать записи канала изменений по мере их фиксации на веб-канале изменений. Дополнительные сведения см. в разделе о спецификациях. События изменения публикуются в канале изменений в среднем с интервалом 60 секунд. Рекомендуется выполнять опрос на наличие изменений с учетом этого периода при указании интервала опроса.

В указанном ниже примере периодически выполняется опрос на наличие изменений. При наличии записей изменений данный код обрабатывает их и сохраняет курсор канала изменений. Таким образом, если процесс остановлен, а затем снова запущен, приложение может использовать курсор, в котором он остался последний раз, для возобновления обработки записей. В этом примере курсор сохраняется в локальном файле для демонстрационных целей, но приложение может сохранить его в любой форме, которая имеет наибольшее значение для вашего сценария.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Чтение записей в определенном диапазоне времени

Вы можете выполнять чтение записей в рамках определенного диапазона времени. В этом примере выполняется итерацию всех записей в канале изменений, которые попадают в определенный диапазон даты и времени, добавляет их в список и возвращает список:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Указанное время начала округляется в меньшую сторону до ближайшего часа, а время окончания округляется в большую сторону до ближайшего часа. Пользователям могут отображаться события, произошедшие до времени начала и после времени окончания. Кроме того, возможно, что некоторые события, происходящие в период между временем начала и окончания, не отобразятся. Это связано с тем, что события могут записываться в течение часа, предшествующего времени начала, или в течение часа после времени окончания.

Следующие шаги