Delen via


Wijzigingenfeed verwerken in Azure Blob Storage

Wijzigingenfeed biedt transactielogboeken van alle wijzigingen die optreden in de blobs en de blobmetagegevens in uw opslagaccount. In dit artikel leest u hoe u wijzigingenfeedrecords kunt lezen met behulp van de processorbibliotheek van de blob-wijzigingenfeed.

Zie Wijzigingenfeed in Azure Blob Storage voor meer informatie over de wijzigingenfeed.

De processorbibliotheek voor de blob-wijzigingenfeed ophalen

  1. Open een opdrachtvenster (bijvoorbeeld: Windows PowerShell).
  2. Installeer vanuit de projectmap het NuGet-pakket Azure.Storage.Blobs.Changefeed.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

Records lezen

Notitie

De wijzigingenfeed is een onveranderbare en alleen-lezen entiteit in uw opslagaccount. Een willekeurig aantal toepassingen kan de wijzigingenfeed tegelijkertijd en onafhankelijk lezen en verwerken op hun eigen gemak. Records worden niet verwijderd uit de wijzigingenfeed wanneer ze door een toepassing worden gelezen. De lees- of iteratiestatus van elke verbruikende lezer is onafhankelijk en wordt alleen onderhouden door uw toepassing.

In dit voorbeeld worden alle records in de wijzigingenfeed herhaald, toegevoegd aan een lijst en wordt die lijst vervolgens geretourneerd naar de aanroeper.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

In dit voorbeeld worden enkele waarden uit elke record in de lijst naar de console afgedrukt.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

Het lezen van records hervatten vanaf een opgeslagen positie

U kunt ervoor kiezen om uw leespositie in de wijzigingenfeed op te slaan en vervolgens de records op een later tijdstip te doorlopen. U kunt de leespositie opslaan door de wijzigingenfeedcursor op te halen. De cursor is een tekenreeks en uw toepassing kan die tekenreeks opslaan op elke manier die zinvol is voor het ontwerp van uw toepassing (bijvoorbeeld in een bestand of database).

In dit voorbeeld worden alle records in de wijzigingenfeed herhaald, toegevoegd aan een lijst en wordt de cursor opgeslagen. De lijst en de cursor worden geretourneerd naar de aanroeper.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);             
    }

    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Stroomverwerking van records

U kunt ervoor kiezen om wijzigingenfeedrecords te verwerken wanneer deze worden doorgevoerd in de wijzigingenfeed. Zie Specificaties. De wijzigingsevenementen worden met een periode van gemiddeld 60 seconden naar de wijzigingenfeed gepubliceerd. We raden u aan om bij het opgeven van uw poll-interval te peilen op nieuwe wijzigingen met deze periode in gedachten.

In dit voorbeeld wordt periodiek gepeild naar wijzigingen. Als er wijzigingsrecords bestaan, verwerkt deze code deze records en slaat deze wijzigingenfeedcursor op. Als het proces wordt gestopt en vervolgens opnieuw wordt gestart, kan de toepassing de cursor gebruiken om de verwerking van records te hervatten waar het voor het laatst is gebleven. In dit voorbeeld wordt de cursor opgeslagen in een configuratiebestand voor een lokale toepassing, maar uw toepassing kan de cursor opslaan in elke vorm die het meest geschikt is voor uw scenario.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }

                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

Records lezen binnen een tijdsbereik

U kunt records lezen die binnen een bepaald tijdsbereik vallen. In dit voorbeeld worden alle records in de wijzigingenfeed herhaald die tussen 15:00 uur op 2 maart 2020 en 2:00 uur op 7 augustus 2020 vallen, worden ze toegevoegd aan een lijst en wordt die lijst vervolgens geretourneerd naar de aanroeper.

Segmenten voor een tijdsbereik selecteren

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

De begintijd die u opgeeft, wordt naar beneden afgerond op het dichtstbijzijnde uur en de eindtijd wordt naar boven afgerond op het dichtstbijzijnde uur. Het is mogelijk dat gebruikers gebeurtenissen zien die zijn opgetreden vóór de begintijd en na de eindtijd. Het is ook mogelijk dat sommige gebeurtenissen tussen de begin- en eindtijd niet worden weergegeven. Dat komt omdat gebeurtenissen kunnen worden geregistreerd tijdens het uur voor de begintijd of tijdens het uur na de eindtijd.

Volgende stappen