다음을 통해 공유


Azure Blob Storage에서 변경 피드 처리

변경 피드는 스토리지 계정의 Blob 및 Blob 메타데이터에 발생하는 모든 변경 내용에 대한 트랜잭션 로그를 제공합니다. 이 문서에서는 Blob 변경 피드 프로세서 라이브러리를 사용하여 변경 피드 레코드를 읽는 방법을 보여 줍니다.

변경 피드에 대한 자세한 내용은 Azure Blob Storage 변경 피드를 참조하세요.

프로젝트 설정

이 섹션에서는 .NET용 Blob 변경 피드 클라이언트 라이브러리를 사용할 프로젝트를 준비하는 방법을 안내합니다.

패키지 설치

프로젝트 디렉터리에서 명령을 사용하여 dotnet add package .NETAzure Storage Blob 변경 피드 클라이언트 라이브러리에 대한 패키지를 설치합니다. 이 예제에서는 명령에 플래그를 --prerelease 추가하여 최신 미리 보기 버전을 설치합니다.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

이 문서의 코드 예제에서는 Azure Blob Storage 및 Azure ID 패키지도 사용합니다.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

using 지시문 추가

코드 파일에 다음 using 지시문을 추가합니다.

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

클라이언트 개체 만들기

Blob Storage에 애플리케이션을 연결하려면 클래스의 인스턴스를 BlobServiceClient 만듭니다. 다음 예제에서는 권한 부여에 사용 하 여 DefaultAzureCredential 클라이언트 개체를 만드는 방법을 보여 있습니다. 자세한 내용은 액세스 권한 부여를 참조 하고 Blob Storage에 연결합니다. 변경 피드를 사용하려면 Azure RBAC 기본 제공 역할 Storage Blob 데이터 판독 기 이상이 필요합니다.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

클라이언트 개체는 이 문서에 표시된 일부 메서드에 매개 변수로 전달됩니다.

변경 피드에서 레코드 읽기

참고 항목

변경 피드는 스토리지 계정에 있는 변경할 수 없는 읽기 전용 엔터티입니다. 제한 없는 애플리케이션에서 편리한 방식으로 동시에 또는 독립적으로 변경 피드를 읽고 처리할 수 있습니다. 애플리케이션에서 레코드를 읽어도 레코드는 변경 피드에서 제거되지 않습니다. 사용하는 각 판독기의 읽기 또는 반복 상태는 독립적이며 애플리케이션에서만 유지 관리됩니다.

다음 코드 예제에서는 변경 피드의 모든 레코드를 반복하고 목록에 추가한 다음 변경 피드 이벤트 목록을 반환합니다.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

다음 코드 예제에서는 변경 피드 이벤트 목록에서 일부 값을 인쇄합니다.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

저장된 위치에서 레코드 읽기 계속

변경 피드에 읽기 위치를 저장한 다음, 나중에 레코드 반복을 다시 시작하도록 선택할 수 있습니다. 변경 피드 커서를 가져와 읽기 위치를 저장할 수 있습니다. 커서는 문자열이며 애플리케이션은 파일 또는 데이터베이스와 같이 애플리케이션의 디자인에 적합한 방식으로 해당 문자열을 저장할 수 있습니다.

이 예제에서는 변경 피드의 모든 레코드를 반복하고, 목록에 추가하고, 커서를 저장합니다. 목록 및 커서가 호출자에게 반환됩니다.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

레코드 처리 스트리밍

변경 피드 레코드가 변경 피드에 커밋되면 변경 피드 레코드를 처리하도록 선택할 수 있습니다. 사양을 참조하세요. 변경 이벤트는 평균 60초 동안 변경 피드에 게시됩니다. 폴링 간격을 지정할 때 이 기간의 새로운 변경 내용을 폴링하는 것이 좋습니다.

이 예제에서는 정기적으로 변경 내용을 폴링합니다. 변경 레코드가 있는 경우 이 코드는 해당 레코드를 처리하고 변경 피드 커서를 저장합니다. 이러한 방식으로 프로세스가 중지되었다가 다시 시작되는 경우 애플리케이션은 커서를 사용하여 마지막으로 중지했던 레코드의 처리를 다시 시작할 수 있습니다. 다음은 데모용으로 커서를 로컬 파일에 저장하는 예제이지만 애플리케이션은 시나리오에 가장 적합한 형식으로 저장할 수 있습니다.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

특정 시간 범위 내의 레코드 읽기

특정 시간 범위 내에 속하는 레코드를 읽을 수 있습니다. 이 예제에서는 특정 날짜 및 시간 범위에 속하는 변경 피드의 모든 레코드를 반복하여 목록에 추가하고 목록을 반환합니다.

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

사용자가 제공하는 시작 시간은 가장 가까운 시간으로 반내림되고 종료 시간은 가장 가까운 시간으로 반올림됩니다. 사용자는 시작 시간 이전에 발생한 이벤트와 종료 시간 이후에 발생한 이벤트를 볼 수 있습니다. 또한 시작 시간과 종료 시간 사이에 발생하는 일부 이벤트는 표시되지 않을 수 있습니다. 이벤트가 시작 시간 이전 시간 또는 종료 시간 이후 시간 동안 기록될 수 있기 때문입니다.

다음 단계