다음을 통해 공유


이벤트 구독

팁 (조언)

이 콘텐츠는 .NET Docs 또는 오프라인으로 읽을 수 있는 다운로드 가능한 무료 PDF로 제공되는 컨테이너화된 .NET 애플리케이션용 .NET 마이크로 서비스 아키텍처인 eBook에서 발췌한 내용입니다.

컨테이너화된 .NET 애플리케이션을 위한 .NET 마이크로서비스 아키텍처 eBook의 표지 썸네일.

이벤트 버스를 사용하는 첫 번째 단계는 수신하려는 이벤트에 마이크로 서비스를 구독하는 것입니다. 이 기능은 수신기 마이크로 서비스에서 수행해야 합니다.

다음 간단한 코드는 필요한 이벤트를 구독하도록 서비스를 시작할 때 각 수신기 마이크로 서비스가 구현해야 하는 항목(즉, Startup 클래스)을 보여 줍니다. 이 경우, basket-api 마이크로서비스는 ProductPriceChangedIntegrationEventOrderStartedIntegrationEvent 메시지를 구독해야 합니다.

예를 들어 이벤트를 구독할 ProductPriceChangedIntegrationEvent 때 장바구니 마이크로 서비스에서 제품 가격 변경 내용을 인식하고 해당 제품이 사용자의 바구니에 있는 경우 변경에 대해 사용자에게 경고할 수 있습니다.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

이 코드가 실행되면 구독자 마이크로 서비스가 RabbitMQ 채널을 통해 수신 대기합니다. ProductPriceChangedIntegrationEvent 형식의 메시지가 도착하면 코드는 전달된 이벤트 처리기를 호출하고 이벤트를 처리합니다.

이벤트 버스를 통해 이벤트 게시

마지막으로 메시지 보낸 사람(원본 마이크로 서비스)은 다음 예제와 유사한 코드를 사용하여 통합 이벤트를 게시합니다. (이 방법은 원자성을 고려하지 않는 간소화된 예제입니다.) 일반적으로 원본 마이크로 서비스에서 데이터 또는 트랜잭션을 커밋한 직후에 여러 마이크로 서비스에 이벤트를 전파해야 할 때마다 유사한 코드를 구현합니다.

먼저 다음 코드와 같이 이벤트 버스 구현 개체(RabbitMQ 또는 서비스 버스 기반)가 컨트롤러 생성자에 삽입됩니다.

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

그런 다음 UpdateProduct 메서드와 같이 컨트롤러의 메서드에서 사용합니다.

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

이 경우 원본 마이크로 서비스는 간단한 CRUD 마이크로 서비스이므로 해당 코드는 Web API 컨트롤러에 바로 배치됩니다.

CQRS 접근 방식을 사용하는 경우와 같이 고급 마이크로 서비스에서는 CommandHandler 클래스 안의 Handle() 메서드에서 구현할 수 있습니다.

이벤트 버스에 게시할 때 원자성과 복원력을 설계하기

이벤트 버스와 같은 분산 메시징 시스템을 통해 통합 이벤트를 게시하는 경우 원래 데이터베이스를 원자성으로 업데이트하고 이벤트를 게시하는 문제가 발생합니다(즉, 작업이 완료되거나 둘 다 완료되지 않음). 예를 들어 앞에서 보여 준 간소화된 예제에서 코드는 제품 가격이 변경될 때 데이터베이스에 데이터를 커밋한 다음 ProductPriceChangedIntegrationEvent 메시지를 게시합니다. 처음에는 이 두 작업이 꼭 동시에 이루어져야 할 것처럼 보일 수 있습니다. 그러나 데이터베이스 및 메시지 브로커와 관련된 분산 트랜잭션을 사용하는 경우 MSMQ(Microsoft Message Queuing)와 같은 이전 시스템에서와 마찬가지로 CAP 정리에 설명된 이유로 이 방법은 권장되지 않습니다.

