Condividi tramite


Elaborare il feed di modifiche in Archiviazione BLOB di Azure

Il feed di modifiche fornisce i log delle transazioni di tutte le modifiche apportate ai BLOB e ai metadati dei BLOB nell'account di archiviazione. Questo articolo illustra come leggere i record del feed di modifiche usando la libreria del processore del feed di modifiche BLOB.

Per altre informazioni sul feed di modifiche, vedere Feed di modifiche in Archiviazione BLOB di Azure.

Impostare il progetto

Questa sezione illustra come preparare un progetto per l'uso con la libreria client del feed di modifiche BLOB per .NET.

Installare i pacchetti

Nella directory del progetto installare il pacchetto per la libreria client del feed di modifiche BLOB di Archiviazione di Azure per .NET usando il dotnet add package comando . In questo esempio viene aggiunto il --prerelease flag al comando per installare la versione di anteprima più recente.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Gli esempi di codice in questo articolo usano anche i pacchetti Archiviazione BLOB di Azure e Azure Identity.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Aggiungere le direttive using

Aggiungere le direttive seguenti using al file di codice:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Creare un oggetto client

Per connettere l'applicazione all'archiviazione BLOB, creare un'istanza della BlobServiceClient classe . Nell'esempio seguente viene illustrato come creare un oggetto client usando DefaultAzureCredential per l'autorizzazione. Per altre informazioni, vedere Autorizzare l'accesso e connettersi all'archiviazione BLOB. Per usare il feed di modifiche, è necessario disporre del ruolo predefinito Controllo degli accessi in base al ruolo di Archiviazione blob lettore o versione successiva.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

L'oggetto client viene passato come parametro ad alcuni dei metodi illustrati in questo articolo.

Leggere i record nel feed di modifiche

Nota

Il feed di modifiche è un'entità non modificabile e di sola lettura nell'account di archiviazione. Un numero qualsiasi di applicazioni può leggere ed elaborare il feed di modifiche contemporaneamente e in modo indipendente, a seconda delle proprie esigenze. I record non vengono rimossi dal feed di modifiche quando vengono letti da un'applicazione. Lo stato di lettura o iterazione di ogni lettore consumer è indipendente e gestito solo dall'applicazione.

L'esempio di codice seguente scorre tutti i record nel feed di modifiche, li aggiunge a un elenco e quindi restituisce l'elenco degli eventi del feed di modifiche:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Nell'esempio di codice seguente vengono stampati alcuni valori dall'elenco degli eventi del feed di modifiche:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Riprendere la lettura dei record da una posizione salvata

È possibile scegliere di salvare la posizione di lettura nel feed di modifiche e quindi riprendere l'iterazione dei record in un momento successivo. È possibile salvare la posizione di lettura ottenendo il cursore del feed di modifiche. Il cursore è una stringa e l'applicazione può salvare tale stringa in qualsiasi modo appropriato per la progettazione dell'applicazione, ad esempio in un file o in un database.

In questo esempio viene eseguita l'iterazione di tutti i record nel feed di modifiche, che vengono aggiunti a un elenco, e viene salvato il cursore. L'elenco e il cursore vengono restituiti al chiamante.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Elaborazione dei flussi di record

È possibile scegliere di elaborare i record del feed di modifiche durante il commit nel feed di modifiche. Vedere Specifiche. Gli eventi di modifica vengono pubblicati nel feed di modifiche in un periodo medio di 60 secondi. È consigliabile eseguire il polling delle nuove modifiche tenendo presente questo periodo quando si specifica l'intervallo di polling.

In questo esempio viene eseguito periodicamente il polling delle modifiche. Se esistono record di modifiche, questo codice elabora tali record e salva il cursore del feed di modifiche. In questo modo, se il processo viene arrestato e quindi riavviato, l'applicazione può usare il cursore per riprendere l'elaborazione dei record dal punto in cui era stata interrotta l'ultima volta. In questo esempio il cursore viene salvato in un file locale a scopo dimostrativo, ma l'applicazione può salvarla in qualsiasi formato più appropriato per lo scenario in uso.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Leggere i record all'interno di un intervallo di tempo specifico

È possibile leggere i record che rientrano in un intervallo di tempo specifico. Questo esempio scorre tutti i record nel feed di modifiche che rientrano in un intervallo di data e ora specifico, li aggiunge a un elenco e restituisce l'elenco:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

L'ora di inizio specificata viene arrotondata per difetto all'ora più vicina e l'ora di fine viene arrotondata per eccesso all'ora più vicina. È possibile che gli utenti possano visualizzare eventi che si sono verificati prima dell'ora di inizio e dopo l'ora di fine. È anche possibile che alcuni eventi che si verificano tra l'ora di inizio e di fine non vengano visualizzati. Questo perché gli eventi potrebbero essere registrati durante l'ora precedente all'ora di inizio o durante l'ora successiva all'ora di fine.

Passaggi successivi