Aracılığıyla paylaş


Azure Blob Depolama'da değişiklik akışını işleme

Değişiklik akışı, depolama hesabınızdaki bloblarda ve blob meta verilerinde gerçekleşen tüm değişikliklerin işlem günlüklerini sağlar. Bu makalede, blob değişiklik akışı işlemci kitaplığını kullanarak değişiklik akışı kayıtlarının nasıl okunduğu gösterilmektedir.

Değişiklik akışı hakkında daha fazla bilgi edinmek için bkz. Azure Blob Depolama'de değişiklik akışı.

Projenizi ayarlama

Bu bölüm, .NET için Bloblar Değişiklik Akışı istemci kitaplığıyla çalışmak üzere bir proje hazırlama işleminde size yol gösterir.

Paketleri yükleme

Proje dizininizden komutunu kullanarak dotnet add package .NET için Azure Depolama Blobları Değişiklik Akışı istemci kitaplığı paketini yükleyin. Bu örnekte, en son önizleme sürümünü yüklemek için komutuna bayrağını ekleyeceğiz --prerelease .

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Bu makaledeki kod örneklerinde Azure Blob Depolama ve Azure Identity paketleri de kullanılır.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Yönerge ekleme using

Kod dosyanıza aşağıdaki using yönergeleri ekleyin:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

İstemci nesnesi oluşturma

Uygulamayı Blob Depolama'ya bağlamak için sınıfının bir örneğini BlobServiceClient oluşturun. Aşağıdaki örnekte yetkilendirme için kullanarak DefaultAzureCredential bir istemci nesnesinin nasıl oluşturulacağı gösterilmektedir. Daha fazla bilgi edinmek için bkz . Erişimi yetkilendirme ve Blob Depolama'ya bağlanma. Değişiklik akışıyla çalışmak için Azure RBAC yerleşik rolü Depolama Blobu Veri Okuyucusu veya üzeri gerekir.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

İstemci nesnesi, bu makalede gösterilen yöntemlerden bazılarına parametre olarak geçirilir.

Değişiklik akışındaki kayıtları okuma

Not

Değişiklik akışı, depolama hesabınızda sabit ve salt okunur bir varlıktır. Herhangi bir sayıda uygulama değişiklik akışını kendi kendilerine uygun bir şekilde eşzamanlı ve bağımsız olarak okuyabilir ve işleyebilir. Bir uygulama bunları okuduğunda kayıtlar değişiklik akışından kaldırılmaz. Her tüketen okuyucunun okuma veya yineleme durumu bağımsızdır ve yalnızca uygulamanız tarafından korunur.

Aşağıdaki kod örneği, değişiklik akışındaki tüm kayıtları yineler, bunları bir listeye ekler ve ardından değişiklik akışı olaylarının listesini döndürür:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Aşağıdaki kod örneği, değişiklik akışı olayları listesinden bazı değerleri yazdırır:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Kayıtlı bir konumdan kayıtları okumaya devam etme

Değişiklik akışında okuma konumunuzu kaydetmeyi ve ardından kayıtlarda yinelemeyi gelecekte sürdürmeyi seçebilirsiniz. Değişiklik akışı imlecini alarak okuma konumunu kaydedebilirsiniz. İmleç bir dizedir ve uygulamanız bu dizeyi uygulamanızın tasarımına uygun herhangi bir şekilde (örneğin, bir dosyaya veya veritabanına) kaydedebilir.

Bu örnek, değişiklik akışındaki tüm kayıtları yineler, bunları bir listeye ekler ve imleci kaydeder. Liste ve imleç çağırana döndürülür.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Kayıtların akışla işlenmesi

Değişiklik akışı kayıtları değişiklik akışına işlendiği için işlemeyi seçebilirsiniz. Bkz. Belirtimler. Değişiklik olayları, değişiklik akışında ortalama 60 saniyelik bir sürede yayımlanır. Yoklama aralığınızı belirtirken bu dönemi göz önünde bulundurarak yeni değişiklikleri yoklamanızı öneririz.

Bu örnek, değişiklikleri düzenli aralıklarla yoklar. Değişiklik kayıtları varsa, bu kod bu kayıtları işler ve değişiklik akışı imlecini kaydeder. Bu şekilde işlem durdurulur ve yeniden başlatılırsa uygulama, kayıtları işlemeye kaldığı yerden devam etmek için imleci kullanabilir. Bu örnek, imleci gösterim amacıyla yerel bir dosyaya kaydeder, ancak uygulamanız bunu senaryonuz için en mantıklı olan herhangi bir biçimde kaydedebilir.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Belirli bir zaman aralığındaki kayıtları okuma

Belirli bir zaman aralığındaki kayıtları okuyabilirsiniz. Bu örnek, değişiklik akışındaki belirli bir tarih ve saat aralığındaki tüm kayıtları yineler, bunları bir listeye ekler ve listeyi döndürür:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Sağladığınız başlangıç saati en yakın saate yuvarlanır ve bitiş saati en yakın saate yuvarlanır. Kullanıcılar başlangıç saatinden önce ve bitiş saatinden sonra gerçekleşen olayları görebilir. Başlangıç ve bitiş saati arasında gerçekleşen bazı olayların görünmemesi de mümkündür. Bunun nedeni, olayların başlangıç saatinden önceki bir saat veya bitiş saatinden sonraki saat içinde kaydedilebileceğidir.

Sonraki adımlar