Поделиться через


Подписка на события

Подсказка

Это фрагмент из электронной книги «Архитектура микрослужб .NET для контейнеризованных приложений .NET», доступной в документации .NET или в виде бесплатного скачиваемого PDF-файла, который можно прочитать в автономном режиме.

Архитектура микросервисов .NET для приложений .NET в контейнерах, миниатюра обложки электронной книги.

Первым шагом для использования шины событий — подписка микрослужб на события, которые они хотят принимать. Эта функция должна выполняться в микрослужбах приемника.

В следующем простом коде показано, что каждой микрослужбе, принимающей данные, необходимо реализовать при запуске службы (то есть в Startup классе), чтобы подписаться на события, которые ей нужны. В этом случае микрослужба basket-api должна подписаться на сообщения ProductPriceChangedIntegrationEvent и OrderStartedIntegrationEvent.

Например, при подписке на событие ProductPriceChangedIntegrationEvent, которое делает микросервис корзины осведомленным о любых изменениях цены на продукт, если этот продукт находится в корзине пользователя, и позволяет уведомлять пользователя об изменении.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

После выполнения этого кода микрослужба подписчика будет прослушивать каналы RabbitMQ. При поступлении любого сообщения типа ProductPriceChangedIntegrationEvent код вызывает обработчик событий, передаваемый в него, и обрабатывает событие.

Публикация событий через шину событий

Наконец, отправитель сообщений (микрослужба источника) публикует события интеграции с кодом, как показано в следующем примере. (Этот подход является упрощенным примером, который не учитывает атомарность.) Вы реализуете аналогичный код всякий раз, когда событие должно распространяться по нескольким микрослужбам, как правило, сразу после фиксации данных или транзакций из исходной микрослужбы.

Во-первых, объект реализации шины событий (на основе RabbitMQ или на основе служебной шины) будет внедрен в конструктор контроллера, как показано в следующем коде:

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

Затем используйте его в методах вашего контроллера, например, в методе UpdateProduct:

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

В этом случае, так как исходная микрослужба является простой микрослужбой CRUD, этот код помещается прямо в контроллер веб-API.

В более сложных микрослужбах, таких как микрослужбы, использующие подходы CQRS, реализация может быть выполнена в классе CommandHandler в рамках метода Handle().

Проектирование атомарности и устойчивости при публикации в шину событий

При публикации событий интеграции через распределённую систему обмена сообщениями, такую как шина событий, возникает проблема атомарного обновления исходной базы данных и публикации события (то есть либо обе операции должны быть завершены, либо ни одна из них). Например, в упрощенном примере, показанном ранее, код фиксирует данные в базу данных при изменении цены на продукт, а затем публикует сообщение ProductPriceChangedIntegrationEvent. Первоначально может показаться важным, что эти две операции выполняются атомарно. Однако если вы используете распределенную транзакцию, связанную с базой данных и брокером сообщений, как и в старых системах, таких как Microsoft Message Queuing (MSMQ), этот подход не рекомендуется по причинам, описанным в теореме CAP.

В основном микрослужбы используются для создания масштабируемых и высокодоступных систем. Упростив несколько, теорема CAP утверждает, что невозможно создать распределённую базу данных (или микросервис, который владеет своей моделью), которая будет одновременно постоянно доступной, строго консистентной и устойчива к любому сетевому разбиению. Необходимо выбрать два из этих трех свойств.

В архитектурах на основе микросервисов следует выбирать доступность и отказоустойчивость, и вы должны уменьшать акцент на строгую согласованность. Поэтому в большинстве современных приложений на основе микрослужб вы обычно не хотите использовать распределенные транзакции в системах обмена сообщениями, как это делается с использованием координатора распределенных транзакций Windows (DTC) и MSMQ.

Давайте вернемся к первоначальной проблеме и ее примеру. Если служба завершается сбоем после обновления базы данных (в данном случае сразу после строки кода с _context.SaveChangesAsync()), но до публикации события интеграции общая система может стать несогласованной. Этот подход может быть критически важным для бизнеса в зависимости от конкретной бизнес-операции, с которыми вы работаете.

Как упоминалось ранее в разделе архитектуры, вы можете использовать несколько подходов для решения этой проблемы:

  • Использование полного шаблона "Event Sourcing".

  • Использование майнинга журналов транзакций.

  • Использование шаблона "Outbox". Это транзакционная таблица для хранения событий интеграции (расширение локальной транзакции).

