Compartir vía


Procesamiento de la fuente de cambios en Azure Blob Storage

La fuente de cambios proporciona registros de transacciones de todos los cambios que se producen en los blobs y en los metadatos de blobs de la cuenta de almacenamiento. En este artículo se muestra cómo leer los registros de la fuente de cambios con la biblioteca de procesadores de la fuente de cambios de blob.

Para más información sobre la fuente de cambios, consulte Fuente de cambios en Azure Blob Storage.

Configuración del proyecto

En esta sección, se explica cómo preparar un proyecto para que funcione con la biblioteca cliente de fuente de cambios para. NET.

Instalar paquetes

En el directorio del proyecto, instale el paquete de la biblioteca cliente de fuente de cambios de Blobs de Azure Storage para .NET mediante el comando dotnet add package. En este ejemplo, agregamos la marca --prerelease al comando para instalar la versión preliminar más reciente.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Los ejemplos de código de este artículo también usan los paquetes Azure Blob Storage y Azure Identity.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Agregue directivas using.

Agregue las siguientes directivas using al archivo de código:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Creación de un objeto de cliente

Para conectar la aplicación a Blob Storage, cree una instancia de la clase BlobServiceClient. ,En el ejemplo siguiente se muestra cómo crear un objeto de cliente mediante DefaultAzureCredential para la autorización. Para más información, consulte Autorización del acceso y conexión a Blob Storage. Para trabajar con la fuente de cambios, necesita el rol integrado de RBAC de Azure Storage Blob Data Reader o superior.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

El objeto de cliente se pasa como parámetro a algunos de los métodos que se muestran en este artículo.

Lectura registros en la fuente de cambios

Nota:

La fuente de cambios es una entidad inmutable y de solo lectura en la cuenta de almacenamiento. Cualquier número de aplicaciones puede leer y procesar la fuente de cambios de manera simultáneamente y de manera independiente a su comodidad. Los registros no se quitan de la fuente de cambios cuando una aplicación los lee. El estado de lectura o de iteración de cada lector de consumo es independiente y solo lo mantiene la aplicación.

El ejemplo de código siguiente itera todos los registros de la fuente de cambios, los agrega a una lista y, a continuación, devuelve la lista de eventos de fuente de cambios:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

En el ejemplo de código siguiente, se imprimen algunos valores de la lista de eventos de fuente de cambios:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Reanudación de la lectura de registros a partir de una posición guardada

Puede optar por guardar la posición de lectura en la fuente de cambios y, después, reanudar la iteración de los registros en el futuro. Puede obtener el cursor de la fuente de cambios para guardar la posición de lectura. El cursor es una string y la aplicación puede guardarla de cualquier forma que funcione para el diseño de la aplicación (por ejemplo, en un archivo o base de datos).

En este ejemplo se recorren en iteración todos los registros de la fuente de cambios, se agregan a una lista y se guarda el cursor. La lista y el cursor se devuelven al autor de la llamada.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Flujo del procesamiento de los registros

Puede elegir procesar los registros de la fuente de cambios a medida que se confirmen en la fuente de cambios. Consulte las especificaciones. Los eventos de cambios se publican en la fuente de cambios en un período de 60 segundos de media. Se recomienda que sondee los nuevos cambios con este plazo en mente al especificar el intervalo de sondeo.

Este ejemplo sondea periódicamente los cambios. Si existen cambios en los registros, este código procesa dichos registros y guarda el cursor de la fuente de cambios. De este modo, si el proceso se detiene y, a continuación, se vuelve a iniciar, la aplicación puede usar el cursor para reanudar el procesamiento de los registros en el punto en que se quedó por última vez. En este ejemplo, se guarda el cursor en un archivo local con fines de demostración, pero su aplicación puede guardarlo en cualquier formato que sea más adecuado para su caso.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Lectura de registros dentro de un intervalo de tiempo específico

Puede leer los registros que se encuentran dentro de un intervalo de tiempo específico. Este ejemplo itera todos los registros de la fuente de cambios que se encuentran dentro de un intervalo de fecha y hora específico, los agrega a una lista y devuelve la lista:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

La hora de inicio que proporcione se redondea hacia abajo a la hora más cercana y la hora de finalización se redondea hacia arriba a la hora más cercana. Es posible que los usuarios vean los eventos que se produjeron antes de la hora de inicio y después de la hora de finalización. También es posible que no aparezcan algunos eventos que se producen entre las horas de inicio y finalización. Esto se debe a que los eventos se pueden registrar durante la hora anterior a la hora de inicio o durante la hora posterior a la hora de finalización.

Pasos siguientes