Processar o feed de alterações no Armazenamento de Blobs do Azure

O feed de alterações fornece os logs de transações de todas as alterações que ocorrem nos blobs e nos metadados de blob na conta de armazenamento. Este artigo mostra como ler os registros do feed de alterações usando a biblioteca do processador do feed de alterações de blob.

Para saber mais sobre o feed de alterações, confira o Feed de alterações no Armazenamento de Blobs do Azure.

Obter a biblioteca do processador de feed de alterações de blob

  1. Abra uma janela de comando (por exemplo: Windows PowerShell).
  2. No diretório do projeto, instale o pacote NuGet Azure.Storage.Blobs.Changefeed.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

Ler registros

Observação

O feed de alterações é uma entidade imutável e somente leitura na conta de armazenamento. Qualquer quantidade de aplicativos pode ler e processar o feed de alterações simultaneamente e de maneira independente, como preferir. Os registros não são removidos do feed de alterações quando lidos por um aplicativo. O estado de leitura ou de iteração de cada leitor de consumo é independente e mantido somente pelo próprio aplicativo.

Este exemplo itera por todos os registros no feed de alterações, adiciona-os a uma lista e retorna essa lista ao chamador.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Este exemplo imprime no console alguns valores de cada registro da lista.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

Continuar a leitura de registros de uma posição salva

Você pode optar por salvar sua posição de leitura no feed de alterações e retomar a iteração pelos registros em um momento futuro. Você pode salvar a posição de leitura obtendo o cursor do feed de alterações. O cursor é uma cadeia de caracteres e o aplicativo pode salvar essa cadeia de qualquer maneira que faça sentido para o design do aplicativo (por exemplo, para um arquivo ou um banco de dados).

Este exemplo itera por todos os registros no feed de alterações, os adiciona uma lista e salva o cursor. A lista e o cursor são retornados para o chamador.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);             
    }

    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Transmitir o processamento de registros

Você pode optar por processar os registros do feed de alterações à medida em que eles são confirmados no feed de alterações. Confira as Especificações. Os eventos de alteração são publicados no feed de alterações em um período de 60 segundos em média. É recomendável que você pesquise novas alterações com esse período em mente ao especificar o intervalo de pesquisa.

Este exemplo pesquisa as alterações periodicamente. Se houver registros de alteração, esse código processa os registros e salva o cursor do feed de alterações. Dessa forma, se o processo for interrompido e iniciado novamente, o aplicativo poderá usar o cursor para retomar os registros de processamento onde ele parou pela última vez. Este exemplo salva o cursor em um arquivo de configuração de aplicativo local, mas o aplicativo pode salvá-lo em qualquer formato que faça sentido para o cenário.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }

                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

Ler registros dentro de um intervalo de tempo

Você pode ler os registros que se enquadram em um intervalo de tempo específico. Este exemplo itera por todos os registros do feed de alterações que estão entre as 15h do dia 2 de março de 2020 e as 14h do dia 7 de agosto de 2020, adiciona-os a uma lista e retorna essa lista ao chamador.

Selecionar segmentos para um intervalo de tempo

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

A hora de início que você fornece é arredondada para baixo, para a hora mais próxima, e a hora de término é arredondada para cima, para a hora mais próxima. É possível que os usuários vejam eventos que ocorreram antes da hora de início e após a hora de término. Também é possível que alguns eventos que ocorreram entre a hora de início e de término não apareçam. Isso ocorre porque os eventos podem ter sido registrados durante a hora anterior à hora de início ou durante a hora posterior à hora de término.

Próximas etapas