Condividi tramite


Sottoscrizione di eventi

Suggerimento

Questo contenuto è un estratto dell'eBook, Architettura di microservizi .NET per applicazioni .NET containerizzati, disponibile in documentazione .NET o come PDF scaricabile gratuitamente leggibile offline.

Architettura di Microservizi .NET per Applicazioni .NET Containerizzate miniatura della copertina dell'eBook.

Il primo passaggio per l'uso del bus di eventi consiste nel sottoscrivere i microservizi agli eventi che vogliono ricevere. Tale funzionalità deve essere eseguita nei microservizi ricevitori.

Il codice semplice seguente illustra ciò che ogni microservizio ricevitore deve implementare all'avvio del servizio , ovvero nella Startup classe , in modo che sottoscriva gli eventi necessari. In questo caso, il basket-api microservizio deve iscriversi a ProductPriceChangedIntegrationEvent e OrderStartedIntegrationEvent messaggi.

Ad esempio, quando ci si iscrive all'evento ProductPriceChangedIntegrationEvent, il microservizio del carrello diventa consapevole delle eventuali modifiche al prezzo del prodotto e avvisa l'utente della modifica se quel prodotto si trova nel carrello dell'utente.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

Dopo l'esecuzione di questo codice, il microservizio abbonato sarà in ascolto attraverso i canali RabbitMQ. Quando arriva un messaggio di tipo ProductPriceChangedIntegrationEvent, il codice richiama il gestore eventi passato e elabora l'evento.

Pubblicazione di eventi tramite il bus di eventi

Infine, il mittente del messaggio (microservizio di origine) pubblica gli eventi di integrazione con codice simile all'esempio seguente. Questo approccio è un esempio semplificato che non tiene conto dell'atomicità. È necessario implementare codice simile ogni volta che un evento deve essere propagato tra più microservizi, in genere subito dopo il commit di dati o transazioni dal microservizio di origine.

In primo luogo, l'oggetto di implementazione del bus di eventi (basato su RabbitMQ o basato su un bus di servizio) viene inserito nel costruttore del controller, come nel codice seguente:

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

Poi puoi usarlo nei metodi del controller, ad esempio nel metodo UpdateProduct:

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

In questo caso, poiché il microservizio di origine è un semplice microservizio CRUD, tale codice viene inserito direttamente in un controller API Web.

Nei microservizi più avanzati, ad esempio quando si utilizzano approcci CQRS, può essere implementato nella classe CommandHandler, all'interno del metodo Handle().

Progettazione di atomicità e resilienza nella pubblicazione sul bus di eventi

Quando si pubblicano eventi di integrazione tramite un sistema di messaggistica distribuita come il bus di eventi, si verifica il problema di aggiornare in modo atomico il database originale e di pubblicare un evento, ovvero entrambe le operazioni vengono completate o nessuna di esse. Nell'esempio semplificato illustrato in precedenza, ad esempio, il codice esegue il commit dei dati nel database quando il prezzo del prodotto viene modificato e quindi pubblica un messaggio ProductPriceChangedIntegrationEvent. Inizialmente, potrebbe sembrare essenziale che queste due operazioni vengano eseguite in modo atomico. Tuttavia, se si usa una transazione distribuita che coinvolge il database e il broker di messaggi, come nei sistemi meno recenti come Microsoft Message Queuing (MSMQ), questo approccio non è consigliato per i motivi descritti dal teorema CAP.

In pratica, si usano i microservizi per creare sistemi scalabili e a disponibilità elevata. Semplificando in qualche modo, il teorema CAP indica che non è possibile creare un database (distribuito) o un microservizio proprietario del proprio modello, che è costantemente disponibile, coerente e tollerante a qualsiasi partizione. È necessario scegliere due di queste tre proprietà.

Nelle architetture basate su microservizi è consigliabile scegliere la disponibilità e la tolleranza e annullare l'accento sulla coerenza assoluta. Pertanto, nella maggior parte delle applicazioni moderne basate su microservizi, in genere non si vogliono usare transazioni distribuite nella messaggistica, come avviene quando si implementano transazioni distribuite basate su Windows Distributed Transaction Coordinator (DTC) con MSMQ.

