Azure Cosmos DB를 사용한 트랜잭션 Outbox 패턴

Azure Cosmos DB
Azure Service Bus
Azure 기능

분산 시스템에서 안정적인 메시징을 구현하는 것은 어려울 수 있습니다. 이 문서에서는 Idempotent 메시지 처리를 지원하는 중요한 부분인 신뢰할 수 있는 메시징 및 이벤트 배달을 보장하기 위해 트랜잭션 아웃박스 패턴을 사용하는 방법을 설명합니다. 이를 위해 Azure Cosmos DB 트랜잭션 일괄 처리를 사용하고 Azure Service Bus와 함께 피드를 변경합니다.

개요

마이크로서비스 아키텍처는 점점 더 대중화되고 있으며 특히 대규모 애플리케이션에서 확장성, 유지 관리 가능성 및 민첩성과 같은 문제를 해결하는 데 가능성이 있습니다. 그러나 이 아키텍처 패턴은 데이터 처리와 관련하여 문제도 발생합니다. 분산 애플리케이션에서 각 서비스는 전용 서비스 소유 데이터 저장소에서 작동하는 데 필요한 데이터를 독립적으로 유지 관리합니다. 이러한 시나리오를 지원하기 위해 일반적으로 한 서비스의 데이터(이벤트)를 메시징 버스를 통해 애플리케이션의 다른 서비스로 배포하는 RabbitMQ, Kafka 또는 Azure Service Bus와 같은 메시징 솔루션을 사용합니다. 그러면 내부 또는 외부 소비자가 해당 메시지를 구독하고 데이터가 조작되는 즉시 변경 내용에 대한 알림을 받을 수 있습니다.

해당 영역에서 잘 알려진 예는 주문 시스템입니다. 사용자가 주문을 만들려고 할 때 Ordering 서비스는 REST 엔드포인트를 통해 클라이언트 애플리케이션에서 데이터를 수신합니다. 페이로드를 Order 개체의 내부 표현에 매핑하여 데이터의 유효성을 검사합니다. 데이터베이스에 성공적으로 커밋되면 OrderCreated 이벤트를 메시지 버스에 게시합니다. 새로운 주문에 관심이 있는 다른 서비스(예: Inventory 또는 Invoicing 서비스)는 OrderCreated 메시지를 구독하고 처리하고 자체 데이터베이스에 저장합니다.

다음 의사 코드는 이 프로세스가 일반적으로 Ordering 서비스 관점에서 어떻게 보이는지 보여 줍니다.

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

이 방법은 주문 개체를 저장하고 해당 이벤트를 게시하는 사이에 오류가 발생할 때까지 잘 작동합니다. 여러 가지 이유로 이 시점에서 이벤트 보내기가 실패할 수 있습니다.

  • 네트워크 오류
  • 메시지 서비스 중단
  • 호스트 장애

오류가 무엇이든 결과는 OrderCreated 이벤트를 메시지 버스에 게시할 수 없습니다. 다른 서비스는 주문이 만들어졌다는 알림을 받지 않습니다. Ordering 서비스는 이제 실제 업무 프로세스와 관련이 없는 다양한 것들을 처리해야 합니다. 다시 온라인 상태가 되자마자 메시지 버스에 올려야 하는 이벤트를 추적해야 합니다. 이벤트 손실로 인한 애플리케이션의 데이터 불일치와 같은 최악의 경우도 발생할 수 있습니다.

Diagram that shows event handling without the Transactional Outbox pattern.

솔루션

이러한 상황을 방지할 수 있도록 하는 트랜잭션 Outbox라는 잘 알려진 패턴이 있습니다. 이벤트가 궁극적으로 메시지 브로커에 푸시되기 전에 이벤트가 데이터 저장소(일반적으로 데이터베이스의 Outbox 테이블)에 저장되도록 합니다. 비즈니스 개체와 해당 이벤트가 동일한 데이터베이스 트랜잭션 내에 저장되면 데이터가 손실되지 않습니다. 모든 것이 커밋되거나 오류가 있는 경우 모든 것이 롤백됩니다. 결국 이벤트를 게시하기 위해 다른 서비스 또는 작업자 프로세스에서 처리되지 않은 항목에 대해 Outbox 테이블을 쿼리하고 이벤트를 게시하고 처리된 것으로 표시합니다. 이 패턴은 비즈니스 개체가 만들어지거나 수정된 후 이벤트가 손실되지 않도록 합니다.

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

