Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Zestawienie zmian zawiera dzienniki transakcji wszystkich zmian występujących w obiektach blob i metadanych obiektu blob na koncie magazynu. W tym artykule pokazano, jak odczytywać rekordy zestawienia zmian przy użyciu biblioteki procesora zestawienia zmian obiektów blob.
Aby dowiedzieć się więcej na temat zestawienia zmian, zobacz Zestawienie zmian w usłudze Azure Blob Storage.
konfigurowanie projektu
Ta sekcja przeprowadzi Cię przez proces przygotowywania projektu do pracy z biblioteką klienta zestawienia zmian obiektów blob dla platformy .NET.
Instalowanie pakietów
Z katalogu projektu zainstaluj pakiet dla biblioteki klienta zestawienia zmian obiektów blob usługi Azure Storage dla platformy .NET przy użyciu dotnet add package polecenia . W tym przykładzie dodamy flagę --prerelease do polecenia w celu zainstalowania najnowszej wersji zapoznawczej.
dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease
Przykłady kodu w tym artykule używają również pakietów usługi Azure Blob Storage i Azure Identity .
dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs
Dodawanie using dyrektyw
Dodaj następujące using dyrektywy do pliku kodu:
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
Tworzenie obiektu klienta
Aby połączyć aplikację z usługą Blob Storage, utwórz wystąpienie BlobServiceClient klasy . W poniższym przykładzie pokazano, jak utworzyć obiekt klienta przy użyciu DefaultAzureCredential autoryzacji. Aby dowiedzieć się więcej, zobacz Autoryzowanie dostępu i nawiązywanie połączenia z usługą Blob Storage. Aby pracować z kanałem informacyjnym zmian, potrzebujesz wbudowanej roli Czytelnik danych obiektu blob usługi Azure Storage lub nowszego.
// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";
BlobServiceClient client = new(
new Uri($"https://{accountName}.blob.core.windows.net"),
new DefaultAzureCredential());
Obiekt klienta jest przekazywany jako parametr do niektórych metod przedstawionych w tym artykule.
Odczytywanie rekordów w kanale zmian
Uwaga
Źródło zmian to niezmienna i tylko do odczytu jednostka na koncie magazynu. Dowolna liczba aplikacji może odczytywać i przetwarzać zestawienie zmian jednocześnie i niezależnie w ich własnej wygodzie. Rekordy nie są usuwane z zestawienia zmian, gdy aplikacja je odczytuje. Stan odczytu lub iteracji każdego czytnika zużywającego jest niezależny i obsługiwany tylko przez aplikację.
Poniższy przykładowy kod iteruje wszystkie rekordy w kanale zmian, dodaje je do listy, a następnie zwraca listę zdarzeń zestawienia zmian:
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
// Create a new BlobChangeFeedClient
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = [];
// Get all the events in the change feed
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Poniższy przykład kodu wyświetla niektóre wartości z listy zdarzeń zestawienia zmian:
public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
}
Wznawianie odczytywania rekordów z zapisanej pozycji
Możesz zapisać pozycję odczytu w kanale informacyjnym zmian, a następnie wznowić iterację rekordów w przyszłości. Możesz zapisać pozycję odczytu, uzyskując kursor zestawienia zmian. Kursor jest ciągiem, a aplikacja może zapisać ten ciąg w dowolny sposób, który ma sens w projekcie aplikacji, na przykład w pliku lub bazie danych.
W tym przykładzie iteruje wszystkie rekordy w kanale zmian, dodaje je do listy i zapisuje kursor. Lista i kursor są zwracane do obiektu wywołującego.
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
BlobServiceClient client,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor)
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Update the change feed cursor. The cursor is not required to get each page of events,
// it's intended to be saved and used to resume iterating at a later date.
cursor = enumerator.Current.ContinuationToken;
return (cursor, changeFeedEvents);
}
Przetwarzanie strumieniowe rekordów
Możesz przetworzyć rekordy zestawienia zmian w miarę ich zatwierdzenia do zestawienia zmian. Zobacz Specyfikacje. Zdarzenia zmiany są publikowane w kanale zmian średnio przez 60 sekund. Zalecamy sondowanie nowych zmian w tym okresie podczas określania interwału sondowania.
Ten przykład okresowo sonduje pod kątem zmian. Jeśli rekordy zmiany istnieją, ten kod przetwarza te rekordy i zapisuje kursor zestawienia zmian. W ten sposób, jeśli proces zostanie zatrzymany, a następnie uruchomiony ponownie, aplikacja może użyć kursora do wznowienia przetwarzania rekordów, w których ostatnio została przerwana. W tym przykładzie kursor jest zapisywany w pliku lokalnym w celach demonstracyjnych, ale aplikacja może zapisać go w dowolnej formie, która jest najbardziej zrozumiała dla danego scenariusza.
public async Task ChangeFeedStreamAsync(
BlobServiceClient client,
int waitTimeMs,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
while (true)
{
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();
while (true)
{
var result = await enumerator.MoveNextAsync();
if (result)
{
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
// Helper method to save cursor
SaveCursor(enumerator.Current.ContinuationToken);
}
else
{
break;
}
}
await Task.Delay(waitTimeMs);
}
}
void SaveCursor(string cursor)
{
// Specify the path to the file where you want to save the cursor
string filePath = "path/to/cursor.txt";
// Write the cursor value to the file
File.WriteAllText(filePath, cursor);
}
Odczytywanie rekordów w określonym zakresie czasu
Możesz odczytywać rekordy należące do określonego zakresu czasu. W tym przykładzie iteruje wszystkie rekordy w kanale informacyjnym zmian należącym do określonego zakresu dat i godzin, dodaje je do listy i zwraca listę:
async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Czas rozpoczęcia, który podajesz, jest zaokrąglany w dół do najbliższej godziny, a czas zakończenia jest zaokrąglany do najbliższej godziny. Istnieje możliwość, że użytkownicy mogą zobaczyć zdarzenia, które wystąpiły przed godziną rozpoczęcia i po godzinie zakończenia. Istnieje również możliwość, że niektóre zdarzenia występujące między czasem rozpoczęcia i zakończenia nie będą wyświetlane. Dzieje się tak, ponieważ zdarzenia mogą być rejestrowane w ciągu godziny poprzedniej do godziny rozpoczęcia lub godziny po godzinie zakończenia.