Változáscsatorna feldolgozása a Azure Blob Storage

A változáscsatorna tranzakciónaplókat biztosít a blobok és a tárfiók blob metaadatainak változásairól. Ez a cikk bemutatja, hogyan olvashatja el a változáscsatorna rekordjait a blobmódosítási adatcsatorna processzortárával.

A változáscsatornával kapcsolatos további információkért lásd: Változáscsatorna Azure Blob Storage.

A blobmódosítási adatcsatorna processzortárának lekérése

  1. Nyisson meg egy parancsablakot (például: Windows PowerShell).
  2. A projektkönyvtárból telepítse az Azure.Storage.Blobs.Changefeed NuGet-csomagot.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

Rekordok olvasása

Megjegyzés

A változáscsatorna nem módosítható és írásvédett entitás a tárfiókban. Tetszőleges számú alkalmazás olvashatja és feldolgozhatja a változáscsatornát egyszerre és függetlenül, saját kényelme érdekében. A rendszer nem távolítja el a rekordokat a változáscsatornából, amikor egy alkalmazás felolvassa őket. Az egyes fogyasztó olvasók olvasási vagy iterációs állapota független, és csak az alkalmazás tartja karban.

Ez a példa végigvezeti a változáscsatornában lévő összes rekordot, hozzáadja őket egy listához, majd visszaadja a listát a hívónak.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Ez a példa néhány értéket nyomtat ki a konzolra a lista egyes rekordjaiból.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

Rekordok olvasásának folytatása mentett helyről

Dönthet úgy, hogy menti az olvasási pozíciót a változáscsatornában, majd egy későbbi időpontban folytathatja az iterálást a rekordokkal. Az olvasási pozíciót a változáscsatorna kurzorának beolvasásával mentheti. A kurzor egy sztring , és az alkalmazás bármilyen módon mentheti ezt a sztringet, ami az alkalmazás tervezéséhez szükséges (például egy fájlba vagy adatbázisba).

Ez a példa végigvezeti a változáscsatornában lévő összes rekordot, hozzáadja őket egy listához, és menti a kurzort. A rendszer visszaadja a listát és a kurzort a hívónak.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);             
    }

    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Rekordok streamfeldolgozása

Dönthet úgy, hogy feldolgozzák a változáscsatorna rekordjait, mivel azok véglegesítve vannak a változáscsatorna számára. Lásd: Specifikációk. A változásesemények átlagosan 60 másodperces időtartamban jelennek meg a változáscsatornában. Javasoljuk, hogy a lekérdezési időköz megadásakor az adott időszakkal kapcsolatos új módosításokat is lekérdezze.

Ez a példa rendszeresen lekérdezi a módosításokat. Ha változásrekordok léteznek, ez a kód feldolgozza ezeket a rekordokat, és menti a változáscsatorna kurzorát. Így ha a folyamat le van állítva, majd újraindul, az alkalmazás a kurzor használatával folytathatja a rekordok feldolgozását, ahol utoljára abbahagyta. Ez a példa egy helyi alkalmazáskonfigurációs fájlba menti a kurzort, de az alkalmazás bármilyen formában mentheti, ami a legérthetőbb a forgatókönyvhöz.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }

                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

Rekordok olvasása egy időtartományon belül

Az adott időtartományba eső rekordokat olvashatja. Ez a példa végigvezeti a változáscsatorna összes rekordján, amely 2020. március 2-án 15:00 és 2020. augusztus 7-én 2:00 között esik, hozzáadja őket egy listához, majd visszaadja a listát a hívónak.

Szegmensek kiválasztása egy időtartományhoz

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

A megadott kezdési időpont lefelé kerekül a legközelebbi órára, a befejezési időpont pedig a legközelebbi órára kerekül fel. Előfordulhat, hogy a felhasználók a kezdési időpont előtt és a befejezés után is láthatják az eseményeket. Az is előfordulhat, hogy egyes események, amelyek a kezdési és a befejezési időpont között történnek, nem jelennek meg. Ennek az az oka, hogy az események rögzíthetők a kezdési időpontot megelőző órában vagy a befejezés utáni órában.

Következő lépések