기본적으로 마이크로 서비스를 사용하여 확장 가능하고 고가용성 시스템을 빌드합니다. CAP 정리를 약간 단순화하면, 지속적으로 사용 가능하고, 강한 일관성을 유지하며, 어떤 파티션에도 견딜 수 있는 (분산된) 데이터베이스(또는 모델을 소유한 마이크로서비스)를 구축할 수 없다는 것을 의미합니다. 이러한 세 가지 속성 중 두 가지를 선택해야 합니다.

마이크로 서비스 기반 아키텍처에서 가용성 및 허용 오차를 선택해야 하며 강력한 일관성을 강조하지 않아야 합니다. 따라서 대부분의 최신 마이크로 서비스 기반 애플리케이션에서는 MSMQ와 함께 Windows DTC(Distributed Transaction Coordinator)를 기반으로 분산 트랜잭션을 구현할 때와 마찬가지로 일반적으로 메시징에 분산 트랜잭션을 사용하지 않습니다.

초기 문제 및 해당 예제로 돌아가 보겠습니다. 데이터베이스가 업데이트된 후(이 경우 코드 _context.SaveChangesAsync()줄 바로 뒤) 서비스가 충돌하는 경우 통합 이벤트가 게시되기 전에 전체 시스템이 일관되지 않게 될 수 있습니다. 이 접근 방식은 처리 중인 특정 비즈니스 작업에 따라 비즈니스에 핵심적일 수 있습니다.

아키텍처 섹션의 앞부분에서 설명한 것처럼 이 문제를 처리하기 위한 몇 가지 방법을 사용할 수 있습니다.

  • 전체 이벤트 소싱 패턴 사용

  • 트랜잭션 로그 마이닝 사용.

  • Outbox 패턴을 사용합니다. 이 테이블은 통합 이벤트를 저장하는 트랜잭션 테이블입니다(로컬 트랜잭션 확장).

이 시나리오에서는 ES(전체 이벤트 소싱) 패턴을 사용하는 것이 가장 좋은 방법은 아니지만 가장 좋은 방법 중 하나입니다. 그러나 대부분의 애플리케이션 시나리오에서는 전체 ES 시스템을 구현하지 못할 수 있습니다. ES는 현재 상태 데이터를 저장하는 대신 트랜잭션 데이터베이스에 도메인 이벤트만 저장하는 것을 의미합니다. 도메인 이벤트만 저장하면 시스템의 기록을 사용할 수 있고 과거의 어느 순간에도 시스템 상태를 확인할 수 있는 등 큰 이점이 있을 수 있습니다. 그러나 전체 ES 시스템을 구현하려면 대부분의 시스템을 다시 구조화해야 하며 다른 많은 복잡성과 요구 사항을 도입해야 합니다. 예를 들어 이벤트 소싱을 위해 특별히 만들어진 데이터베이스 또는 Azure Cosmos DB, MongoDB, Cassandra, CouchDB 또는 RavenDB와 같은 문서 지향 데이터베이스를 사용하려고 합니다. ES는 이 문제에 대한 좋은 방법이지만 이벤트 소싱에 이미 익숙하지 않은 경우 가장 쉬운 솔루션은 아닙니다.

트랜잭션 로그 마이닝 사용 옵션은 처음에 투명해 보입니다. 그러나 이 방법을 사용하려면 마이크로 서비스를 SQL Server 트랜잭션 로그와 같은 RDBMS 트랜잭션 로그에 연결해야 합니다. 이 방법은 바람직하지 않을 수 있습니다. 또 다른 단점은 트랜잭션 로그에 기록된 하위 수준 업데이트가 상위 수준 통합 이벤트와 동일한 수준에 있지 않을 수 있다는 것입니다. 그렇다면 이러한 트랜잭션 로그 작업을 리버스 엔지니어링하는 프로세스는 어려울 수 있습니다.

