Partilhar via


Processar o feed de alterações no Armazenamento de Blobs do Azure

O feed de alterações fornece registos de transações de todas as alterações que ocorrem nos blobs e nos metadados do blob na sua conta de armazenamento. Este artigo mostra-lhe como ler registos do feed de alterações com a biblioteca do processador do feed de alterações de blobs.

Para saber mais sobre o feed de alterações, consulte Feed de alterações no Armazenamento de Blobs do Azure.

Obter a biblioteca do processador do feed de alterações de blobs

  1. Abra uma janela de comando (por exemplo: Windows PowerShell).
  2. No diretório do projeto, instale o pacote NuGet Azure.Storage.Blobs.Changefeed.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

Ler registos

Nota

O feed de alterações é uma entidade imutável e só de leitura na sua conta de armazenamento. Qualquer número de aplicações pode ler e processar o feed de alterações simultaneamente e independentemente da sua própria conveniência. Os registos não são removidos do feed de alterações quando uma aplicação os lê. O estado de leitura ou iteração de cada leitor consumidor é independente e mantido apenas pela sua aplicação.

Este exemplo itera todos os registos no feed de alterações, adiciona-os a uma lista e, em seguida, devolve essa lista ao autor da chamada.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Este exemplo imprime na consola alguns valores de cada registo na lista.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

Retomar a leitura de registos a partir de uma posição guardada

Pode optar por guardar a sua posição de leitura no feed de alterações e, em seguida, retomar a iteração através dos registos num momento futuro. Pode guardar a posição de leitura ao obter o cursor do feed de alterações. O cursor é uma cadeia e a sua aplicação pode guardar essa cadeia de carateres de qualquer forma que faça sentido para a estrutura da sua aplicação (por exemplo: para um ficheiro ou base de dados).

Este exemplo itera todos os registos no feed de alterações, adiciona-os a uma lista e guarda o cursor. A lista e o cursor são devolvidos ao autor da chamada.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);             
    }

    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Processamento de fluxos de registos

Pode optar por processar registos do feed de alterações à medida que são consolidados no feed de alterações. Veja Especificações. Os eventos de alteração são publicados no feed de alterações num período de 60 segundos, em média. Recomendamos que consulte novas alterações tendo este período em mente ao especificar o intervalo de votação.

Este exemplo consulta periodicamente alterações. Se existirem registos de alteração, este código processa esses registos e guarda o cursor do feed de alterações. Dessa forma, se o processo for parado e, em seguida, iniciado novamente, a aplicação pode utilizar o cursor para retomar o processamento de registos onde parou pela última vez. Este exemplo guarda o cursor num ficheiro de configuração de aplicação local, mas a sua aplicação pode guardá-lo de qualquer forma que faça mais sentido para o seu cenário.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }

                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

Ler registos dentro de um intervalo de tempo

Pode ler registos que se inserem num intervalo de tempo específico. Este exemplo itera todos os registos no feed de alterações que se inserem entre as 15:00 de 2 de março de 2020 e as 02:00 de 7 de agosto de 2020, adiciona-os a uma lista e, em seguida, devolve essa lista ao autor da chamada.

Selecionar segmentos para um intervalo de tempo

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

A hora de início fornecida é arredondada para a hora mais próxima e a hora de fim é arredondada até à hora mais próxima. É possível que os utilizadores vejam eventos que ocorreram antes da hora de início e após a hora de fim. Também é possível que alguns eventos que ocorrem entre a hora de início e de fim não sejam apresentados. Isto deve-se ao facto de os eventos poderem ser registados durante a hora anterior à hora de início ou durante a hora após a hora de fim.

Passos seguintes