Kanał informacyjny zmian procesów w Azure Blob Storage
Zestawienie zmian udostępnia dzienniki transakcji wszystkich zmian występujących w obiektach blob i metadanych obiektu blob na koncie magazynu. W tym artykule pokazano, jak odczytywać rekordy zestawienia zmian przy użyciu biblioteki procesora zestawienia zmian obiektów blob.
Aby dowiedzieć się więcej na temat zestawienia zmian, zobacz Zestawienie zmian w Azure Blob Storage.
Pobieranie biblioteki procesora zestawienia zmian obiektów blob
- Otwórz okno polecenia (na przykład: Windows PowerShell).
- Z katalogu projektu zainstaluj pakiet NuGet Azure.Storage.Blobs.Changefeed.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4
Odczyt rekordów
Uwaga
Zestawienie zmian to niezmienna i tylko do odczytu jednostka na koncie magazynu. Dowolna liczba aplikacji może odczytywać i przetwarzać zestawienie zmian jednocześnie i niezależnie w ich własnej wygodzie. Rekordy nie są usuwane z zestawienia zmian, gdy aplikacja je odczytuje. Stan odczytu lub iteracji każdego czytnika zużywającego jest niezależny i obsługiwany tylko przez aplikację.
W tym przykładzie iteruje wszystkie rekordy w kanale informacyjnym zmian, dodaje je do listy, a następnie zwraca tę listę do elementu wywołującego.
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
// Get a new blob service client.
BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);
// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Get all the events in the change feed.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Ten przykład drukuje do konsoli kilka wartości z każdego rekordu na liście.
public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
string api = changeFeedEvent.EventData.Api;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Api: " + api);
}
}
Wznawianie odczytywania rekordów z zapisanej pozycji
Możesz zapisać pozycję odczytu w kanale informacyjnym zmian, a następnie wznowić iterację za pomocą rekordów w przyszłości. Pozycję odczytu można zapisać, uzyskując kursor zestawienia zmian. Kursor jest ciągiem , a aplikacja może zapisać ten ciąg w dowolny sposób, który ma sens dla projektu aplikacji (na przykład do pliku lub bazy danych).
W tym przykładzie iteruje wszystkie rekordy w kanale informacyjnym zmian, dodaje je do listy i zapisuje kursor. Lista i kursor są zwracane do obiektu wywołującego.
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
(string connectionString, string cursor)
{
// Get a new blob service client.
BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);
// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuation: cursor)
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Update the change feed cursor. The cursor is not required to get each page of events,
// it is intended to be saved and used to resume iterating at a later date.
cursor = enumerator.Current.ContinuationToken;
return (cursor, changeFeedEvents);
}
Przetwarzanie strumieniowe rekordów
Możesz przetworzyć rekordy zestawienia zmian, ponieważ są one zatwierdzane w kanale zmian. Zobacz Specyfikacje. Zdarzenia zmian są publikowane w kanale informacyjnym zmian w średnim okresie wynoszącym 60 sekund. Zalecamy sondowanie nowych zmian z tym okresem podczas określania interwału sondowania.
Ten przykład okresowo sonduje zmiany. Jeśli rekordy zmiany istnieją, ten kod przetwarza te rekordy i zapisuje kursor zestawienia zmian. W ten sposób, jeśli proces zostanie zatrzymany, a następnie uruchomiony ponownie, aplikacja może użyć kursora, aby wznowić przetwarzanie rekordów, w których ostatnio zostało wyłączone. W tym przykładzie kursor jest zapisywany w lokalnym pliku konfiguracji aplikacji, ale aplikacja może zapisać go w dowolnej formie, która ma największe znaczenie dla danego scenariusza.
public async Task ChangeFeedStreamAsync
(string connectionString, int waitTimeMs, string cursor)
{
// Get a new blob service client.
BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);
// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
while (true)
{
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();
while (true)
{
var result = await enumerator.MoveNextAsync();
if (result)
{
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
string api = changeFeedEvent.EventData.Api;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Api: " + api);
}
// helper method to save cursor.
SaveCursor(enumerator.Current.ContinuationToken);
}
else
{
break;
}
}
await Task.Delay(waitTimeMs);
}
}
public void SaveCursor(string cursor)
{
System.Configuration.Configuration config =
ConfigurationManager.OpenExeConfiguration
(ConfigurationUserLevel.None);
config.AppSettings.Settings.Clear();
config.AppSettings.Settings.Add("Cursor", cursor);
config.Save(ConfigurationSaveMode.Modified);
}
Odczytywanie rekordów w zakresie czasu
Możesz odczytywać rekordy, które należą do określonego zakresu czasu. W tym przykładzie iteruje wszystkie rekordy w kanale informacyjnym zmian, które należą do 15:00 w dniu 2 marca 2020 r. i 2:00 w dniu 7 sierpnia 2020 r., dodaje je do listy, a następnie zwraca tę listę do osoby wywołującej.
Wybieranie segmentów dla zakresu czasu
public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
// Get a new blob service client.
BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);
// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Czas rozpoczęcia, który podajesz, jest zaokrąglany w dół do najbliższej godziny, a czas zakończenia jest zaokrąglany do najbliższej godziny. Istnieje możliwość, że użytkownicy mogą zobaczyć zdarzenia, które wystąpiły przed godziną rozpoczęcia i po godzinie zakończenia. Istnieje również możliwość, że niektóre zdarzenia występujące między czasem rozpoczęcia i zakończenia nie będą wyświetlane. Dzieje się tak, ponieważ zdarzenia mogą być rejestrowane w ciągu godziny poprzedniej do godziny rozpoczęcia lub w ciągu godziny po godzinie zakończenia.