균형 잡힌 접근 방식은 트랜잭션 데이터베이스 테이블과 간소화된 ES 패턴이 혼합된 것입니다. 통합 이벤트 테이블에 커밋할 때 원래 이벤트에서 설정한 "이벤트 게시 준비 완료"와 같은 상태를 사용할 수 있습니다. 그런 다음 이벤트 버스에 이벤트를 게시하려고 합니다. 게시 이벤트 작업이 성공하면 원본 서비스에서 다른 트랜잭션을 시작하고 상태를 "이벤트 게시 준비 완료"에서 "이미 게시된 이벤트"로 이동합니다.

이벤트 버스에서 게시-이벤트 작업이 실패하더라도 원본 마이크로서비스 내에서 데이터는 여전히 일관성을 유지하며 "이벤트 게시 준비 완료"로 표시됩니다. 나머지 서비스와 관련해서는 결국 일관성이 확보될 것입니다. 항상 백그라운드 작업에서 트랜잭션 또는 통합 이벤트의 상태를 확인할 수 있습니다. 작업이 "이벤트를 게시할 준비가 됨" 상태에서 이벤트를 찾으면 해당 이벤트를 이벤트 버스에 다시 게시할 수 있습니다.

이 방법을 사용하면 각 원본 마이크로 서비스에 대한 통합 이벤트만 유지하며 다른 마이크로 서비스 또는 외부 시스템과 통신하려는 이벤트만 유지합니다. 반면, 전체 ES 시스템에서는 모든 도메인 이벤트도 저장합니다.

따라서 이 균형 잡힌 접근 방식은 간소화된 ES 시스템입니다. 현재 상태("게시 준비 완료" 및 "게시됨")가 있는 통합 이벤트 목록이 필요합니다. 그러나 통합 이벤트에 대해 이러한 상태만 구현하면 됩니다. 또한 이 방법에서는 전체 ES 시스템에서와 마찬가지로 모든 도메인 데이터를 트랜잭션 데이터베이스에 이벤트로 저장할 필요가 없습니다.

관계형 데이터베이스를 이미 사용 중인 경우 트랜잭션 테이블을 사용하여 통합 이벤트를 저장할 수 있습니다. 애플리케이션에서 원자성을 얻으려면 로컬 트랜잭션을 기반으로 하는 2단계 프로세스를 사용합니다. 기본적으로 도메인 엔터티가 있는 동일한 데이터베이스에 IntegrationEvent 테이블이 있습니다. 이 테이블은 지속형 통합 이벤트를 도메인 데이터를 커밋하는 동일한 트랜잭션에 포함하도록 원자성을 달성하기 위한 보험으로 작동합니다.

프로세스는 다음과 같이 단계별로 진행됩니다.

  1. 애플리케이션이 로컬 데이터베이스 트랜잭션을 시작합니다.

  2. 그런 다음 도메인 엔터티의 상태를 업데이트하고 통합 이벤트 테이블에 이벤트를 삽입합니다.

  3. 마지막으로 트랜잭션을 커밋하므로 원하는 원자성을 얻게 됩니다.

  4. 이벤트를 어떻게든 게시합니다(다음).

이벤트를 게시하는 단계를 구현할 때 다음과 같은 선택 사항이 있습니다.

  • 트랜잭션을 커밋한 직후 통합 이벤트를 게시하고 다른 로컬 트랜잭션을 사용하여 테이블의 이벤트를 게시되는 것으로 표시합니다. 그런 다음, 테이블을 아티팩트로 사용하여 원격 마이크로 서비스에서 문제가 발생한 경우 통합 이벤트를 추적하고 저장된 통합 이벤트를 기반으로 보상 작업을 수행합니다.

  • 테이블을 일종의 큐로 사용합니다. 별도의 애플리케이션 스레드 또는 프로세스는 통합 이벤트 테이블을 쿼리하고, 이벤트를 이벤트 버스에 게시한 다음, 로컬 트랜잭션을 사용하여 이벤트를 게시됨으로 표시합니다.

그림 6-22에서는 이러한 방법 중 첫 번째 방법에 대한 아키텍처를 보여 줍니다.

