Azure Cosmos DB를 사용하는 트랜잭션 아웃박스 패턴
분산 시스템에서 신뢰할 수 있는 메시징을 구현하는 것은 어려울 수 있습니다. 이 문서에서는 Idempotent 메시지 처리를 지원하는 중요한 부분인 신뢰할 수 있는 메시징 및 이벤트 배달을 보장하기 위해 트랜잭션 아웃박스 패턴을 사용하는 방법을 설명합니다. 이를 위해 Azure Cosmos DB 트랜잭션 일괄 처리 및 변경 피드를 Azure Service Bus와 함께 사용합니다.
개요
마이크로 서비스 아키텍처는 특히 대규모 애플리케이션에서 확장성, 유지 관리 효율성 및 민첩성과 같은 문제를 해결하는 데 큰 인기를 얻고 있습니다. 그러나 이 아키텍처 패턴은 데이터 처리와 관련하여도 문제를 발생합니다. 분산 애플리케이션에서 각 서비스는 전용 서비스 소유 데이터 저장소에서 작동하는 데 필요한 데이터를 독립적으로 유지 관리합니다. 이러한 시나리오를 지원하기 위해 일반적으로 RabbitMQ, Kafka 또는 Azure Service Bus와 같은 메시징 솔루션을 사용하여 메시징 버스를 통해 한 서비스에서 애플리케이션의 다른 서비스로 데이터(이벤트)를 배포합니다. 그런 다음, 내부 또는 외부 소비자는 해당 메시지를 구독하고 데이터가 조작되는 즉시 변경에 대한 알림을 받을 수 있습니다.
해당 영역에서 잘 알려진 예제는 주문 시스템입니다. 사용자가 주문을 Ordering
만들려고 할 때 서비스는 REST 엔드포인트를 통해 클라이언트 애플리케이션에서 데이터를 받습니다. 페이로드를 개체의 내부 표현에 Order
매핑하여 데이터의 유효성을 검사합니다. 데이터베이스에 성공적으로 커밋된 후에는 메시지 버스에 이벤트를 게시합니다 OrderCreated
. 새 주문(예: Inventory
Invoicing
서비스)에 관심이 있는 다른 서비스는 메시지를 구독 OrderCreated
하고, 처리하고, 자체 데이터베이스에 저장합니다.
다음 의사 코드는 이 프로세스가 일반적으로 서비스 관점에서 어떻게 보이는 Ordering
지 보여 줍니다.
CreateNewOrder(CreateOrderDto order){
// Validate the incoming data.
...
// Apply business logic.
...
// Save the object to the database.
var result = _orderRespository.Create(order);
// Publish the respective event.
_messagingService.Publish(new OrderCreatedEvent(result));
return Ok();
}
이 방법은 주문 개체를 저장하고 해당 이벤트를 게시하는 사이에 오류가 발생할 때까지 잘 작동합니다. 다음과 같은 여러 가지 이유로 이 시점에서 이벤트를 보내는 데 실패할 수 있습니다.
- 네트워크 오류
- 메시지 서비스 중단
- 호스트 오류
오류가 무엇이든 간에 이벤트는 OrderCreated
메시지 버스에 게시할 수 없습니다. 다른 서비스에는 주문이 생성되었다는 알림이 표시되지 않습니다. 이제 서비스는 Ordering
실제 비즈니스 프로세스와 관련이 없는 다양한 작업을 처리해야 합니다. 다시 온라인 상태가 되는 즉시 메시지 버스에 넣어야 하는 이벤트를 추적해야 합니다. 최악의 경우에도 발생할 수 있습니다. 이벤트 손실로 인해 애플리케이션의 데이터 불일치가 발생할 수 있습니다.
해결 방법
이러한 상황을 방지하는 데 도움이 되는 트랜잭션 아웃박스 라는 잘 알려진 패턴이 있습니다. 이벤트가 궁극적으로 메시지 브로커에 푸시되기 전에 데이터 저장소(일반적으로 데이터베이스의 받은 편지함 테이블)에 저장되도록 합니다. 비즈니스 개체와 해당 이벤트가 동일한 데이터베이스 트랜잭션 내에 저장되는 경우 데이터가 손실되지 않습니다. 모든 항목이 커밋되거나 오류가 발생하면 모든 항목이 롤백됩니다. 결국 이벤트를 게시하기 위해 다른 서비스 또는 작업자 프로세스는 처리되지 않은 항목에 대해 Outbox 테이블을 쿼리하고, 이벤트를 게시하고, 처리된 것으로 표시합니다. 이 패턴은 비즈니스 개체를 만들거나 수정한 후에 이벤트가 손실되지 않도록 합니다.
이 아키텍처의 Visio 파일을 다운로드합니다.
관계형 데이터베이스에서 패턴의 구현은 간단합니다. 예를 들어 서비스에서 Entity Framework Core를 사용하는 경우 Entity Framework 컨텍스트를 사용하여 데이터베이스 트랜잭션을 만들고, 비즈니스 개체와 이벤트를 저장하고, 트랜잭션을 커밋하거나, 롤백을 수행합니다. 또한 이벤트를 처리하는 작업자 서비스는 쉽게 구현할 수 있습니다. 새 항목에 대해 Outbox 테이블을 주기적으로 쿼리하고, 새로 삽입된 이벤트를 메시지 버스에 게시하고, 마지막으로 이러한 항목을 처리됨으로 표시합니다.
실제로는 처음에 보는 것만큼 쉽지 않습니다. 가장 중요한 것은 이벤트 전에 이벤트가 게시 OrderUpdated
되지 않도록 이벤트의 순서가 OrderCreated
유지되는지 확인해야 한다는 것입니다.
Azure Cosmos DB에서 구현
이 섹션에서는 Azure Cosmos DB 변경 피드 및 Service Bus의 도움을 받아 서로 다른 서비스 간에 안정적이고 순서가 지정된 메시징을 달성하기 위해 Azure Cosmos DB에서 트랜잭션 아웃박스 패턴을 구현하는 방법을 보여 줍니다. 개체(Contact
,, FirstName
정보 LastName
Email
등)를 관리하는 Company
샘플 서비스를 보여 줍니다. CQRS(명령 및 쿼리 책임 분리) 패턴을 사용하고 기본 DDD(도메인 기반 디자인) 개념을 따릅니다.
GitHub에서 구현에 대한 샘플 코드를 찾을 수 있습니다.
Contact
샘플 서비스의 개체 구조는 다음과 같습니다.
{
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "johndoe@contoso.com",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false
}
생성되거나 업데이트되는 즉시 Contact
현재 변경 내용에 대한 정보가 포함된 이벤트를 내보냅니다. 그 중에서도 도메인 이벤트는 다음과 같습니다.
-
ContactCreated
; 연락처가 추가될 때 발생합니다. -
ContactNameUpdated
; 변경될 때FirstName
LastName
발생합니다. -
ContactEmailUpdated
; 전자 메일 주소가 업데이트될 때 발생합니다. -
ContactCompanyUpdated
; 회사 속성이 변경될 때 발생합니다.
트랜잭션 일괄 처리
이 패턴을 구현하려면 비즈니스 개체와 해당 이벤트가 동일한 데이터베이스 트랜잭션에 저장되도록 해야 Contact
합니다. Azure Cosmos DB에서 트랜잭션은 관계형 데이터베이스 시스템에서와 다르게 작동합니다.
트랜잭션 일괄 처리라고 하는 Azure Cosmos DB 트랜잭션은 단일 논리 파티션에서 작동하므로 ACID(원자성, 일관성, 격리 및 지속성) 속성을 보장합니다. 서로 다른 컨테이너 또는 논리 파티션의 트랜잭션 일괄 처리 작업에는 두 개의 문서를 저장할 수 없습니다. 샘플 서비스의 경우 비즈니스 개체와 이벤트 또는 이벤트가 모두 동일한 컨테이너 및 논리 파티션에 배치됩니다.
컨텍스트, 리포지토리 및 UnitOfWork
샘플 구현의 핵심은 동일한 트랜잭션 일괄 처리에 저장된 개체를 추적하는 컨테이너 컨텍스트 입니다. 만들어지고 수정된 개체 목록을 유지 관리하며 단일 Azure Cosmos DB 컨테이너에서 작동합니다. 인터페이스는 다음과 같습니다.
public interface IContainerContext
{
public Container Container { get; }
public List<IDataObject<Entity>> DataObjects { get; }
public void Add(IDataObject<Entity> entity);
public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
public void Reset();
}
컨테이너 컨텍스트 구성 요소의 목록은 추적 Contact
및 DomainEvent
개체입니다. 둘 다 동일한 컨테이너에 배치됩니다. 즉, 여러 유형의 개체가 동일한 Azure Cosmos DB 컨테이너에 저장되고 속성을 사용하여 Type
비즈니스 개체와 이벤트를 구분합니다.
각 형식에 대해 데이터 액세스를 정의하고 구현하는 전용 리포지토리가 있습니다. 리포지토리 인터페이스는 Contact
다음 메서드를 제공합니다.
public interface IContactsRepository
{
public void Create(Contact contact);
public Task<(Contact, string)> ReadAsync(Guid id, string etag);
public Task DeleteAsync(Guid id, string etag);
public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
public void Update(Contact contact, string etag);
}
저장소에 Event
새 이벤트를 만드는 메서드가 하나뿐이라는 점을 제외하면 리포지토리는 유사합니다.
public interface IEventRepository
{
public void Create(ContactDomainEvent e);
}
두 리포지토리 인터페이스의 구현은 단일 IContainerContext
인스턴스에 대한 종속성 주입을 통해 참조를 가져와 둘 다 동일한 Azure Cosmos DB 컨텍스트에서 작동하도록 합니다.
마지막 구성 요소는 UnitOfWork
인스턴스에 저장된 IContainerContext
변경 내용을 Azure Cosmos DB에 커밋하는 것입니다.
public class UnitOfWork : IUnitOfWork
{
private readonly IContainerContext _context;
public IContactRepository ContactsRepo { get; }
public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
{
_context = ctx;
ContactsRepo = cRepo;
}
public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
{
return _context.SaveChangesAsync(cancellationToken);
}
}
이벤트 처리: 만들기 및 게시
개체를 Contact
만들거나 수정하거나(일시) 삭제할 때마다 서비스는 해당 이벤트를 발생합니다. 제공되는 솔루션의 핵심은 도메인 기반 디자인(DDD)과 Jimmy Bogard가 제안한 중재자 패턴의 조합입니다. 실제 개체를 데이터베이스에 저장하기 전에 도메인 개체 수정으로 인해 발생한 이벤트 목록을 유지하고 이러한 이벤트를 게시하는 것이 좋습니다.
다른 구성 요소가 이벤트 체인을 수정할 수 없도록 변경 목록은 도메인 개체 자체에 유지됩니다. 도메인 개체에서 이벤트(IEvent
인스턴스)를 유지하는 동작은 인터페이스 IEventEmitter<IEvent>
를 통해 정의되고 추상 DomainEntity
클래스에서 구현됩니다.
public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
private readonly List<IEvent> _events = new();
[JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();
public virtual void AddEvent(IEvent domainEvent)
{
var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
if (i < 0)
{
_events.Add(domainEvent);
}
else
{
_events.RemoveAt(i);
_events.Insert(i, domainEvent);
}
}
[...]
[...]
}
이 개체는 Contact
도메인 이벤트를 발생합니다.
Contact
엔터티는 기본 DDD 개념을 따르고 도메인 속성의 setter를 프라이빗으로 구성합니다. 클래스에 공용 setter가 없습니다. 대신 내부 상태를 조작하는 메서드를 제공합니다. 이러한 메서드에서는 특정 수정에 대한 적절한 이벤트(예 ContactNameUpdated
: 또는 ContactEmailUpdated
)를 발생할 수 있습니다.
연락처 이름을 업데이트하는 예제는 다음과 같습니다. (이 이벤트는 메서드의 끝에 발생합니다.)
public void SetName(string firstName, string lastName)
{
if (string.IsNullOrWhiteSpace(firstName) ||
string.IsNullOrWhiteSpace(lastName))
{
throw new ArgumentException("FirstName or LastName cannot be empty");
}
Name = new Name(firstName, lastName);
if (IsNew) return; // if an object is newly created, all modifications will be handled by ContactCreatedEvent
AddEvent(new ContactNameUpdatedEvent(Id, Name));
ModifiedAt = DateTimeOffset.UtcNow;
}
변경 내용을 추적하는 해당 ContactNameUpdatedEvent
항목은 다음과 같습니다.
public class ContactNameUpdatedEvent : ContactDomainEvent
{
public Name Name { get; }
public ContactNameUpdatedEvent(Guid contactId, Name contactName) :
base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
{
Name = contactName;
}
}
지금까지 이벤트는 도메인 개체에 기록되고 데이터베이스에 저장되거나 메시지 브로커에 게시되지도 않습니다. 권장 사항에 따라 비즈니스 개체가 데이터 저장소에 저장되기 바로 전에 이벤트 목록이 처리됩니다. 이 경우 프라이빗 SaveChangesAsync
메서드에서 IContainerContext
구현되는 인스턴스의 RaiseDomainEvents
메서드에서 발생합니다. (dObjs
는 컨테이너 컨텍스트의 추적된 엔터티 목록입니다.)
private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
var eventEmitters = new List<IEventEmitter<IEvent>>();
// Get all EventEmitters.
foreach (var o in dObjs)
if (o.Data is IEventEmitter<IEvent> ee)
eventEmitters.Add(ee);
// Raise events.
if (eventEmitters.Count <= 0) return;
foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
_mediator.Publish(evt);
}
마지막 줄에서 C#의 중재자 패턴 구현인 MediatR 패키지는 애플리케이션 내에서 이벤트를 게시하는 데 사용됩니다. MediatR 패키지의 인터페이스를 구현하는 ContactNameUpdatedEvent
것과 같은 INotification
모든 이벤트가 가능하기 때문입니다.
이러한 이벤트는 해당 처리기에서 처리해야 합니다. 여기서 구현이 IEventsRepository
진행됩니다. 이벤트 처리기의 샘플 NameUpdated
은 다음과 같습니다.
public class ContactNameUpdatedHandler :
INotificationHandler<ContactNameUpdatedEvent>
{
private IEventRepository EventRepository { get; }
public ContactNameUpdatedHandler(IEventRepository eventRepo)
{
EventRepository = eventRepo;
}
public Task Handle(ContactNameUpdatedEvent notification,
CancellationToken cancellationToken)
{
EventRepository.Create(notification);
return Task.CompletedTask;
}
}
IEventRepository
인스턴스는 생성자를 통해 처리기 클래스에 삽입됩니다.
ContactNameUpdatedEvent
서비스에 Handle
게시되는 즉시 메서드가 호출되고 이벤트 리포지토리 인스턴스를 사용하여 알림 개체를 만듭니다. 이 알림 개체는 개체의 추적된 개체 IContainerContext
목록에 삽입되고 동일한 트랜잭션 일괄 처리에 저장된 개체를 Azure Cosmos DB에 조인합니다.
지금까지 컨테이너 컨텍스트는 처리할 개체를 알고 있습니다. 결국 추적된 개체를 Azure Cosmos DB IContainerContext
에 유지하기 위해 구현은 트랜잭션 일괄 처리를 만들고, 모든 관련 개체를 추가하고, 데이터베이스에 대해 작업을 실행합니다. 설명된 프로세스는 메서드에 SaveInTransactionalBatchAsync
의해 호출되는 메서드에서 SaveChangesAsync
처리됩니다.
트랜잭션 일괄 처리를 만들고 실행하는 데 필요한 구현의 중요한 부분은 다음과 같습니다.
private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
CancellationToken cancellationToken)
{
if (DataObjects.Count > 0)
{
var pk = new PartitionKey(DataObjects[0].PartitionKey);
var tb = Container.CreateTransactionalBatch(pk);
DataObjects.ForEach(o =>
{
TransactionalBatchItemRequestOptions tro = null;
if (!string.IsNullOrWhiteSpace(o.Etag))
tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };
switch (o.State)
{
case EntityState.Created:
tb.CreateItem(o);
break;
case EntityState.Updated or EntityState.Deleted:
tb.ReplaceItem(o.Id, o, tro);
break;
}
});
var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
}
// Return copy of current list as result.
var result = new List<IDataObject<Entity>>(DataObjects);
// Work has been successfully done. Reset DataObjects list.
DataObjects.Clear();
return result;
}
연락처 개체의 이름을 업데이트하기 위해 지금까지 프로세스가 작동하는 방식에 대한 개요는 다음과 같습니다.
- 클라이언트는 연락처의 이름을 업데이트하려고 합니다. 이
SetName
메서드는 연락처 개체에서 호출되고 속성이 업데이트됩니다. - 이벤트는
ContactNameUpdated
도메인 개체의 이벤트 목록에 추가됩니다. - 연락처 리포지토리의
Update
메서드가 호출되어 컨테이너 컨텍스트에 도메인 개체를 추가합니다. 이제 개체가 추적됩니다. -
CommitAsync
는 인스턴스에서UnitOfWork
호출되며, 이 인스턴스는 컨테이너 컨텍스트를 호출SaveChangesAsync
합니다. - 도메인
SaveChangesAsync
개체 목록의 모든 이벤트는 인스턴스에서MediatR
게시되고 이벤트 리포지토리를 통해 동일한 컨테이너 컨텍스트에 추가됩니다. - 에서
SaveChangesAsync
aTransactionalBatch
가 만들어집니다. 연락처 개체와 이벤트를 모두 보유합니다. -
TransactionalBatch
실행 및 데이터는 Azure Cosmos DB에 커밋됩니다. -
SaveChangesAsync
성공적으로 반환됩니다CommitAsync
.
고집
앞의 코드 조각에서 볼 수 있듯이 Azure Cosmos DB에 저장된 모든 개체는 인스턴스에 DataObject
래핑됩니다. 이 개체는 다음과 같은 공통 속성을 제공합니다.
-
ID
; -
PartitionKey
; -
Type
; -
State
;Created
마찬가지로Updated
Azure Cosmos DB에는 유지되지 않습니다. -
Etag
; 낙관적 잠금의 경우 -
TTL
; 이전 문서의 자동 정리를 위한 Time To Live 속성입니다. -
Data
; 제네릭 데이터 개체입니다.
이러한 속성은 리포지토리 및 컨테이너 컨텍스트에서 호출 IDataObject
되고 사용되는 제네릭 인터페이스에서 정의됩니다.
public interface IDataObject<out T> where T : Entity
{
string Id { get; }
string PartitionKey { get; }
string Type { get; }
T Data { get; }
string Etag { get; set; }
int Ttl { get; }
EntityState State { get; set; }
}
인스턴스에 DataObject
래핑되어 데이터베이스에 저장된 개체는 다음 샘플과 Contact
같습니다.ContactNameUpdatedEvent
// Contact document/object. After creation.
{
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "contact",
"data": {
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "johndoe@contoso.com",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false,
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
},
"ttl": -1,
"_etag": "\"180014cc-0000-1500-0000-614455330000\"",
"_ts": 1632301657
}
// After setting a new name, this is how an event document looks.
{
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "domainEvent",
"data": {
"name": {
"firstName": "Jane",
"lastName": "Doe"
},
"contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"action": "ContactNameUpdatedEvent",
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"createdAt": "2021-09-22T11:37:37.3022907+02:00"
},
"ttl": 120,
"_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
"_ts": 1632303457
}
및 Contact
(형식ContactNameUpdatedEvent
) 문서에 동일한 파티션 키가 있고 두 문서가 동일한 논리 파티션에 유지된다는 것을 알 domainEvent
수 있습니다.
피드 처리 변경
이벤트 스트림을 읽고 메시지 브로커로 보내려면 서비스에서 Azure Cosmos DB 변경 피드를 사용합니다.
변경 피드는 컨테이너의 변경 내용에 대한 영구 로그입니다. 백그라운드에서 작동하고 수정 사항을 추적합니다. 하나의 논리 파티션 내에서 변경 순서가 보장됩니다. 변경 피드를 읽는 가장 편리한 방법은 Azure Cosmos DB 트리거와 함께 Azure 함수를 사용하는 것입니다. 또 다른 옵션은 변경 피드 프로세서 라이브러리를 사용하는 것입니다. 이를 통해 웹 API에서 변경 피드 처리를 백그라운드 서비스(인터페이스를 통해)로 통합할 IHostedService
수 있습니다. 이 샘플에서는 추상 클래스 BackgroundService 를 구현하는 간단한 콘솔 애플리케이션을 사용하여 .NET Core 애플리케이션에서 장기 실행 백그라운드 작업을 호스트합니다.
Azure Cosmos DB 변경 피드에서 변경 내용을 받으려면 개체를 인스턴스화 ChangeFeedProcessor
하고, 메시지 처리를 위한 처리기 메서드를 등록하고, 변경 내용 수신 대기를 시작해야 합니다.
private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
var changeFeedProcessor = _container
.GetChangeFeedProcessorBuilder<ExpandoObject>(
_configuration.GetSection("Cosmos")["ProcessorName"],
HandleChangesAsync)
.WithInstanceName(Environment.MachineName)
.WithLeaseContainer(_leaseContainer)
.WithMaxItems(25)
.WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
.WithPollInterval(TimeSpan.FromSeconds(3))
.Build();
_logger.LogInformation("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
_logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
return changeFeedProcessor;
}
처리기 메서드(HandleChangesAsync
여기)는 메시지를 처리합니다. 이 샘플에서는 확장성을 위해 분할되고 중복 제거 기능을 사용하도록 설정된 Service Bus 토픽에 이벤트가 게시됩니다. 개체 변경에 Contact
관심이 있는 모든 서비스는 해당 Service Bus 토픽을 구독하고 자체 컨텍스트에 대한 변경 내용을 수신하고 처리할 수 있습니다.
생성된 Service Bus 메시지에는 속성이 있습니다 SessionId
. Service Bus에서 세션을 사용하는 경우 메시지 순서가 유지되도록 보장합니다(FIFO(첫 번째, 첫 번째). 이 사용 사례에는 순서를 보존해야 합니다.
다음은 변경 피드의 메시지를 처리하는 코드 조각입니다.
private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
_logger.LogInformation($"Received {changes.Count} document(s).");
var eventsCount = 0;
Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();
foreach (var document in changes as dynamic)
{
if (!((IDictionary<string, object>)document).ContainsKey("type") ||
!((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.
if (document.type == EVENT_TYPE) // domainEvent.
{
string json = JsonConvert.SerializeObject(document.data);
var sbMessage = new ServiceBusMessage(json)
{
ContentType = "application/json",
Subject = document.data.action,
MessageId = document.id,
PartitionKey = document.partitionKey,
SessionId = document.partitionKey
};
// Create message batch per partitionKey.
if (partitionedMessages.ContainsKey(document.partitionKey))
{
partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
}
else
{
partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
}
eventsCount++;
}
}
if (partitionedMessages.Count > 0)
{
_logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");
// Loop over each partition.
foreach (var partition in partitionedMessages)
{
// Create batch for partition.
using var messageBatch =
await _topicSender.CreateMessageBatchAsync(cancellationToken);
foreach (var msg in partition.Value)
if (!messageBatch.TryAddMessage(msg))
throw new Exception();
_logger.LogInformation(
$"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");
try
{
await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e.Message);
throw;
}
}
}
else
{
_logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
}
}
오류 처리
변경 내용을 처리하는 동안 오류가 발생하면 변경 피드 라이브러리가 마지막 일괄 처리를 성공적으로 처리한 위치에서 메시지 읽기를 다시 시작합니다. 예를 들어 애플리케이션이 10,000개의 메시지를 성공적으로 처리한 경우 현재 일괄 처리 10,001에서 10,025까지 작업 중이며 오류가 발생하면 다시 시작하여 10,001 위치에서 작업을 선택할 수 있습니다. 라이브러리는 Azure Cosmos DB의 컨테이너에 Leases
저장된 정보를 통해 처리된 내용을 자동으로 추적합니다.
서비스에서 Service Bus로 다시 처리되는 메시지 중 일부를 이미 보냈을 수 있습니다. 일반적으로 이 시나리오에서는 메시지 처리가 중복됩니다. 앞에서 설명한 것처럼 Service Bus에는 이 시나리오에 사용하도록 설정해야 하는 중복 메시지 검색 기능이 있습니다. 서비스는 메시지의 애플리케이션 제어 속성을 기반으로 메시지가 Service Bus 토픽(또는 큐)에 이미 추가되었는지 MessageId
확인합니다. 해당 속성은 이벤트 문서의 속성으로 ID
설정됩니다. 동일한 메시지가 Service Bus로 다시 전송되면 서비스는 해당 메시지를 무시하고 삭제합니다.
하우스키핑
일반적인 트랜잭션 아웃박스 구현에서 서비스는 처리된 이벤트를 업데이트하고 속성이 Processed
성공적으로 게시되었음을 나타내는 속성을 설정합니다true
. 이 동작은 처리기 메서드에서 수동으로 구현할 수 있습니다. 현재 시나리오에서는 이러한 프로세스가 필요하지 않습니다. Azure Cosmos DB는 변경 피드를 사용하여 처리된 이벤트를 추적합니다(컨테이너와 Leases
함께).
마지막 단계로, 최근 레코드/문서만 유지하도록 컨테이너에서 이벤트를 삭제해야 하는 경우도 있습니다. 주기적으로 정리를 수행하려면 구현은 문서에 Azure Cosmos DB: Time To Live(TTL
)의 또 다른 기능을 적용합니다. Azure Cosmos DB는 문서에 추가할 수 있는 TTL
속성(시간 범위(초)에 따라 문서를 자동으로 삭제할 수 있습니다. 서비스는 속성이 있는 문서의 컨테이너를 지속적으로 확인합니다 TTL
. 문서가 만료되는 즉시 Azure Cosmos DB는 데이터베이스에서 문서를 제거합니다.
모든 구성 요소가 예상대로 작동하면 이벤트가 몇 초 내에 빠르게 처리되고 게시됩니다. Azure Cosmos DB에 오류가 있는 경우 비즈니스 개체와 해당 이벤트를 모두 데이터베이스에 저장할 수 없으므로 이벤트가 메시지 버스로 전송되지 않습니다. 백그라운드 작업자(변경 피드 프로세서) 또는 서비스 버스를 사용할 수 없는 경우 문서에 적절한 TTL
값을 DomainEvent
설정하는 것이 좋습니다. 프로덕션 환경에서는 며칠의 시간 범위를 선택하는 것이 가장 좋습니다. 예를 들어 10일입니다. 관련된 모든 구성 요소는 애플리케이션 내에서 변경 내용을 처리/게시하는 데 충분한 시간을 갖습니다.
요약
트랜잭션 아웃박스 패턴은 분산 시스템에서 도메인 이벤트를 안정적으로 게시하는 문제를 해결합니다. 동일한 트랜잭션 일괄 처리에서 비즈니스 개체의 상태 및 해당 이벤트를 커밋하고 백그라운드 프로세서를 메시지 릴레이로 사용하면 내부 또는 외부의 다른 서비스가 결국 종속된 정보를 받게 됩니다. 이 샘플은 트랜잭션 아웃박스 패턴의 기존 구현이 아닙니다. Azure Cosmos DB 변경 피드 및 Time To Live와 같은 기능을 사용하여 작업을 간단하고 깨끗하게 유지합니다.
이 시나리오에서 사용되는 Azure 구성 요소에 대한 요약은 다음과 같습니다.
이 아키텍처의 Visio 파일을 다운로드합니다.
이 솔루션의 장점은 다음과 같습니다.
- 신뢰할 수 있는 메시징 및 이벤트 배달 보장
- Service Bus를 통한 이벤트 및 메시지 중복 제거 순서가 유지됩니다.
- 이벤트 문서를 성공적으로 처리했음을 나타내는 추가
Processed
속성을 유지할 필요가 없습니다. - TTL(Time to Live)을 통해 Azure Cosmos DB에서 이벤트를 삭제합니다. 프로세스는 사용자/애플리케이션 요청을 처리하는 데 필요한 요청 단위를 소비하지 않습니다. 대신 백그라운드 작업에서 "남은" 요청 단위를 사용합니다.
- (또는 Azure 함수)를 통한
ChangeFeedProcessor
메시지의 오류 방지 처리입니다. - 선택 사항: 변경 피드 프로세서가 여러 개 있으며 각각 변경 피드에서 자체 포인터를 유지 관리합니다.
고려 사항
이 문서에서 설명하는 샘플 애플리케이션은 Azure Cosmos DB 및 Service Bus를 사용하여 Azure에서 트랜잭션 아웃박스 패턴을 구현하는 방법을 보여 줍니다. NoSQL 데이터베이스를 사용하는 다른 방법도 있습니다. 비즈니스 개체 및 이벤트가 데이터베이스에 안정적으로 저장되도록 하려면 비즈니스 개체 문서에 이벤트 목록을 포함할 수 있습니다. 이 방법의 단점은 정리 프로세스가 이벤트를 포함하는 각 문서를 업데이트해야 한다는 것입니다. 이는 특히 요청 단위 비용 측면에서 TTL 사용과 비교하여 이상적이지 않습니다.
여기에 제공된 샘플 코드는 프로덕션 준비 코드로 간주해서는 안 됩니다. 다중 스레딩, 특히 클래스에서 이벤트가 처리되는 방식 및 구현에서 DomainEntity
CosmosContainerContext
개체를 추적하는 방법에 대한 몇 가지 제한 사항이 있습니다. 사용자 고유의 구현을 위한 시작점으로 사용합니다. 또는 NServiceBus 또는 MassTransit과 같은 이 기능이 이미 내장된 기존 라이브러리를 사용하는 것이 좋습니다.
이 시나리오를 배포하십시오
GitHub https://github.com/mspnp/transactional-outbox-pattern에서 이 시나리오를 테스트하기 위한 소스 코드, 배포 파일 및 지침을 찾을 수 있습니다.
기여자
Microsoft에서 이 문서를 유지 관리합니다. 그것은 원래 다음 기여자에 의해 작성되었습니다.
대표 저자:
- Christian Dennig | 주요 소프트웨어 엔지니어
비공개 LinkedIn 프로필을 보려면 LinkedIn에 로그인합니다.
다음 단계
자세히 알아보려면 다음 문서를 검토합니다.