Для этого сценария использование полного шаблона источника событий (ES) является одним из лучших подходов, если не лучший. Однако во многих сценариях использования приложения вам может не получится реализовать полноценную систему ES. ES означает хранение только событий домена в базе данных транзакций вместо хранения текущих данных состояния. Хранение только событий домена может предложить значительные преимущества, такие как наличие доступной истории вашей системы и возможность определить состояние системы в любой момент в прошлом. Однако реализация полной системы ES требует повторной иерархии большей части системы и внедрения множества других сложностей и требований. Например, вы хотите использовать базу данных, специально созданную для обеспечения источника событий, таких как Хранилище событий, или базу данных, ориентированную на документ, например Azure Cosmos DB, MongoDB, Cassandra, CouchDB или RavenDB. ES — это отличный подход для этой проблемы, но не самое простое решение, если вы не знакомы с event sourcing.

Возможность использования майнинга журналов транзакций изначально выглядит прозрачной. Однако для использования этого подхода микрослужба должна быть сопряжена с журналом транзакций RDBMS, например журналом транзакций SQL Server. Этот подход, вероятно, не является желательным. Еще одним недостатком является то, что низкоуровневые обновления, записанные в журнале транзакций, могут не совпадать с высоким уровнем событий интеграции. В этом случае процесс обратной инженерии этих операций журнала транзакций может оказаться трудным.

Сбалансированный подход — это сочетание таблицы транзакций базы данных и упрощенного шаблона ES. Вы можете использовать состояние, например "готово к публикации события", которое устанавливается в исходном событии при фиксации его в таблице событий интеграции. Затем вы попытаетесь опубликовать событие в шине событий. Если действие публикации и события успешно выполнено, вы запускаете другую транзакцию в службе источника и перемещаете состояние из "готово к публикации события" в "событие уже опубликовано".

Если действие публикации события в шине событий завершается сбоем, данные в микросервисе-источнике всё равно останутся согласованными: они всё ещё помечены как "готовые к публикации события". Что касается остальных служб, то они в конечном итоге будут согласованными. Вы всегда можете запускать фоновые задачи для проверки состояния транзакций или событий интеграции. Если задание находит событие в состоянии "готовности к публикации события", оно может попытаться повторно опубликовать это событие в шине событий.

Обратите внимание, что с помощью этого подхода вы сохраняете только события интеграции для каждой микрослужбы-источника, а также только те события, которые вы хотите сообщить другим микрослужбам или внешним системам. В отличие от этого, в полной системе ES вы также храните все события домена.

Таким образом, этот сбалансированный подход является упрощенной системой ES. Вам нужен список событий интеграции с текущим состоянием ("готово к публикации" и "опубликовано"). Но эти состояния необходимо реализовать только для событий интеграции. И в этом подходе вам не нужно хранить все данные домена в виде событий в транзакционной базе данных, как и в полной системе ES.

Если вы уже используете реляционную базу данных, можно использовать транзакционные таблицы для хранения событий интеграции. Чтобы добиться атомарности в приложении, используйте двухэтапный процесс на основе локальных транзакций. В основном у вас есть таблица IntegrationEvent в той же базе данных, где у вас есть сущности домена. Эта таблица работает в качестве страхования для достижения атомарности, чтобы включить сохраненные события интеграции в те же транзакции, которые фиксируют данные домена.

Пошаговый процесс выглядит следующим образом:

  1. Приложение начинает локальную транзакцию базы данных.

  2. Затем он обновляет состояние сущностей домена и вставляет событие в таблицу событий интеграции.

  3. После этого он фиксирует транзакцию, чтобы вы получили нужную атомарность, а затем

  4. Вы публикуете событие как-то (далее).

При реализации шагов публикации событий у вас есть следующие варианты:

  • Опубликуйте событие интеграции сразу после фиксации транзакции и используйте другую локальную транзакцию, чтобы пометить события в таблице как опубликованные. Затем используйте таблицу так же, как артефакт для отслеживания событий интеграции в случае проблем в удаленных микрослужбах и выполнения компенсирующих действий на основе сохраненных событий интеграции.

  • Используйте таблицу в качестве типа очереди. Отдельный поток приложения или процесс запрашивает таблицу событий интеграции, публикует события в шине событий, а затем использует локальную транзакцию, чтобы пометить события как опубликованные.

На рисунке 6–22 показана архитектура первого из этих подходов.

Схема атомарности при публикации без рабочей микрослужбы.

Рис. 6–22. Атомарность для публикации событий в шине обмена событиями

