Partilhar via


Feed de alteração de processo no Armazenamento de Blobs do Azure

O feed de alterações fornece logs de transações de todas as alterações que ocorrem nos blobs e nos metadados de blob em sua conta de armazenamento. Este artigo mostra como ler registros de feed de alteração usando a biblioteca do processador de feed de alteração de blob.

Para saber mais sobre o feed de alterações, consulte Alterar feed no Armazenamento de Blobs do Azure.

Configure o seu projeto

Esta seção orienta você na preparação de um projeto para trabalhar com a biblioteca de cliente do Feed de Alteração de Blobs para .NET.

Instalar pacotes

No diretório do projeto, instale o pacote para a biblioteca de cliente do Feed de Alteração de Blobs de Armazenamento do Azure para .NET usando o dotnet add package comando. Neste exemplo, adicionamos o sinalizador --prerelease ao comando para instalar a versão de visualização mais recente.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Os exemplos de código neste artigo também usam os pacotes Azure Blob Storage e Azure Identity .

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Adicionar using diretivas

Adicione as seguintes using diretivas ao seu arquivo de código:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Criar um objeto cliente

Para conectar o aplicativo ao Armazenamento de Blob, crie uma instância da BlobServiceClient classe. O exemplo a seguir mostra como criar um objeto cliente usando DefaultAzureCredential para autorização. Para saber mais, consulte Autorizar acesso e conectar-se ao Armazenamento de Blobs. Para trabalhar com o feed de alterações, você precisa do Leitor de Dados de Blob de Armazenamento do Azure RBAC ou superior.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

O objeto cliente é passado como um parâmetro para alguns dos métodos mostrados neste artigo.

Ler registos no feed de alterações

Nota

O feed de alterações é uma entidade imutável e somente leitura em sua conta de armazenamento. Qualquer número de aplicativos pode ler e processar o feed de alterações simultaneamente e de forma independente, de acordo com sua própria conveniência. Os registros não são removidos do feed de alterações quando um aplicativo os lê. O estado de leitura ou iteração de cada leitor consumidor é independente e mantido apenas pelo seu aplicativo.

O exemplo de código a seguir itera por todos os registros no feed de alterações, adiciona-os a uma lista e retorna a lista de eventos do feed de alterações:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

O exemplo de código a seguir imprime alguns valores da lista de eventos de feed de alteração:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Retomar a leitura de registos a partir de uma posição guardada

Você pode optar por salvar sua posição de leitura no feed de alterações e, em seguida, retomar a iteração pelos registros em um momento futuro. Você pode salvar a posição de leitura obtendo o cursor do feed de alterações. O cursor é uma cadeia de caracteres e seu aplicativo pode salvá-la de qualquer maneira que faça sentido para o design do seu aplicativo, por exemplo, em um arquivo ou banco de dados.

Este exemplo itera todos os registros no feed de alterações, adiciona-os a uma lista e salva o cursor. A lista e o cursor são retornados ao chamador.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Processamento de fluxo de registros

Você pode optar por processar os registros de feed de alteração à medida que eles estão comprometidos com o feed de alterações. Consulte as especificações. Os eventos de alteração são publicados no feed de alterações em um período de 60 segundos, em média. Recomendamos que você pesquise novas alterações com esse período em mente ao especificar seu intervalo de pesquisa.

Este exemplo sonda periodicamente as alterações. Se existirem registos de alteração, este código processa esses registos e guarda o cursor do feed de alterações. Dessa forma, se o processo for interrompido e, em seguida, iniciado novamente, o aplicativo pode usar o cursor para retomar o processamento de registros onde parou pela última vez. Este exemplo salva o cursor em um arquivo local para fins de demonstração, mas seu aplicativo pode salvá-lo de qualquer forma que faça mais sentido para seu cenário.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Ler registros dentro de um intervalo de tempo específico

Você pode ler registros que se enquadram em um intervalo de tempo específico. Este exemplo itera todos os registros no feed de alterações que se enquadram em um intervalo de data e hora específico, adiciona-os a uma lista e retorna a lista:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

A hora de início fornecida é arredondada para a hora mais próxima e a hora de fim é arredondada para a hora mais próxima. É possível que os utilizadores vejam eventos que ocorreram antes da hora de início e depois da hora de fim. Também é possível que alguns eventos que ocorrem entre a hora de início e a hora de término não apareçam. Isso porque os eventos podem ser registrados durante a hora anterior à hora de início ou durante a hora após a hora de término.

Próximos passos