이 아키텍처의 Visio 파일을 다운로드합니다.

관계형 데이터베이스에서 패턴의 구현은 간단합니다. 예를 들어 서비스가 Entity Framework Core를 사용하는 경우 Entity Framework 컨텍스트를 사용하여 데이터베이스 트랜잭션을 만들고 비즈니스 개체와 이벤트를 저장하고 트랜잭션을 커밋하거나 롤백을 수행합니다. 또한 이벤트를 처리하는 작업자 서비스는 구현하기 쉽습니다. 새 항목에 대해 Outbox 테이블을 주기적으로 쿼리하고, 새로 삽입된 이벤트를 메시지 버스에 게시하고, 마지막으로 이러한 항목을 처리된 것으로 표시합니다.

실제로는 처음에 보이는 것처럼 쉽지 않습니다. 가장 중요한 것은 OrderUpdated 이벤트가 OrderCreated 이벤트보다 먼저 게시되지 않도록 이벤트의 순서를 유지해야 한다는 것입니다.

Azure Cosmos DB에서 구현

이 섹션에서는 Azure Cosmos DB 변경 피드 및 Service Bus를 사용하여 다양한 서비스 간에 안정적인 순서대로 메시징을 구현하기 위해 Azure Cosmos DB에서 트랜잭션 Outbox 패턴을 구현하는 방법을 보여 줍니다. Contact 개체(FirstName, LastName, Email, Company 정보 등)를 관리하는 샘플 서비스를 보여 줍니다. CQRS(명령과 쿼리의 역할 분리) 패턴을 사용하고 기본 도메인 중심 설계 개념을 따릅니다. 구현을 위한 샘플 코드는 GitHub에서 찾을 수 있습니다.

샘플 서비스의 Contact 개체는 다음과 같은 구조를 갖습니다.

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Contact가 만들어지거나 업데이트되는 즉시 현재 변경 내용에 대한 정보가 포함된 이벤트가 발생합니다. 무엇보다도 도메인 이벤트는 다음과 같습니다.

  • ContactCreated. 연락처가 추가되면 발생합니다.
  • ContactNameUpdated. FirstName 또는 LastName이 변경되면 발생합니다.
  • ContactEmailUpdated. 이메일 주소가 업데이트되면 발생합니다.
  • ContactCompanyUpdated. 회사 속성이 변경되면 발생합니다.

트랜잭션 일괄 처리

이 패턴을 구현하려면 Contact 비즈니스 개체와 해당 이벤트가 동일한 데이터베이스 트랜잭션에 저장되도록 해야 합니다. Azure Cosmos DB에서 트랜잭션은 관계형 데이터베이스 시스템에서와 다르게 작동합니다. 트랜잭션 일괄 처리라고 하는 Azure Cosmos DB 트랜잭션은 단일 논리 파티션에서 작동하므로 ACID(원자성, 일관성, 격리 및 내구성) 속성을 보장합니다. 서로 다른 컨테이너 또는 논리 파티션의 트랜잭션 일괄 처리 작업에서 두 개의 문서를 저장할 수 없습니다. 샘플 서비스의 경우 이는 비즈니스 개체와 이벤트 또는 이벤트가 모두 동일한 컨테이너 및 논리 파티션에 배치됨을 의미합니다.

컨텍스트, 리포지토리 및 UnitOfWork

샘플 구현의 핵심은 동일한 트랜잭션 일괄 처리에 저장된 개체를 추적하는 컨테이너 컨텍스트입니다. 만들기 및 수정된 개체 목록을 유지 관리하고 단일 Azure Cosmos DB 컨테이너에서 작동합니다. 인터페이스는 다음과 같습니다.

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