Torniamo al problema iniziale e al relativo esempio. Se il servizio si arresta in modo anomalo dopo l'aggiornamento del database (in questo caso subito dopo la riga di codice con _context.SaveChangesAsync()), ma prima della pubblicazione dell'evento di integrazione, il sistema complessivo potrebbe diventare incoerente. Questo approccio potrebbe essere cruciale per l'azienda, a seconda dell'operazione aziendale specifica che si sta gestendo.

Come accennato in precedenza nella sezione relativa all'architettura, è possibile adottare diversi approcci per la gestione di questo problema:

  • Utilizzo del pattern di Event Sourcing completo.

  • Utilizzo del data mining dei log delle transazioni.

  • Uso del modello Outbox. Si tratta di una tabella transazionale per archiviare gli eventi di integrazione (estendendo la transazione locale).

Per questo scenario, l'uso del modello ES (Event Sourcing) completo è uno degli approcci migliori, se non è il migliore. In molti scenari di applicazione, tuttavia, potrebbe non essere possibile implementare un sistema ES completo. ES significa archiviare solo gli eventi di dominio nel database transazionale, anziché archiviare i dati di stato correnti. L'archiviazione solo degli eventi di dominio può avere grandi vantaggi, ad esempio la cronologia del sistema disponibile e la possibilità di determinare lo stato del sistema in qualsiasi momento in passato. Tuttavia, l'implementazione di un sistema ES completo richiede di riprogettare la maggior parte del sistema e introduce molte altre complessità e requisiti. Ad esempio, è consigliabile usare un database appositamente creato per l'origine eventi, ad esempio Archivio eventi o un database orientato ai documenti, ad esempio Azure Cosmos DB, MongoDB, Cassandra, CouchDB o RavenDB. ES è un ottimo approccio per questo problema, ma non è la soluzione più semplice a meno che non si abbia già familiarità con l'Event Sourcing.

L'opzione per utilizzare il data mining dei log delle transazioni inizialmente è trasparente. Tuttavia, per usare questo approccio, il microservizio deve essere associato al log delle transazioni RDBMS, ad esempio il log delle transazioni di SQL Server. Questo approccio probabilmente non è auspicabile. Un altro svantaggio è che gli aggiornamenti di basso livello registrati nel log delle transazioni potrebbero non essere allo stesso livello degli eventi di integrazione di alto livello. In tal caso, il processo di reverse engineering di tali operazioni del log delle transazioni può essere difficile.

Un approccio bilanciato è una combinazione di una tabella di database transazionale e di un modello ES semplificato. È possibile usare uno stato, ad esempio "pronto per pubblicare l'evento", impostato nell'evento originale quando viene eseguito il commit nella tabella degli eventi di integrazione. Si prova quindi a pubblicare l'evento sul bus degli eventi. Se l'azione publish-event ha esito positivo, avviare un'altra transazione nel servizio di origine e spostare lo stato da "pronto per pubblicare l'evento" in "evento già pubblicato".

Se l'azione publish-event nel bus di eventi ha esito negativo, i dati non saranno ancora incoerenti all'interno del microservizio di origine, ma sono ancora contrassegnati come "pronti per pubblicare l'evento" e per quanto riguarda il resto dei servizi, alla fine sarà coerente. È sempre possibile avere processi in background che controllano lo stato delle transazioni o degli eventi di integrazione. Se il lavoro trova un evento nello stato "pronto per pubblicare l'evento", può provare a ripubblicare l'evento nel bus di eventi.

Si noti che con questo approccio vengono mantenuti solo gli eventi di integrazione per ogni microservizio di origine e solo gli eventi che si desidera comunicare con altri microservizi o sistemi esterni. Al contrario, in un sistema ES completo, vengono archiviati anche tutti gli eventi di dominio.

Pertanto, questo approccio bilanciato è un sistema ES semplificato. È necessario un elenco di eventi di integrazione con il relativo stato corrente ("pronto per la pubblicazione" e "pubblicato"). È tuttavia necessario implementare questi stati solo per gli eventi di integrazione. In questo approccio non è necessario archiviare tutti i dati di dominio come eventi nel database transazionale, come in un sistema ES completo.

