Processändringsflöde i Azure Blob Storage

Ändringsflöde innehåller transaktionsloggar för alla ändringar som sker i blobarna och blobmetadata i ditt lagringskonto. Den här artikeln visar hur du läser ändringsflödesposter med hjälp av biblioteket för blobändringsflödesprocessor.

Mer information om ändringsflödet finns i Ändringsfeed i Azure Blob Storage.

Hämta biblioteket för blobändringsflödesprocessorn

  1. Öppna ett kommandofönster (till exempel: Windows PowerShell).
  2. Installera NuGet-paketet Azure.Storage.Blobs.Changefeed från projektkatalogen.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

Läsa poster

Anteckning

Ändringsflödet är en oföränderlig och skrivskyddad entitet i ditt lagringskonto. Valfritt antal program kan läsa och bearbeta ändringsflödet samtidigt och oberoende av varandra på egen hand. Poster tas inte bort från ändringsflödet när ett program läser dem. Läs- eller iterationstillståndet för varje förbrukande läsare är oberoende och underhålls endast av ditt program.

Det här exemplet itererar igenom alla poster i ändringsflödet, lägger till dem i en lista och returnerar sedan listan till anroparen.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Det här exemplet skriver ut några värden från varje post i listan till konsolen.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

Återuppta läsning av poster från en sparad position

Du kan välja att spara läspositionen i ändringsflödet och sedan fortsätta iterera igenom posterna vid en framtida tidpunkt. Du kan spara läspositionen genom att hämta ändringsflödesmarkören. Markören är en sträng och ditt program kan spara strängen på alla sätt som passar programmets design (till exempel till en fil eller databas).

Det här exemplet itererar igenom alla poster i ändringsflödet, lägger till dem i en lista och sparar markören. Listan och markören returneras till anroparen.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);             
    }

    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Dataströmbearbetning av poster

Du kan välja att bearbeta ändringsflödesposter när de checkas in i ändringsflödet. Se Specifikationer. Ändringshändelserna publiceras i ändringsflödet i genomsnitt i 60 sekunder. Vi rekommenderar att du söker efter nya ändringar med den här perioden i åtanke när du anger ditt avsökningsintervall.

Det här exemplet söker regelbundet efter ändringar. Om det finns ändringsposter bearbetar den här koden dessa poster och sparar ändringsflödesmarkören. På så sätt kan programmet, om processen stoppas och sedan startas igen, använda markören för att återuppta bearbetningsposterna där den senast slutade. I det här exemplet sparas markören i en lokal programkonfigurationsfil, men programmet kan spara den i alla former som passar bäst för ditt scenario.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }

                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

Läsa poster inom ett tidsintervall

Du kan läsa poster som ligger inom ett visst tidsintervall. Det här exemplet itererar igenom alla poster i ändringsflödet som infaller mellan 15:00 den 2 mars 2020 och 02:00 den 7 augusti 2020, lägger till dem i en lista och returnerar sedan listan till anroparen.

Välja segment för ett tidsintervall

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Starttiden som du anger avrundas ned till närmaste timme och sluttiden avrundas upp till närmaste timme. Det är möjligt att användarna kan se händelser som inträffat före starttiden och efter sluttiden. Det är också möjligt att vissa händelser som inträffar mellan start- och sluttiden inte visas. Det beror på att händelser kan registreras under timmen före starttiden eller under timmen efter sluttiden.

Nästa steg