컨테이너 컨텍스트 구성 요소의 목록은 ContactDomainEvent 개체를 추적합니다. 둘 다 동일한 컨테이너에 배치됩니다. 즉, 여러 유형의 개체가 동일한 Azure Cosmos DB 컨테이너에 저장되고 Type 속성을 사용하여 비즈니스 개체와 이벤트를 구별합니다.

각 유형에 대해 데이터 액세스를 정의하고 구현하는 전용 리포지토리가 있습니다. Contact 리포지토리 인터페이스는 다음 메서드를 제공합니다.

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Event 리포지토리는 리포지토리에 새 이벤트를 만드는 메서드가 하나만 있다는 점을 제외하면 비슷해 보입니다.

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

두 리포지토리 인터페이스의 구현은 둘 다 동일한 Azure Cosmos DB 컨텍스트에서 작동하도록 단일 IContainerContext 인스턴스에 대한 종속성 주입을 통해 참조를 가져옵니다.

마지막 구성 요소는 IContainerContext 인스턴스에 보관된 변경 내용을 Azure Cosmos DB에 커밋하는 UnitOfWork입니다.

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

이벤트 처리: 만들기 및 게시

Contact 개체가 만들기, 수정 또는(소프트) 삭제될 때마다 서비스는 해당 이벤트를 발생시킵니다. 제공되는 솔루션의 핵심은 도메인 기반 설계(DDD)와 Jimmy Bogard가 제안한 중재자 패턴의 조합입니다. 그는 도메인 개체의 수정으로 인해 발생한 이벤트 목록을 유지 관리하고 실제 개체를 데이터베이스에 저장하기 전에 이러한 이벤트를 게시할 것을 제안합니다.

변경 목록은 다른 구성 요소가 이벤트 체인을 수정할 수 없도록 도메인 개체 자체에 보관됩니다. 도메인 개체에서 이벤트(IEvent 인스턴스)를 유지 관리하는 동작은 인터페이스 IEventEmitter<IEvent>를 통해 정의되고 추상 DomainEntity 클래스에서 구현됩니다.

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Contact 개체는 도메인 이벤트를 발생시킵니다. Contact 항목은 기본 DDD 개념을 따르며 도메인 속성의 설정자를 개별적으로 구성합니다. 클래스에 public setter가 없습니다. 대신 내부 상태를 조작하는 방법을 제공합니다. 이러한 메서드에서 특정 수정(예: ContactNameUpdated 또는 ContactEmailUpdated)에 대한 적절한 이벤트가 발생할 수 있습니다.