Se si usa già un database relazionale, è possibile usare una tabella transazionale per archiviare gli eventi di integrazione. Per ottenere l'atomicità nella tua applicazione, si utilizza un processo in due passaggi basato sulle transazioni locali. Fondamentalmente, si dispone di una tabella IntegrationEvent nello stesso database in cui sono presenti le entità di dominio. La tabella citata funziona come una garanzia per ottenere l'atomicità, consentendo di includere gli eventi di integrazione persistenti nelle stesse transazioni che eseguono il commit dei dati del dominio.

Passo per passo, il processo è simile al seguente:

  1. L'applicazione avvia una transazione di database locale.

  2. Aggiorna quindi lo stato delle entità di dominio e inserisce un evento nella tabella degli eventi di integrazione.

  3. Infine, esegue l'operazione di commit della transazione, in modo da ottenere l'atomicità desiderata e quindi

  4. Pubblicate l'evento in qualche maniera (successivo).

Quando si implementano i passaggi per la pubblicazione degli eventi, sono disponibili queste opzioni:

  • Pubblicare l'evento di integrazione subito dopo il commit della transazione e usare un'altra transazione locale per contrassegnare gli eventi nella tabella come pubblicati. Usare quindi la tabella come artefatto per tenere traccia degli eventi di integrazione in caso di problemi nei microservizi remoti ed eseguire azioni di compensazione in base agli eventi di integrazione archiviati.

  • Usare la tabella come una specie di coda. Un thread dell'applicazione o un processo separato interroga la tabella degli eventi di integrazione, pubblica gli eventi sull'event bus e quindi utilizza una transazione locale per contrassegnare gli eventi come pubblicati.

La figura 6-22 illustra l'architettura per il primo di questi approcci.

Diagramma dell'atomicità durante la pubblicazione senza un microservizio di lavoro.

Figura 6-22. Atomicità nella pubblicazione di eventi sull’autobus di eventi

L'approccio illustrato nella figura 6-22 non dispone di un microservizio di lavoro aggiuntivo responsabile del controllo e della conferma dell'esito positivo degli eventi di integrazione pubblicati. In caso di errore, il microservizio aggiuntivo del checker di lavoro può leggere gli eventi dalla tabella e ripubblicarli, ovvero ripetere il passaggio 2.

Informazioni sul secondo approccio: usare la tabella EventLog come coda e usare sempre un microservizio di lavoro per pubblicare i messaggi. In tal caso, il processo è simile a quello illustrato nella figura 6-23. Viene mostrato un microservizio aggiuntivo e la tabella è l'unica fonte nella pubblicazione degli eventi.

Diagramma dell'atomicità durante la pubblicazione con un microservizio di lavoro.

Figura 6-23. Atomicità nella pubblicazione di eventi sul bus degli eventi con un microservizio operatore

Per semplicità, l'esempio eShopOnContainers usa il primo approccio (senza processi aggiuntivi o microservizi di controllo) più il bus di eventi. Tuttavia, l'esempio eShopOnContainers non gestisce tutti i possibili casi di errore. In un'applicazione reale distribuita nel cloud, è necessario adottare il fatto che i problemi si verificheranno alla fine ed è necessario implementare tale logica di controllo e invio di nuovo. L'uso della tabella come coda può essere più efficace del primo approccio se si dispone di tale tabella come singola origine di eventi quando li si pubblica con il worker tramite il bus di eventi.

Implementazione dell'atomicità nella pubblicazione di eventi di integrazione attraverso il bus di eventi

Il codice seguente illustra come creare una singola transazione che coinvolge più oggetti DbContext, un contesto correlato ai dati originali da aggiornare e il secondo contesto correlato alla tabella IntegrationEventLog.

La transazione nel codice di esempio seguente non sarà resiliente se le connessioni al database presentano problemi al momento dell'esecuzione del codice. Questo problema può verificarsi in sistemi basati sul cloud come il database SQL di Azure, che potrebbe spostare i database tra server. Per l'implementazione di transazioni resilienti in più contesti, vedere la sezione Implementazione di connessioni SQL di Entity Framework Core resilienti più avanti in questa guida.

