Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
Канал изменений предназначен для предоставления журналов транзакций всех изменений, происходящих в больших двоичных объектах и метаданных больших двоичных объектов в вашей учетной записи хранения. В этой статье описано, как считывать записи канала изменений с помощью библиотеки обработчика канала изменений больших двоичных объектов.
Дополнительные сведения см. в статье о канале изменений в Хранилище BLOB-объектов Azure.
Настройка проекта
В этом разделе описывается подготовка проекта для работы с клиентской библиотекой веб-канала изменений BLOB-объектов для .NET.
Установка пакетов
В каталоге проекта установите пакет для клиентской библиотеки веб-канала изменений служба хранилища Azure BLOB-объектов для .NET с помощью dotnet add package команды. В этом примере мы добавим --prerelease флаг в команду, чтобы установить последнюю предварительную версию.
dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease
Примеры кода в этой статье также используют пакеты Хранилище BLOB-объектов Azure и удостоверений Azure.
dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs
Добавьте директивы using.
Добавьте следующие using директивы в файл кода:
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
Создание клиентского объекта
Чтобы подключить приложение к хранилищу BLOB-объектов, создайте экземпляр BlobServiceClient класса. В следующем примере показано, как создать клиентский объект с помощью DefaultAzureCredential авторизации. Дополнительные сведения см. в статье "Авторизация доступа и подключение к хранилищу BLOB-объектов". Чтобы работать с каналом изменений, требуется встроенное средство чтения данных BLOB-объектов хранилища BLOB-объектов Azure RBAC или более поздней версии.
// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";
BlobServiceClient client = new(
new Uri($"https://{accountName}.blob.core.windows.net"),
new DefaultAzureCredential());
Клиентский объект передается в качестве параметра в некоторые методы, показанные в этой статье.
Чтение записей в канале изменений
Примечание.
Канал изменений является неизменяемой и доступной только для чтения сущностью в вашей учетной записи хранения. Любое количество приложений может одновременно и независимо друг от друга считывать и обрабатывать данные канала изменений. Записи не удаляются из канала изменений, когда приложение считывает их. Состояние чтения или итерации для каждого активного читателя является независимым и поддерживается только приложением.
Следующий пример кода выполняет итерацию всех записей в канале изменений, добавляет их в список, а затем возвращает список событий канала изменений:
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
// Create a new BlobChangeFeedClient
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = [];
// Get all the events in the change feed
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
В следующем примере кода отображаются некоторые значения из списка событий канала изменений:
public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
}
Возобновление чтения записей из сохраненного расположения
Вы можете сохранить свое расположение чтения в канале изменений, а затем позже возобновить итерацию записей. Чтобы сохранить расположение чтения, используйте курсор канала изменений. Курсор — это строка , и приложение может сохранить такую строку в любом случае, который имеет смысл для разработки приложения, например в файл или базу данных.
В указанном ниже примере выполняется итерация всех записей в канале изменений, записи добавляются в список и сохраняется курсор. Список и курсор возвращаются вызывающему объекту.
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
BlobServiceClient client,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor)
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Update the change feed cursor. The cursor is not required to get each page of events,
// it's intended to be saved and used to resume iterating at a later date.
cursor = enumerator.Current.ContinuationToken;
return (cursor, changeFeedEvents);
}
Потоковая обработка записей
Вы можете обработать записи канала изменений по мере их фиксации на веб-канале изменений. Дополнительные сведения см. в разделе о спецификациях. События изменения публикуются в канале изменений в среднем с интервалом 60 секунд. Рекомендуется выполнять опрос на наличие изменений с учетом этого периода при указании интервала опроса.
В указанном ниже примере периодически выполняется опрос на наличие изменений. При наличии записей изменений данный код обрабатывает их и сохраняет курсор канала изменений. Таким образом, если процесс остановлен, а затем снова запущен, приложение может использовать курсор, в котором он остался последний раз, для возобновления обработки записей. В этом примере курсор сохраняется в локальном файле для демонстрационных целей, но приложение может сохранить его в любой форме, которая имеет наибольшее значение для вашего сценария.
public async Task ChangeFeedStreamAsync(
BlobServiceClient client,
int waitTimeMs,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
while (true)
{
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();
while (true)
{
var result = await enumerator.MoveNextAsync();
if (result)
{
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
// Helper method to save cursor
SaveCursor(enumerator.Current.ContinuationToken);
}
else
{
break;
}
}
await Task.Delay(waitTimeMs);
}
}
void SaveCursor(string cursor)
{
// Specify the path to the file where you want to save the cursor
string filePath = "path/to/cursor.txt";
// Write the cursor value to the file
File.WriteAllText(filePath, cursor);
}
Чтение записей в определенном диапазоне времени
Вы можете выполнять чтение записей в рамках определенного диапазона времени. В этом примере выполняется итерацию всех записей в канале изменений, которые попадают в определенный диапазон даты и времени, добавляет их в список и возвращает список:
async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Указанное время начала округляется в меньшую сторону до ближайшего часа, а время окончания округляется в большую сторону до ближайшего часа. Пользователям могут отображаться события, произошедшие до времени начала и после времени окончания. Кроме того, возможно, что некоторые события, происходящие в период между временем начала и окончания, не отобразятся. Это связано с тем, что события могут записываться в течение часа, предшествующего времени начала, или в течение часа после времени окончания.