На рисунке 6-22 отсутствует дополнительная микрослужба-воркер, которая отвечает за проверку и подтверждение успешного завершения опубликованных событий интеграции. В случае сбоя дополнительная проверочная микрослужба может считывать события из таблицы и снова публиковать их, то есть повторять шаг номер 2.

О втором подходе: вы используете таблицу EventLog в качестве очереди и всегда используете рабочую микрослужбу для публикации сообщений. В этом случае процесс похож на рисунок 6–23. В этом примере показана дополнительная микрослужба, а таблица — единственный источник при публикации событий.

Схема атомарности при публикации с помощью рабочего микросервиса.

Рис. 6–23. Атомарность при публикации событий в шине событий с помощью рабочей микрослужбы

Для простоты в примере eShopOnContainers используется первый подход (без дополнительных процессов или микрослужб проверки), а также шина событий. Однако пример eShopOnContainers не обрабатывает все возможные случаи сбоя. В реальном приложении, развернутом в облаке, необходимо принять тот факт, что проблемы возникнут в конечном итоге, и необходимо реализовать логику проверки и повторного отправления. Использование таблицы в качестве очереди может быть более эффективным, чем первый подход, если вы используете эту таблицу как единый источник событий при их публикации с помощью рабочего процесса через систему передачи событий.

Реализация атомарности при публикации событий интеграции через шину событий

В следующем коде показано, как создать одну транзакцию с несколькими объектами DbContext— один контекст, связанный с обновляемыми исходными данными, и второй контекст, связанный с таблицей IntegrationEventLog.

Транзакция в приведенном ниже примере кода не будет устойчивой, если подключения к базе данных имеют какие-либо проблемы во время выполнения кода. Это может произойти в облачных системах, таких как база данных SQL Azure, которая может перемещать базы данных между серверами. Сведения о реализации устойчивых транзакций в нескольких контекстах см. в разделе "Реализация устойчивых подключений Entity Framework Core SQL " далее в этом руководстве.

Для ясности в следующем примере показан весь процесс в одном фрагменте кода. Однако реализация eShopOnContainers рефакторингируется и разделяет эту логику на несколько классов, чтобы упростить обслуживание.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

После создания события интеграции ProductPriceChangedIntegrationEvent транзакция, которая хранит исходную операцию домена (обновление элемента каталога) также включает сохранение события в таблице EventLog. Это делает ее одной транзакцией, и вы всегда сможете проверить, были ли отправлены сообщения о событиях.

Таблица журнала событий обновляется атомарно с помощью исходной операции базы данных, используя локальную транзакцию для той же базы данных. При сбое любой из операций создается исключение, и транзакция откатывает любую завершенную операцию, таким образом сохраняя согласованность между операциями домена и сообщениями о событиях, сохраненными в таблице.

Получение сообщений из подписок: обработчики событий в микрослужбах приемника

Помимо логики подписки на события, необходимо реализовать внутренний код для обработчиков событий интеграции (например, метода обратного вызова). Обработчик событий определяет, где будут получены и обработаны сообщения о событии определенного типа.

Обработчик событий сначала получает экземпляр события из шины событий. Затем он определяет компонент, который нужно обработать в связи с данным событием интеграции, распространяя и сохраняя событие как изменение состояния в микрослужбе получателя. Например, если событие ProductPriceChanged возникает в микрослужбе каталога, оно обрабатывается в микрослужбе корзины и изменяет состояние микрослужбы корзины приемника, как показано в следующем коде.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

Обработчик событий должен проверить, существует ли продукт в любом экземпляре корзины. Он также обновляет цену товара для каждой соответствующей позиции в корзине. Наконец, он создает оповещение для отображения пользователю об изменении цены, как показано на рис. 6-24.

Снимок экрана: браузер с уведомлением об изменении цены в корзине пользователя.

Рис. 6–24. Отображение изменения цены на товар в корзине, как это предусмотрено событиями интеграции

Идемпотентность в событиях обновления сообщений

Важный аспект событий сообщения об обновлении заключается в том, что сбой в любой момент связи должен привести к повторной попытке отправки сообщения. В противном случае фоновая задача может попытаться опубликовать событие, которое уже опубликовано, создав условие гонки. Убедитесь, что обновления являются идемпотентными или предоставляют достаточно информации, чтобы убедиться, что можно обнаружить дубликаты, отменить его и отправить только один ответ.