작업자 마이크로 서비스 없이 게시할 때의 원자성 다이어그램입니다.

그림 6-22. 이벤트 버스에 이벤트를 게시할 때의 원자성

그림 6-22에 설명된 접근 방식에는 게시된 통합 이벤트의 성공 여부를 확인하고 확인하는 추가 작업자 마이크로 서비스가 없습니다. 오류가 발생하는 경우 추가 검사기 작업자 마이크로 서비스는 테이블에서 이벤트를 읽고 다시 게시할 수 있습니다. 즉, 2단계를 반복합니다.

두 번째 방법: EventLog 테이블을 큐로 사용하고 항상 작업자 마이크로 서비스를 사용하여 메시지를 게시합니다. 이 경우 프로세스는 그림 6-23과 같습니다. 그러면 추가 마이크로 서비스가 표시되고 이벤트를 게시할 때 테이블이 단일 원본입니다.

작업자 마이크로 서비스를 사용하여 게시할 때의 원자성 다이어그램입니다.

그림 6-23. 작업자 마이크로 서비스를 사용하여 이벤트 버스에 이벤트를 게시할 때의 원자성

간단히 하기 위해 eShopOnContainers 샘플은 첫 번째 방법(추가 프로세스 또는 검사기 마이크로 서비스 없음)과 이벤트 버스를 사용합니다. 그러나 eShopOnContainers 샘플은 가능한 모든 오류 사례를 처리하지 않습니다. 클라우드에 배포된 실제 애플리케이션에서는 결국 문제가 발생할 것이라는 사실을 수용해야 하며, 해당 확인 및 다시 보내기 논리를 구현해야 합니다. 이벤트 버스를 통해 (작업자와 함께) 게시할 때 해당 테이블을 이벤트의 단일 원본으로 사용하는 경우 테이블을 큐로 사용하는 것이 첫 번째 방법보다 더 효과적일 수 있습니다.

이벤트 버스를 통해 통합 이벤트를 게시할 때 원자성 구현

다음 코드에서는 여러 DbContext 개체를 포함하는 단일 트랜잭션을 만드는 방법(업데이트 중인 원래 데이터와 관련된 컨텍스트 하나 및 IntegrationEventLog 테이블과 관련된 두 번째 컨텍스트)을 만드는 방법을 보여 있습니다.

아래 예제 코드의 트랜잭션은 코드가 실행될 때 데이터베이스에 대한 연결에 문제가 있는 경우 복원력이 없습니다. 이는 서버 간에 데이터베이스를 이동할 수 있는 Azure SQL DB와 같은 클라우드 기반 시스템에서 발생할 수 있습니다. 여러 컨텍스트에서 복원력 있는 트랜잭션을 구현하려면 이 가이드의 뒷부 분에 있는 복원력 있는 Entity Framework Core SQL 연결 구현 섹션을 참조하세요.

명확성을 위해 다음 예제에서는 전체 프로세스를 단일 코드 조각으로 보여 줍니다. 그러나 eShopOnContainers 구현은 리팩터링되고 이 논리를 여러 클래스로 분할하므로 유지 관리가 더 쉽습니다.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

ProductPriceChangedIntegrationEvent 통합 이벤트를 만든 후 원래 도메인 작업(카탈로그 항목 업데이트)을 저장하는 트랜잭션에도 EventLog 테이블에 이벤트의 지속성이 포함됩니다. 이렇게 하면 단일 트랜잭션이 되며 이벤트 메시지가 전송되었는지 항상 확인할 수 있습니다.

이벤트 로그 테이블은 같은 데이터베이스에 대한 로컬 트랜잭션을 사용하여 원래 데이터베이스 작업과 함께 원자적으로 업데이트됩니다. 작업이 실패하면 예외가 throw되고 트랜잭션이 완료된 작업을 롤백하여 도메인 작업과 테이블에 저장된 이벤트 메시지 간의 일관성을 유지합니다.