Per maggiore chiarezza, l'esempio seguente illustra l'intero processo in un'unica parte di codice. Tuttavia, l'implementazione di eShopOnContainers viene sottoposta a refactoring e suddivide questa logica in più classi, in modo da semplificare la gestione.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

Dopo aver creato l'evento di integrazione ProductPriceChangedIntegrationEvent, la transazione che archivia l'operazione di dominio originale (aggiornare l'elemento del catalogo) include anche la persistenza dell'evento nella tabella EventLog. In questo modo si tratta di una singola transazione e sarà sempre possibile controllare se i messaggi di evento sono stati inviati.

La tabella del registro eventi viene aggiornata in modo atomico con l'operazione di database originale, usando una transazione locale sullo stesso database. Se una delle operazioni ha esito negativo, viene generata un'eccezione e la transazione esegue il rollback di qualsiasi operazione completata, mantenendo quindi la coerenza tra le operazioni di dominio e i messaggi di evento salvati nella tabella.

Ricezione di messaggi dalle sottoscrizioni: gestori eventi nei microservizi ricevitori

Oltre alla logica di sottoscrizione di eventi, è necessario implementare il codice interno per i gestori eventi di integrazione, ad esempio un metodo di callback. Il gestore eventi è il percorso in cui si specifica dove verranno ricevuti ed elaborati i messaggi di evento di un determinato tipo.

Un gestore eventi riceve prima un'istanza di evento dal bus di eventi. Individua quindi il componente da elaborare in relazione all'evento di integrazione, propagando e salvando in modo permanente l'evento come modifica nello stato nel microservizio ricevitore. Ad esempio, se un evento ProductPriceChanged ha origine nel microservizio del catalogo, viene gestito nel microservizio basket e modifica lo stato in questo microservizio basket ricevitore, come illustrato nel codice seguente.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

Il gestore eventi deve verificare se il prodotto esiste in una qualsiasi delle istanze del carrello. Aggiorna anche il prezzo dell'articolo per ogni voce di carrello correlata. Infine, crea un avviso da visualizzare all'utente sulla modifica del prezzo, come illustrato nella figura 6-24.

Screenshot di un browser che mostra la notifica di modifica dei prezzi nel carrello utente.

Figura 6-24. Visualizzazione di una variazione del prezzo dell'articolo in un carrello, come comunicato dagli eventi di integrazione

Idempotenza negli eventi dei messaggi di aggiornamento del sistema

Un aspetto importante degli eventi del messaggio di aggiornamento è che un errore in qualsiasi momento della comunicazione deve causare un nuovo tentativo del messaggio. In caso contrario, un'attività in background potrebbe provare a pubblicare un evento già pubblicato, creando una condizione di competizione. Assicurarsi che gli aggiornamenti siano idempotenti o che forniscano informazioni sufficienti per assicurarsi che sia possibile rilevare un duplicato, eliminarlo e inviare una sola risposta.

Come indicato in precedenza, l'idempotenza significa che un'operazione può essere eseguita più volte senza modificare il risultato. In un ambiente di messaggistica, come per la comunicazione degli eventi, un evento è idempotente se può essere recapitato più volte senza modificare il risultato per il microservizio ricevitore. Ciò può essere necessario a causa della natura dell'evento stesso o a causa del modo in cui il sistema gestisce l'evento. L'idempotenza dei messaggi è importante in qualsiasi applicazione che usa la messaggistica, non solo nelle applicazioni che implementano il modello di bus di eventi.

Un esempio di operazione idempotente è un'istruzione SQL che inserisce dati in una tabella solo se tali dati non sono già presenti nella tabella. Non importa quante volte si esegue l'istruzione SQL; il risultato sarà lo stesso: la tabella conterrà tali dati. L'idempotenza come questa può essere necessaria anche quando si gestiscono messaggi se i messaggi potrebbero essere potenzialmente inviati e quindi elaborati più volte. Ad esempio, se la logica di ripetizione dei tentativi fa sì che un mittente invii esattamente lo stesso messaggio più volte, è necessario assicurarsi che sia idempotente.

È possibile progettare messaggi idempotenti. Ad esempio, è possibile creare un evento che indica "impostare il prezzo del prodotto su $25" anziché "aggiungere $ 5 al prezzo del prodotto". È possibile elaborare il primo messaggio in modo sicuro un numero qualsiasi di volte e il risultato sarà lo stesso. Questo non è vero per il secondo messaggio. Ma anche nel primo caso, potresti non voler elaborare il primo evento, perché il sistema potrebbe anche aver inviato un evento di modifica del prezzo più recente e rischieresti di sovrascrivere il nuovo prezzo.

Un altro esempio potrebbe essere un evento di completamento dell'ordine propagato a più abbonati. L'app deve assicurarsi che le informazioni sull'ordine vengano aggiornate solo una volta in altri sistemi, anche se sono presenti eventi di messaggio duplicati per lo stesso evento completato dall'ordine.

È utile avere un tipo di identità per ogni evento in modo da poter creare logica che impone che ogni evento venga elaborato una sola volta per ogni ricevitore.

Alcune elaborazioni dei messaggi sono intrinsecamente idempotenti. Ad esempio, se un sistema genera anteprime di immagini, potrebbe non importare quante volte viene elaborato il messaggio relativo all'anteprima generata; il risultato è che le anteprime vengono generate e sono uguali ogni volta. D'altra parte, le operazioni come la chiamata di un gateway di pagamento per addebitare una carta di credito potrebbero non essere affatto idempotenti. In questi casi, è necessario assicurarsi che l'elaborazione di un messaggio più volte abbia l'effetto previsto.

Risorse aggiuntive

Deduplicazione dei messaggi di evento di integrazione

È possibile assicurarsi che gli eventi dei messaggi vengano inviati ed elaborati una sola volta per sottoscrittore a vari livelli. Un modo consiste nell'usare una funzionalità di deduplicazione offerta dall'infrastruttura di messaggistica in uso. Un altro consiste nell'implementare la logica personalizzata nel microservizio di destinazione. Avere convalide sia a livello di trasporto che a livello di applicazione è la scelta migliore.

Deduplicazione degli eventi di messaggio a livello di EventHandler

Un modo per assicurarsi che un evento venga elaborato una sola volta da qualsiasi ricevitore implementa una determinata logica durante l'elaborazione degli eventi del messaggio nei gestori eventi. Ad esempio, questo è l'approccio usato nell'applicazione eShopOnContainers, come si può vedere nel codice sorgente della classe UserCheckoutAcceptedIntegrationEventHandler quando riceve un UserCheckoutAcceptedIntegrationEvent evento di integrazione. In questo caso, CreateOrderCommand è avvolto con IdentifiedCommand, usando eventMsg.RequestId come identificatore, prima di inviarlo al gestore dei comandi.

Deduplicazione dei messaggi quando si usa RabbitMQ

Quando si verificano errori di rete intermittenti, è possibile duplicare i messaggi e il ricevitore di messaggi deve essere pronto per gestire questi messaggi duplicati. Se possibile, i ricevitori dovrebbero gestire i messaggi in modo idempotente, che è preferibile rispetto alla gestione esplicita mediante deduplicazione.

Secondo la documentazione di RabbitMQ, "Se un messaggio viene consegnato a un consumer e quindi riaccodato (perché non è stato riconosciuto prima della caduta della connessione del consumer, ad esempio), RabbitMQ imposterà il flag di nuovo invio quando viene consegnato di nuovo (sia allo stesso consumer che a uno diverso).

Se il flag "redelivered" è impostato, il ricevitore deve tenerne conto, perché il messaggio potrebbe essere già stato elaborato. Ma questo non è garantito; Il messaggio potrebbe non aver mai raggiunto il ricevitore dopo che ha lasciato il broker di messaggi, forse a causa di problemi di rete. D'altra parte, se il flag "rideliverato" non è impostato, è garantito che il messaggio non sia stato inviato più di una volta. Pertanto, il ricevitore deve deduplicare i messaggi o elaborare i messaggi in modo idempotente, solo se il segnalatore "riconsegnato" è impostato nel messaggio.

Risorse aggiuntive