Как отмечалось ранее, идемпотентность означает, что операция может выполняться несколько раз, не изменяя результат. В среде обмена сообщениями, как в случае передачи событий, событие идемпотентно, если оно может быть доставлено несколько раз без изменения результата для микрослужбы получателя. Это может потребоваться из-за характера самого события или из-за того, как система обрабатывает событие. Идемпотентность сообщений важна в любом приложении, использующего обмен сообщениями, а не только в приложениях, реализующих шаблон шины событий.

Примером идемпотентной операции является инструкция SQL, которая вставляет данные в таблицу только в том случае, если эти данные еще не в таблице. Не имеет значения, сколько раз выполняется инструкция SQL; Результат будет таким же: таблица будет содержать эти данные. Идемпотентность может быть необходима при работе с сообщениями, если они могут быть отправлены и обработаны более одного раза. Например, если логика повторных попыток приводит к тому, что отправитель отправляет точно такое же сообщение более одного раза, необходимо убедиться, что операция является идемпотентной.

Можно разработать идемпотентные сообщения. Например, можно создать событие, которое говорит,что "задайте цену продукта на $25", а не "добавить $ 5 к цене продукта". Вы можете безопасно обработать первое сообщение любое количество раз, и результат будет одинаковым. Это не верно для второго сообщения. Но даже в первом случае может не потребоваться обработать первое событие, так как система могла бы также отправить новое событие изменения цен, и вы будете перезаписывать новую цену.

Другим примером может быть событие завершения заказа, которое распространяется на нескольких подписчиков. Приложение должно убедиться, что сведения о заказе обновляются в других системах только один раз, даже если для того же события завершения заказа дублируются сообщения.

Важно иметь некую идентификацию для каждого события, чтобы создать логику, обеспечивающую обработку каждого события только один раз для каждого получателя.

Некоторые процессы обработки сообщений по своей природе являются идемпотентными. Например, если система создает эскизы изображений, может не иметь значения, сколько раз обрабатывается сообщение о созданном эскизе; Результатом является то, что эскизы создаются, и они одинаковы каждый раз. С другой стороны, такие операции, как вызов шлюза платежей для оплаты кредитной карты, могут не быть идемпотентными вообще. В таких случаях необходимо убедиться, что обработка сообщения несколько раз влияет на ожидаемый результат.

Дополнительные ресурсы

Дедупликация сообщений о событиях интеграции

Вы можете убедиться, что сообщения событий отправляются и обрабатываются только один раз для каждого подписчика на различных уровнях. Одним из способов является использование функции дедупликации, предлагаемой инфраструктурой обмена сообщениями, которую вы используете. Другой вариант — реализовать уникальную логику в целевой микрослужбе. Проверка как на уровне транспорта, так и на уровне приложения — это наиболее подходящий вариант.

Дедупликация событий сообщений на уровне EventHandler

Один из способов убедиться, что событие обрабатывается только один раз любым получателем, реализуя определенную логику при обработке событий сообщения в обработчиках событий. Например, это подход, используемый в приложении eShopOnContainers, как показано в исходном коде класса UserCheckoutAcceptedIntegrationEventHandler при получении UserCheckoutAcceptedIntegrationEvent события интеграции. (В этом случае CreateOrderCommand обернут в IdentifiedCommand, используя eventMsg.RequestId как идентификатор, перед отправкой в обработчик команд).

Дедупликация сообщений при использовании RabbitMQ

При периодических сбоях сети сообщения могут дублироваться, а получатель сообщений должен быть готов к обработке этих повторяющихся сообщений. По возможности получатели должны обрабатывать сообщения в идемпотентном режиме, что лучше, чем явно обрабатывать их с дедупликацией.

Согласно документации RabbitMQ: "Если сообщение доставлено потребителю и затем поставлено обратно в очередь (например, потому что не было подтверждено до отключения подключения потребителя), то RabbitMQ установит флаг повторной доставки при повторной доставке (независимо от того, будет ли это тот же потребитель или другой).

Если установлен флаг "повторная доставка", получатель должен учитывать это, так как сообщение уже могло быть обработано. Но это не гарантируется; Сообщение, возможно, никогда не достигло получателя после того, как он покинул брокер сообщений, возможно, из-за проблем с сетью. С другой стороны, если флаг "повторно доставлено" не задан, гарантируется, что сообщение не было отправлено более одного раза. Таким образом, получатель должен удалять дубликаты сообщений или обрабатывать сообщения в идемпотентном режиме обработки только в том случае, если в сообщении установлен флаг "повторной доставки".

Дополнительные ресурсы