구독에서 메시지 수신: 수신기 마이크로 서비스의 이벤트 처리기

이벤트 구독 논리 외에도 통합 이벤트 처리기(예: 콜백 메서드)에 대한 내부 코드를 구현해야 합니다. 이벤트 처리기는 특정 형식의 이벤트 메시지를 수신하고 처리할 위치를 지정하는 위치입니다.

이벤트 처리기는 먼저 이벤트 버스에서 이벤트 인스턴스를 받습니다. 그런 다음, 해당 통합 이벤트와 관련하여 처리할 구성 요소를 찾아 수신기 마이크로 서비스의 상태 변경으로 이벤트를 전파하고 유지합니다. 예를 들어 ProductPriceChanged 이벤트가 카탈로그 마이크로 서비스에서 발생하는 경우 다음 코드와 같이 장바구니 마이크로 서비스에서 처리되고 이 수신기 바구니 마이크로 서비스의 상태도 변경됩니다.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

이벤트 처리기는 제품이 바스켓 인스턴스에 있는지 확인해야 합니다. 또한 각 관련 장바구니 품목의 항목 가격을 업데이트합니다. 마지막으로 그림 6-24와 같이 가격 변경에 대해 사용자에게 표시할 경고를 만듭니다.

사용자 카트의 가격 변경 알림을 보여 주는 브라우저의 스크린샷.

그림 6-24. 통합 이벤트에서 전달한 대로 장바구니에 항목 가격 변경 표시

업데이트 메시지 이벤트에서의 아이덴포턴시

업데이트 메시지 이벤트의 중요한 측면은 통신의 어느 지점에서든 오류가 발생하면 메시지를 다시 시도해야 한다는 것입니다. 그렇지 않으면 백그라운드 작업이 이미 게시된 이벤트를 다시 게시하려고 하여 경합 조건을 초래할 수 있습니다. 업데이트가 이중 실행 시에도 동일한 결과를 반환하는지(idempotent) 확인하십시오. 또한 중복을 식별하고 제거하며 하나의 응답만 다시 보내도록 충분한 정보를 제공하는지 확인하십시오.

앞에서 설명한 것처럼 idempotency는 결과를 변경하지 않고 작업을 여러 번 수행할 수 있음을 의미합니다. 메시징 환경에서 이벤트를 통신할 때와 마찬가지로 수신기 마이크로 서비스에 대한 결과를 변경하지 않고 여러 번 배달할 수 있는 경우 이벤트는 idempotent입니다. 이는 이벤트 자체의 특성 또는 시스템이 이벤트를 처리하는 방식 때문에 필요할 수 있습니다. 메시지의 멱등성은 이벤트 버스 패턴을 구현하는 애플리케이션뿐만 아니라 메시지를 사용하는 모든 애플리케이션에서 중요합니다.

idempotent 작업의 예는 해당 데이터가 테이블에 아직 없는 경우에만 테이블에 데이터를 삽입하는 SQL 문입니다. SQL 문을 삽입하는 횟수는 중요하지 않습니다. 결과는 동일합니다. 테이블에 해당 데이터가 포함됩니다. 메시지가 잠재적으로 전송되어 두 번 이상 처리될 수 있는 경우 메시지를 처리할 때도 이와 같은 Idempotency가 필요할 수 있습니다. 예를 들어, 재시도 로직으로 인해 발신자가 정확히 동일한 메시지를 여러 번 보내는 경우, 그 메시지가 idempotent, 즉 동일한 결과를 보장하는지 확인해야 합니다.

idempotent 메시지를 디자인할 수 있습니다. 예를 들어 "제품 가격에 $5를 추가하는" 대신 "제품 가격을 $25로 설정"하는 이벤트를 만들 수 있습니다. 첫 번째 메시지를 여러 번 안전하게 처리할 수 있으며 결과는 동일합니다. 두 번째 메시지에는 그렇지 않습니다. 그러나 첫 번째 경우에도 시스템에서 최신 가격 변경 이벤트를 보낼 수 있고 새 가격을 덮어쓸 수 있으므로 첫 번째 이벤트를 처리하지 않을 수 있습니다.