다음은 연락처 이름 업데이트의 예입니다. (이벤트는 메서드의 끝에서 발생합니다.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

변경 내용을 추적하는 해당 ContactNameUpdatedEvent는 다음과 같습니다.

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

지금까지는 이벤트가 도메인 개체에 기록되고 아무 것도 데이터베이스에 저장되지 않거나 메시지 브로커에 게시되지 않습니다. 권장 사항에 따라 비즈니스 개체가 데이터 저장소에 저장되기 직전에 이벤트 목록이 처리됩니다. 이 경우 프라이빗 RaiseDomainEvents 메서드에서 구현되는 IContainerContext 인스턴스의 SaveChangesAsync 메서드에서 발생합니다. (dObjs는 컨테이너 컨텍스트의 추적된 항목 목록입니다.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

마지막 줄에서 C#의 중재자 패턴 구현인 MediatR 패키지는 애플리케이션 내에서 이벤트를 게시하는 데 사용됩니다. 이는 ContactNameUpdatedEvent와 같은 모든 이벤트가 MediatR 패키지의 INotification 인터페이스를 구현하기 때문에 가능합니다.

이러한 이벤트는 해당 처리기에서 처리해야 합니다. 여기에서 IEventsRepository 구현이 작동합니다. 다음은 NameUpdated 이벤트 처리기의 샘플입니다.

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

IEventRepository 인스턴스는 생성자를 통해 처리기 클래스에 주입됩니다. ContactNameUpdatedEvent이 서비스에 게시되는 즉시 Handle 메서드가 호출되고 이벤트 리포지토리 인스턴스를 사용하여 알림 개체를 만듭니다. 해당 알림 개체는 IContainerContext 개체의 추적된 개체 목록에 차례로 삽입되고 동일한 트랜잭션 일괄 처리에 저장된 개체를 Azure Cosmos DB에 조인합니다.

지금까지 컨테이너 컨텍스트는 처리할 개체를 알고 있습니다. 추적된 개체를 Azure Cosmos DB에 유지하기 위해 IContainerContext 구현은 트랜잭션 일괄 처리를 만들고 모든 관련 개체를 추가하고 데이터베이스에 대해 작업을 실행합니다. 설명된 프로세스는 SaveChangesAsync 메서드에 의해 호출되는 SaveInTransactionalBatchAsync 메서드에서 처리됩니다.

트랜잭션 일괄 처리를 만들고 실행하는 데 필요한 구현의 중요한 부분은 다음과 같습니다.

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

다음은 지금까지의 프로세스 작동 방식에 대한 개요입니다(연락처 개체의 이름 업데이트).

  1. 클라이언트가 연락처 이름을 업데이트하려고 합니다. 연락처 개체에서 SetName 메서드가 호출되고 속성이 업데이트됩니다.
  2. ContactNameUpdated 이벤트가 도메인 개체의 이벤트 목록에 추가됩니다.
  3. 도메인 개체를 컨테이너 컨텍스트에 추가하는 연락처 리포지토리의 Update 메서드가 호출됩니다. 이제 개체가 추적됩니다.
  4. CommitAsyncUnitOfWork 인스턴스에서 호출되며, 이는 차례로 컨테이너 컨텍스트에서 SaveChangesAsync를 호출합니다.
  5. SaveChangesAsync 내에서 도메인 개체 목록의 모든 이벤트는 MediatR 인스턴스에 의해 게시되고 이벤트 리포지토리를 통해 동일한 컨테이너 컨텍스트에 추가됩니다.
  6. SaveChangesAsync에서 TransactionalBatch가 만들어집니다. 연락처 개체와 이벤트를 모두 보유합니다.
  7. TransactionalBatch가 실행되고 데이터가 Azure Cosmos DB에 커밋됩니다.
  8. SaveChangesAsyncCommitAsync가 성공적으로 반환됩니다.

지속성

앞의 코드 조각에서 볼 수 있듯이 Azure Cosmos DB에 저장된 모든 개체는 DataObject 인스턴스에 래핑됩니다. 이 개체는 다음과 같은 공통 속성을 제공합니다.

  • ID.
  • PartitionKey.
  • Type.
  • State. Created와 마찬가지로 Updated는 Azure Cosmos DB에서 유지되지 않습니다.
  • Etag. 낙관적 잠금의 경우.
  • TTL. 오래된 문서를 자동 정리하기 위한 Time To Live 속성입니다.
  • Data. 일반 데이터 개체입니다.

이러한 속성은 IDataObject이라고 하는 일반 인터페이스에 정의되며 리포지토리 및 컨테이너 컨텍스트에서 사용됩니다.


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

DataObject 인스턴스에 래핑되어 데이터베이스에 저장된 개체는 다음 샘플(ContactContactNameUpdatedEvent)과 같습니다.

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

ContactContactNameUpdatedEvent(유형 domainEvent) 문서에 동일한 파티션 키가 있고 두 문서가 동일한 논리 파티션에 유지됨을 알 수 있습니다.

피드 처리 변경

이벤트 스트림을 읽고 메시지 브로커로 보내기 위해 서비스는 Azure Cosmos DB 변경 피드를 사용합니다.

변경 피드는 컨테이너의 변경 내용에 대한 영구 로그입니다. 백그라운드에서 작동하고 수정 사항을 추적합니다. 하나의 논리 파티션 내에서 변경 순서가 보장됩니다. 변경 피드를 읽는 가장 편리한 방법은 Azure Cosmos DB 트리거와 함께 Azure 함수를 사용하는 것입니다. 또 다른 옵션은 변경 피드 프로세서 라이브러리를 사용하는 것입니다. 이를 통해 웹 API의 변경 피드 처리를 백그라운드 서비스로 통합할 수 있습니다(IHostedService 인터페이스를 통해). 여기 샘플은 추상 클래스 BackgroundService를 구현하는 간단한 콘솔 애플리케이션을 사용하여 .NET Core 애플리케이션에서 장기 실행 백그라운드 작업을 호스팅합니다.

Azure Cosmos DB 변경 피드에서 변경 내용을 수신하려면 ChangeFeedProcessor 개체를 인스턴스화하고 메시지 처리를 위한 처리기 메서드를 등록하고 변경 내용 수신을 시작해야 합니다.

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

그런 다음 처리기 메서드(여기서는 HandleChangesAsync)가 메시지를 처리합니다. 이 샘플에서 이벤트는 확장성을 위해 분할되고 중복 제거 기능이 사용하도록 설정된 Service Bus 항목에 게시됩니다. 그러면 Contact 개체 변경에 관심이 있는 모든 서비스가 해당 Service Bus 항목을 구독하고 자체 컨텍스트에 대한 변경 내용을 수신하고 처리할 수 있습니다.

생성된 Service Bus 메시지에는 SessionId 속성이 있습니다. Service Bus에서 세션을 사용하면 메시지 순서가 유지됩니다(FIFO). 이 사용 사례에서는 순서를 유지해야 합니다.

다음은 변경 피드의 메시지를 처리하는 코드 조각입니다.

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

오류 처리

변경 내용이 처리되는 동안 오류가 발생하면 변경 피드 라이브러리는 마지막 일괄 처리를 성공적으로 처리한 위치에서 메시지 읽기를 다시 시작합니다. 예를 들어 애플리케이션이 10,000개의 메시지를 성공적으로 처리하고 현재 일괄 처리 10,001에서 10,025까지 작업 중이고 오류가 발생하면 다시 시작하여 위치 10,001에서 작업을 선택할 수 있습니다. 라이브러리는 Azure Cosmos DB의 Leases 컨테이너에 저장된 정보를 통해 처리된 내용을 자동으로 추적합니다.

서비스에서 Service Bus로 다시 처리되는 일부 메시지를 이미 보냈을 수 있습니다. 일반적으로 해당 시나리오는 중복 메시지 처리로 이어집니다. 앞에서 언급했듯이 Service Bus에는 이 시나리오에 대해 사용하도록 설정해야 하는 중복 메시지 검색 기능이 있습니다. 서비스는 메시지의 애플리케이션 제어 MessageId 속성을 기반으로 메시지가 이미 Service Bus 항목(또는 큐)에 추가되었는지 확인합니다. 해당 속성은 이벤트 문서의 ID로 설정됩니다. 동일한 메시지가 Service Bus로 다시 보내지면 서비스는 이를 무시하고 삭제합니다.

하우스키핑

일반적인 트랜잭션 Outbox 구현에서 서비스는 처리된 이벤트를 업데이트하고 Processed 속성을 true로 설정하여 메시지가 성공적으로 게시되었음을 나타냅니다. 이 동작은 처리기 메서드에서 수동으로 구현할 수 있습니다. 현재 시나리오에서는 이러한 프로세스가 필요하지 않습니다. Azure Cosmos DB는 변경 피드(Leases 컨테이너와 함께)를 사용하여 처리된 이벤트를 추적합니다.

마지막 단계로 경우에 따라 가장 최근의 기록/문서만 보관하도록 컨테이너에서 이벤트를 삭제해야 합니다. 정기적으로 정리를 수행하기 위해 구현은 문서에 Azure Cosmos DB의 다른 기능인 TTL(Time To Live)(TTL)을 적용합니다. Azure Cosmos DB는 문서에 추가할 수 있는 TTL 속성(시간 범위(초))을 기반으로 문서를 자동으로 삭제할 수 있습니다. 서비스는 TTL 속성이 있는 문서에 대해 컨테이너를 지속적으로 확인합니다. 문서가 만료되는 즉시 Azure Cosmos DB는 데이터베이스에서 문서를 제거합니다.

모든 구성 요소가 예상대로 작동하면 이벤트가 몇 초 이내에 신속하게 처리되고 게시됩니다. Azure Cosmos DB에 오류가 있는 경우 비즈니스 개체와 해당 이벤트를 모두 데이터베이스에 저장할 수 없기 때문에 이벤트가 메시지 버스로 전송되지 않습니다. 백그라운드 작업자(변경 피드 프로세서) 또는 서비스 버스를 사용할 수 없을 때 DomainEvent 문서에 적절한 TTL 값을 설정해야 한다는 것만 유념하면 됩니다. 프로덕션 환경에서는 여러 날의 기간을 선택하는 것이 가장 좋습니다. 예를 들어 10일을 선택합니다. 관련된 모든 구성 요소는 애플리케이션 내에서 변경 내용을 처리/게시하기에 충분한 시간을 갖습니다.

요약

트랜잭션 Outbox 패턴은 분산 시스템에서 도메인 이벤트를 안정적으로 게시하는 문제를 해결합니다. 동일한 트랜잭션 일괄 처리에서 비즈니스 개체의 상태와 이벤트를 커밋하고 백그라운드 프로세서를 메시지 릴레이로 사용하면 내부 또는 외부의 다른 서비스가 결국 의존하는 정보를 받게 됩니다. 이 샘플은 트랜잭션 Outbox 패턴의 기존 구현이 아닙니다. Azure Cosmos DB 변경 피드 및 TTL(Time To Live)과 같은 기능을 사용하여 간단하고 깔끔하게 유지합니다.

다음은 이 시나리오에서 사용되는 Azure 구성 요소에 대한 요약입니다.

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

이 아키텍처의 Visio 파일을 다운로드합니다.

이 솔루션의 장점은 다음과 같습니다.

  • 안정적인 메시징 및 보장된 이벤트 배달.
  • Service Bus를 통한 이벤트 및 메시지 중복 제거의 보존된 순서입니다.
  • 이벤트 문서의 성공적인 처리를 나타내는 추가 Processed 속성을 유지할 필요가 없습니다.
  • TTL을 통해 Azure Cosmos DB에서 이벤트 삭제. 프로세스는 사용자/애플리케이션 요청을 처리하는 데 필요한 요청 단위를 사용하지 않습니다. 대신 백그라운드 작업에서 "남은" 요청 단위를 사용합니다.
  • ChangeFeedProcessor(또는 Azure 함수)을 통한 오류 방지 메시지 처리.
  • 선택 사항: 변경 피드에서 각각 고유한 포인터를 유지 관리하는 여러 변경 피드 프로세서.

고려 사항

이 문서에서 설명하는 샘플 애플리케이션은 Azure Cosmos DB 및 Service Bus를 사용하여 Azure에서 트랜잭션 Outbox 패턴을 구현하는 방법을 보여 줍니다. NoSQL Database를 사용하는 다른 방법도 있습니다. 비즈니스 개체 및 이벤트가 데이터베이스에 안정적으로 저장되도록 보장하기 위해 비즈니스 개체 문서에 이벤트 목록을 포함할 수 있습니다. 이 방법의 단점은 정리 프로세스에서 이벤트가 포함된 각 문서를 업데이트해야 한다는 것입니다. 이는 TTL을 사용하는 것과 비교할 때 특히 요청 단위 비용 측면에서 이상적이지 않습니다.

여기에 제공된 샘플 코드를 프로덕션 준비 코드로 간주해서는 안 됩니다. 다중 스레딩과 관련하여 몇 가지 제한 사항이 있습니다. 특히 DomainEntity 클래스에서 이벤트가 처리되는 방식과 CosmosContainerContext 구현에서 개체를 추적하는 방식이 있습니다. 고유의 구현을 위한 시작점으로 사용합니다. 또는 NServiceBus 또는 MassTransit와 같은 이 기능이 이미 내장된 기존 라이브러리를 사용하는 것이 좋습니다.

시나리오 배포

GitHub(https://github.com/mspnp/transactional-outbox-pattern)에서 이 시나리오를 테스트하기 위한 소스 코드, 배포 파일 및 지침을 찾을 수 있습니다.

참가자

Microsoft에서 이 문서를 유지 관리합니다. 원래 다음 기여자가 작성했습니다.

보안 주체 작성자:

비공개 LinkedIn 프로필을 보려면 LinkedIn에 로그인하세요.

다음 단계

자세히 알아보려면 다음 문서를 검토합니다.