Обработка канала изменений в Хранилище BLOB-объектов Azure
Канал изменений предназначен для предоставления журналов транзакций всех изменений, происходящих в больших двоичных объектах и метаданных больших двоичных объектов в вашей учетной записи хранения. В этой статье описано, как считывать записи канала изменений с помощью библиотеки обработчика канала изменений больших двоичных объектов.
Дополнительные сведения см. в статье о канале изменений в Хранилище BLOB-объектов Azure.
Настройка проекта
В этом разделе описывается подготовка проекта для работы с клиентской библиотекой веб-канала изменений BLOB-объектов для .NET.
Установка пакетов
В каталоге проекта установите пакет для клиентской библиотеки веб-канала изменений служба хранилища Azure BLOB-объектов для .NET с помощью dotnet add package
команды. В этом примере мы добавим --prerelease
флаг в команду, чтобы установить последнюю предварительную версию.
dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease
Примеры кода в этой статье также используют пакеты Хранилище BLOB-объектов Azure и удостоверений Azure.
dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs
Добавьте директивы using
.
Добавьте следующие using
директивы в файл кода:
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
Создание клиентского объекта
Чтобы подключить приложение к хранилищу BLOB-объектов, создайте экземпляр BlobServiceClient
класса. В следующем примере показано, как создать клиентский объект с помощью DefaultAzureCredential
авторизации. Дополнительные сведения см. в статье "Авторизация доступа и подключение к хранилищу BLOB-объектов". Чтобы работать с каналом изменений, требуется встроенное средство чтения данных BLOB-объектов хранилища BLOB-объектов Azure RBAC или более поздней версии.
// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";
BlobServiceClient client = new(
new Uri($"https://{accountName}.blob.core.windows.net"),
new DefaultAzureCredential());
Клиентский объект передается в качестве параметра в некоторые методы, показанные в этой статье.
Чтение записей в канале изменений
Примечание.
Канал изменений является неизменяемой и доступной только для чтения сущностью в вашей учетной записи хранения. Любое количество приложений может одновременно и независимо друг от друга считывать и обрабатывать данные канала изменений. Записи не удаляются из канала изменений, когда приложение считывает их. Состояние чтения или итерации для каждого активного читателя является независимым и поддерживается только приложением.
Следующий пример кода выполняет итерацию всех записей в канале изменений, добавляет их в список, а затем возвращает список событий канала изменений:
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
// Create a new BlobChangeFeedClient
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = [];
// Get all the events in the change feed
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
В следующем примере кода отображаются некоторые значения из списка событий канала изменений:
public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
}
Возобновление чтения записей из сохраненного расположения
Вы можете сохранить свое расположение чтения в канале изменений, а затем позже возобновить итерацию записей. Чтобы сохранить расположение чтения, используйте курсор канала изменений. Курсор — это строка , и приложение может сохранить такую строку в любом случае, который имеет смысл для разработки приложения, например в файл или базу данных.
В указанном ниже примере выполняется итерация всех записей в канале изменений, записи добавляются в список и сохраняется курсор. Список и курсор возвращаются вызывающему объекту.
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
BlobServiceClient client,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor)
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Update the change feed cursor. The cursor is not required to get each page of events,
// it's intended to be saved and used to resume iterating at a later date.
cursor = enumerator.Current.ContinuationToken;
return (cursor, changeFeedEvents);
}
Потоковая обработка записей
Вы можете обработать записи канала изменений по мере их фиксации на веб-канале изменений. Дополнительные сведения см. в разделе о спецификациях. События изменения публикуются в канале изменений в среднем с интервалом 60 секунд. Рекомендуется выполнять опрос на наличие изменений с учетом этого периода при указании интервала опроса.
В указанном ниже примере периодически выполняется опрос на наличие изменений. При наличии записей изменений данный код обрабатывает их и сохраняет курсор канала изменений. Таким образом, если процесс остановлен, а затем снова запущен, приложение может использовать курсор, в котором он остался последний раз, для возобновления обработки записей. В этом примере курсор сохраняется в локальном файле для демонстрационных целей, но приложение может сохранить его в любой форме, которая имеет наибольшее значение для вашего сценария.
public async Task ChangeFeedStreamAsync(
BlobServiceClient client,
int waitTimeMs,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
while (true)
{
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();
while (true)
{
var result = await enumerator.MoveNextAsync();
if (result)
{
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
// Helper method to save cursor
SaveCursor(enumerator.Current.ContinuationToken);
}
else
{
break;
}
}
await Task.Delay(waitTimeMs);
}
}
void SaveCursor(string cursor)
{
// Specify the path to the file where you want to save the cursor
string filePath = "path/to/cursor.txt";
// Write the cursor value to the file
File.WriteAllText(filePath, cursor);
}
Чтение записей в определенном диапазоне времени
Вы можете выполнять чтение записей в рамках определенного диапазона времени. В этом примере выполняется итерацию всех записей в канале изменений, которые попадают в определенный диапазон даты и времени, добавляет их в список и возвращает список:
async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Указанное время начала округляется в меньшую сторону до ближайшего часа, а время окончания округляется в большую сторону до ближайшего часа. Пользователям могут отображаться события, произошедшие до времени начала и после времени окончания. Кроме того, возможно, что некоторые события, происходящие в период между временем начала и окончания, не отобразятся. Это связано с тем, что события могут записываться в течение часа, предшествующего времени начала, или в течение часа после времени окончания.