또 다른 예는 여러 구독자에게 전파되는 주문 완료 이벤트일 수 있습니다. 동일한 주문 완료 이벤트에 대해 중복된 메시지 이벤트가 있더라도 앱은 주문 정보가 다른 시스템에서 한 번만 업데이트되도록 해야 합니다.

각 이벤트가 수신기당 한 번만 처리되도록 하는 논리를 만들 수 있도록 이벤트당 일종의 ID를 사용하는 것이 편리합니다.

일부 메시지 처리는 본질적으로 아이덴포턴트입니다. 예를 들어 시스템에서 이미지 썸네일을 생성하는 경우 생성된 썸네일에 대한 메시지가 처리되는 횟수는 중요하지 않을 수 있습니다. 결과는 썸네일이 생성되고 매번 동일합니다. 한편, 신용카드 결제를 처리하기 위해 결제 게이트웨이를 호출하는 등의 작업은 전혀 멱등성이 없을 수도 있습니다. 이러한 경우 메시지를 여러 번 처리하면 예상한 효과가 있는지 확인해야 합니다.

추가 리소스

통합 이벤트 메시지 중복 제거

메시지 이벤트가 서로 다른 수준에서 구독자당 한 번만 전송되고 처리되는지 확인할 수 있습니다. 한 가지 방법은 사용 중인 메시징 인프라에서 제공하는 중복 제거 기능을 사용하는 것입니다. 또 다른 방법은 대상 마이크로 서비스에서 사용자 지정 논리를 구현하는 것입니다. 전송 수준과 애플리케이션 수준 모두에서 유효성 검사를 사용하는 것이 가장 좋습니다.

EventHandler 수준에서 메시지 이벤트 중복 제거

수신자가 이벤트를 한 번만 처리하도록 하는 한 가지 방법은 이벤트 처리기에서 메시지 이벤트를 처리할 때 특정 논리를 구현하는 것입니다. 예를 들어 통합 이벤트를 받을 UserCheckoutAcceptedIntegrationEvent에서 볼 수 있듯이 eShopOnContainers 애플리케이션에서 사용되는 방법입니다. (이 경우 CreateOrderCommandIdentifiedCommand를 식별자로 사용하여 eventMsg.RequestId를 감싼 후, 명령 처리기로 보냅니다.)

RabbitMQ를 사용할 때 메시지 중복 제거

간헐적인 네트워크 오류가 발생하면 메시지가 중복될 수 있으며 메시지 수신자는 이러한 중복된 메시지를 처리할 준비가 되어 있어야 합니다. 가능하다면 수신기는 중복 제거를 통해 명시적으로 처리하는 것보다 멱등 방식으로 메시지를 처리하는 것이 좋습니다.

RabbitMQ 설명서에 따르면 "메시지가 소비자에게 전달된 후 다시 큐에 추가된 경우(예: 소비자 연결이 끊어지기 전에 승인되지 않았기 때문에) RabbitMQ는 다시 배달될 때(동일한 소비자 또는 다른 소비자에게) 다시 배달된 플래그를 설정합니다.

"다시 배달된" 플래그가 설정된 경우 메시지가 이미 처리되었을 수 있으므로 수신자가 이를 고려해야 합니다. 그러나 그것은 보장되지 않습니다. 메시지는 네트워크 문제로 인해 메시지 브로커를 떠난 후 수신자가 도달하지 않았을 수 있습니다. 반면에 "다시 배달된" 플래그가 설정되지 않은 경우 메시지가 두 번 이상 전송되지 않은 것이 보장됩니다. 따라서 수신자는 메시지에 "재배포됨" 플래그가 설정된 경우에만 메시지를 중복 제거하거나 메시지를 멱등 방식으로 처리해야 합니다.

추가 리소스