Partager via


Traiter le flux de modification dans Stockage Blob Azure

Le flux de modification fournit des journaux des transactions de toutes les modifications apportées aux objets blob et aux métadonnées d’objets blob dans votre compte de stockage. Cet article explique comment lire les enregistrements de flux de modification à l’aide de la bibliothèque du processeur de flux de modification d’objet blob.

Pour en savoir plus sur le flux de modification, consultez Flux de modification dans Stockage Blob Azure.

Configuration de votre projet

Cette section vous guide tout au long de la préparation d’un projet visant à utiliser la bibliothèque cliente Flux de modification des blobs pour .NET.

Installer des packages

À partir de votre répertoire de projet, installez le package pour la bibliothèque cliente Flux de modification des blobs Stockage Azure pour .NET à l’aide de la commande dotnet add package. Dans cet exemple, nous ajoutons l’indicateur --prerelease à la commande pour installer la dernière préversion.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Les exemples de code de cet article utilisent également les packages Stockage Blob Azure et Azure Identity.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Ajoutez des directives using.

Ajoutez les directives using suivantes à votre fichier de code :

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Créer un objet client

Pour connecter une application au service Stockage Blob, créez une instance de la classe BlobServiceClient. L’exemple suivant montre comment créer un objet client à l’aide de DefaultAzureCredential pour l’autorisation. Pour plus d’informations, consultez Autoriser l’accès et la connexion au Stockage Blob. Pour utiliser le flux de modification, vous avez besoin d’un rôle intégré RBAC Azure Lecteur de données de blobs de stockage ou version ultérieure.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

L’objet client est passé en tant que paramètre à certaines des méthodes présentées dans cet article.

Lire les enregistrements dans le flux de modification

Remarque

Le flux de modification est une entité immuable et en lecture seule dans votre compte de stockage. Un nombre quelconque d’applications peut lire et traiter le flux de modification simultanément et de manière indépendante. Les enregistrements ne sont pas supprimés du flux de modification quand une application les lit. L’état de lecture ou d’itération de chaque lecteur de consommation est indépendant et géré uniquement par votre application.

Cet exemple de code procède à l'itération de tous les enregistrements du flux de modification, les ajoute à une liste, puis renvoie cette liste des événements de flux de modification :

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

L’exemple de code suivant affiche certaines valeurs de la liste des événements de flux de modification :

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Reprendre la lecture des enregistrements à partir d'une position enregistrée

Vous pouvez choisir d'enregistrer votre position de lecture dans le flux de modification, puis de reprendre ultérieurement l'itération des enregistrements. Vous pouvez enregistrer la position de lecture en obtenant le curseur du flux de modification. Le curseur correspond à une chaîne et votre application peut enregistrer cette chaîne de la manière qui convient le mieux à la conception de votre application (par exemple : dans un fichier ou une base de données).

Cet exemple procède à l'itération de tous les enregistrements du flux de modification, les ajoute à une liste et enregistre le curseur. La liste et le curseur sont renvoyés à l'appelant.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Traitement des flux de données d’enregistrement

Vous pouvez choisir de traiter les enregistrements du flux de modification à mesure qu'ils sont validés dans le flux de modification. Voir Spécifications. Les événements de modification sont publiés dans le flux de modification à une fréquence de 60 secondes en moyenne. Nous vous recommandons d'interroger les nouvelles modifications en tenant compte de cette fréquence lorsque vous spécifiez votre intervalle d'interrogation.

Cet exemple interroge périodiquement les modifications. S'il existe des enregistrements de modification, ce code les traite et enregistre le curseur du flux de modification. Ainsi, si le processus est arrêté puis redémarré, l'application peut utiliser le curseur pour reprendre le traitement des enregistrements là où il s'était interrompu. Cet exemple enregistre le curseur dans un fichier local à des fins de démonstration, mais votre application peut l'enregistrer sous la forme qui convient le mieux à votre scénario.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Lire des enregistrements dans un intervalle de temps spécifique

Vous pouvez lire des enregistrements situés dans un intervalle de temps spécifique. Cet exemple procède à l'itération de tous les enregistrements du flux de modification situés dans une plage de dates et d’heures spécifique, les ajoute à une liste, puis renvoie cette liste :

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

L'heure de début que vous indiquez est arrondie à l'heure inférieure, et l'heure de fin est arrondie à l'heure supérieure. Les utilisateurs peuvent donc voir des événements survenus avant l'heure de début et après l'heure de fin. Il est également possible que certains événements survenus entre l'heure de début et l'heure de fin n'apparaissent pas. En effet, des événements peuvent être enregistrés pendant l'heure précédant l'heure de début ou pendant l'heure suivant l'heure de fin.

Étapes suivantes