Berichtgestuurde bedrijfstoepassingen bouwen met NServiceBus en Azure Service Bus

NServiceBus is een commercieel berichtenframework dat wordt geleverd door Specifieke Software. Het is gebouwd op basis van Azure Service Bus en helpt ontwikkelaars zich te concentreren op bedrijfslogica door infrastructuurproblemen te abstraheren. In deze handleiding bouwen we een oplossing waarmee berichten tussen twee services worden uitgewisseld. We laten ook zien hoe u automatisch mislukte berichten opnieuw kunt proberen en de opties voor het hosten van deze services in Azure kunt bekijken.

Notitie

De code voor deze zelfstudie is beschikbaar op de website specifieke softwaredocumenten.

Vereisten

In het voorbeeld wordt ervan uitgegaan dat u een Azure Service Bus naamruimte hebt gemaakt.

Belangrijk

NServiceBus vereist ten minste de Standard-laag. De Basic-laag werkt niet.

De oplossing downloaden en voorbereiden

  1. Download de code van de website specifieke softwaredocumenten. De oplossing SendReceiveWithNservicebus.sln bestaat uit drie projecten:

    • Afzender: een consoletoepassing die berichten verzendt
    • Ontvanger: een consoletoepassing die berichten van de afzender ontvangt en antwoordt
    • Gedeeld: een klassebibliotheek met de berichtcontracten die worden gedeeld tussen de afzender en ontvanger

    Het volgende diagram, gegenereerd door ServiceInsight, een hulpprogramma voor visualisatie en foutopsporing van bepaalde software, toont de berichtenstroom:

    Afbeelding van het sequentiediagram

  2. Open SendReceiveWithNservicebus.sln in uw favoriete code-editor (bijvoorbeeld Visual Studio 2019).

  3. Open appsettings.json in zowel het ontvanger- als het afzenderproject en stel AzureServiceBusConnectionString in op de connection string voor uw Azure Service Bus naamruimte.

De contracten voor gedeelde berichten definiëren

In de gedeelde klassebibliotheek definieert u de contracten die worden gebruikt om onze berichten te verzenden. Het bevat een verwijzing naar het NServiceBus NuGet-pakket, dat interfaces bevat die u kunt gebruiken om onze berichten te identificeren. De interfaces zijn niet vereist, maar ze geven ons wat extra validatie van NServiceBus en maken het mogelijk dat de code zelfdocumenterend is.

Eerst beoordelen we de Ping.cs klas

public class Ping : NServiceBus.ICommand
{
    public int Round { get; set; }
}

De Ping klasse definieert een bericht dat de afzender naar de ontvanger verzendt. Het is een eenvoudige C#-klasse die een interface uit het NServiceBus-pakket implementeert NServiceBus.ICommand. Dit bericht is een signaal aan de lezer en aan NServiceBus dat het een opdracht is, hoewel er andere manieren zijn om berichten te identificeren zonder gebruik te maken van interfaces.

De andere berichtklasse in de gedeelde projecten is Pong.cs:

public class Pong : NServiceBus.IMessage
{
    public string Acknowledgement { get; set; }
}

Pong is ook een eenvoudig C#-object, hoewel hiermee wordt geïmplementeerd NServiceBus.IMessage. De IMessage interface vertegenwoordigt een algemeen bericht dat geen opdracht of gebeurtenis is en dat vaak wordt gebruikt voor antwoorden. In ons voorbeeld is het een antwoord dat de ontvanger terugstuurt naar de afzender om aan te geven dat er een bericht is ontvangen.

De Ping en Pong zijn de twee berichttypen die u gaat gebruiken. De volgende stap is het configureren van de afzender om Azure Service Bus te gebruiken en een Ping bericht te verzenden.

De afzender instellen

De afzender is een eindpunt dat ons Ping bericht verzendt. Hier configureert u de afzender om Azure Service Bus te gebruiken als transportmechanisme. Vervolgens maakt u een Ping exemplaar en verzendt u deze.

In de Main methode van Program.csconfigureert u het eindpunt Afzender:

var host = Host.CreateDefaultBuilder(args)
    // Configure a host for the endpoint
    .ConfigureLogging((context, logging) =>
    {
        logging.AddConfiguration(context.Configuration.GetSection("Logging"));

        logging.AddConsole();
    })
    .UseConsoleLifetime()
    .UseNServiceBus(context =>
    {
        // Configure the NServiceBus endpoint
        var endpointConfiguration = new EndpointConfiguration("Sender");

        var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
        var connectionString = context.Configuration.GetConnectionString("AzureServiceBusConnectionString");
        transport.ConnectionString(connectionString);

        transport.Routing().RouteToEndpoint(typeof(Ping), "Receiver");

        endpointConfiguration.EnableInstallers();
        endpointConfiguration.AuditProcessedMessagesTo("audit");

        return endpointConfiguration;
    })
    .ConfigureServices(services => services.AddHostedService<SenderWorker>())
    .Build();

await host.RunAsync();

Er is hier veel om uit te pakken, dus we bekijken het stap voor stap.

Een host voor het eindpunt configureren

Hosting en logboekregistratie worden geconfigureerd met behulp van standaardopties voor Microsoft Generic Host. Voorlopig is het eindpunt geconfigureerd om te worden uitgevoerd als een consoletoepassing, maar het kan worden gewijzigd om te worden uitgevoerd in Azure Functions met minimale wijzigingen, die verderop in dit artikel worden besproken.

Het NServiceBus-eindpunt configureren

Vervolgens vertelt u de host om NServiceBus te gebruiken met de .UseNServiceBus(…) extensiemethode. De methode gebruikt een callback-functie die een eindpunt retourneert dat wordt gestart wanneer de host wordt uitgevoerd.

In de eindpuntconfiguratie geeft AzureServiceBus u op voor ons transport, waarbij u een connection string van appsettings.jsonopgeeft. Vervolgens stelt u de routering zo in dat berichten van het type Ping worden verzonden naar een eindpunt met de naam 'Ontvanger'. Hiermee kan NServiceBus het proces van verzending van het bericht naar de bestemming automatiseren zonder het adres van de ontvanger te vereisen.

De aanroep naar EnableInstallers stelt de topologie in de Azure Service Bus naamruimte in wanneer het eindpunt wordt gestart, waarbij waar nodig de vereiste wachtrijen worden gemaakt. In productieomgevingen is operationele scripting een andere optie om de topologie te maken.

Achtergrondservice instellen voor het verzenden van berichten

Het laatste deel van de afzender is SenderWorker, een achtergrondservice die is geconfigureerd om elke seconde een Ping bericht te verzenden.

public class SenderWorker : BackgroundService
{
    private readonly IMessageSession messageSession;
    private readonly ILogger<SenderWorker> logger;

    public SenderWorker(IMessageSession messageSession, ILogger<SenderWorker> logger)
    {
        this.messageSession = messageSession;
        this.logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            var round = 0;
            while (!stoppingToken.IsCancellationRequested)
            {
                await messageSession.Send(new Ping { Round = round++ })
                    .ConfigureAwait(false);

                logger.LogInformation($"Message #{round}");

                await Task.Delay(1_000, stoppingToken)
                    .ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // graceful shutdown
        }
    }
}

De IMessageSession gebruikt in wordt geïnjecteerd in en stelt ons in ExecuteAsyncSenderWorker staat om berichten te verzenden met behulp van NServiceBus buiten een berichtenhandler. De routering die u hebt geconfigureerd in Sender geeft de bestemming van de Ping berichten op. De topologie van het systeem (welke berichten naar welke adressen worden doorgestuurd) wordt gescheiden van de bedrijfscode.

De afzendertoepassing bevat ook een PongHandler. U keert terug nadat we de ontvanger hebben besproken. Dit doen we vervolgens.

De ontvanger instellen

De ontvanger is een eindpunt dat luistert naar een Ping bericht, registreert wanneer een bericht wordt ontvangen en antwoordt naar de afzender. In deze sectie bekijken we snel de eindpuntconfiguratie, die vergelijkbaar is met de afzender, en richten we onze aandacht vervolgens op de berichtenhandler.

Stel de ontvanger net als de afzender in als een consoletoepassing met behulp van de Algemene Host van Microsoft. Er wordt dezelfde configuratie voor logboekregistratie en eindpunten gebruikt (met Azure Service Bus als het berichttransport), maar met een andere naam, om deze te onderscheiden van de afzender:

var endpointConfiguration = new EndpointConfiguration("Receiver");

Omdat dit eindpunt alleen antwoordt op de afzender en geen nieuwe gesprekken start, is er geen routeringsconfiguratie vereist. Er is ook geen achtergrondmedewerker nodig, zoals de afzender, omdat deze alleen antwoordt wanneer het een bericht ontvangt.

De berichtenhandler Ping

Het ontvangerproject bevat een berichtenhandler met de naam PingHandler:

public class PingHandler : NServiceBus.IHandleMessages<Ping>
{
    private readonly ILogger<PingHandler> logger;

    public PingHandler(ILogger<PingHandler> logger)
    {
        this.logger = logger;
    }

    public async Task Handle(Ping message, IMessageHandlerContext context)
    {
        logger.LogInformation($"Processing Ping message #{message.Round}");

        // throw new Exception("BOOM");

        var reply = new Pong { Acknowledgement = $"Ping #{message.Round} processed at {DateTimeOffset.UtcNow:s}" };

        await context.Reply(reply);
    }
}

Laten we de becommentarieerde code voorlopig negeren; we komen er later op terug wanneer we het hebben over het herstellen van een fout.

De klasse implementeert IHandleMessages<Ping>, waarmee één methode wordt gedefinieerd: Handle. Deze interface vertelt NServiceBus dat wanneer het eindpunt een bericht van het type Pingontvangt, het moet worden verwerkt door de Handle methode in deze handler. De Handle methode gebruikt het bericht zelf als een parameter en een IMessageHandlerContext, waarmee verdere berichtbewerkingen mogelijk zijn, zoals beantwoorden, opdrachten verzenden of gebeurtenissen publiceren.

Onze PingHandler is eenvoudig: wanneer een Ping bericht wordt ontvangen, logt u de berichtgegevens in en beantwoordt u de afzender met een nieuw Pong bericht.

Notitie

In de configuratie van de afzender hebt u opgegeven dat Ping berichten moeten worden doorgestuurd naar de ontvanger. NServiceBus voegt metagegevens toe aan de berichten die onder andere de oorsprong van het bericht aangeven. Daarom hoeft u geen routeringsgegevens op te geven voor het Pong antwoordbericht. Het bericht wordt automatisch teruggeleid naar de oorsprong: de afzender.

Nu de afzender en ontvanger correct zijn geconfigureerd, kunt u de oplossing uitvoeren.

De oplossing uitvoeren

Als u de oplossing wilt starten, moet u zowel de afzender als de ontvanger uitvoeren. Als u Visual Studio Code gebruikt, start u de configuratie 'Foutopsporing allemaal'. Als u Visual Studio gebruikt, configureert u de oplossing om zowel het afzender- als het ontvangerproject te starten:

  1. Klik met de rechtermuisknop op de oplossing in Solution Explorer
  2. Selecteer Opstartprojecten instellen...
  3. Selecteer Meerdere opstartprojecten
  4. Voor zowel de afzender als de ontvanger selecteert u 'Start' in de vervolgkeuzelijst

Start de oplossing. Er worden twee consoletoepassingen weergegeven, één voor de afzender en één voor de ontvanger.

In de afzender ziet u dat er elke seconde een Ping bericht wordt verzonden, dankzij de SenderWorker achtergrondtaak. De ontvanger geeft de details weer van elk Ping bericht dat het ontvangt en de afzender registreert de details van elk Pong bericht dat het ontvangt als antwoord.

Nu alles werkt, gaan we het verbreken.

Tolerantie in actie

Fouten zijn een feit van het leven in softwaresystemen. Het is onvermijdelijk dat code mislukt en dit kan verschillende oorzaken hebben, zoals netwerkfouten, databasevergrendelingen, wijzigingen in een API van derden en gewone oude coderingsfouten.

NServiceBus heeft robuuste herstelfuncties voor het afhandelen van fouten. Wanneer een berichtenhandler mislukt, worden berichten automatisch opnieuw geprobeerd op basis van een vooraf gedefinieerd beleid. Er zijn twee soorten beleid voor nieuwe pogingen: onmiddellijke nieuwe pogingen en vertraagde nieuwe pogingen. De beste manier om te beschrijven hoe ze werken, is door ze in actie te zien. Laten we een beleid voor opnieuw proberen toevoegen aan het eindpunt Ontvanger:

  1. Openen Program.cs in het project Afzender
  2. Voeg na de .EnableInstallers regel de volgende code toe:
endpointConfiguration.SendFailedMessagesTo("error");
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(
    immediate =>
    {
        immediate.NumberOfRetries(3);
    });
recoverability.Delayed(
    delayed =>
    {
        delayed.NumberOfRetries(2);
        delayed.TimeIncrease(TimeSpan.FromSeconds(5));
    });

Voordat we bespreken hoe dit beleid werkt, laten we het in actie zien. Voordat u het herstelbaarheidsbeleid test, moet u een fout simuleren. Open de PingHandler code in het project Ontvanger en verwijder de opmerking bij deze regel:

throw new Exception("BOOM");

Wanneer de ontvanger nu een Ping bericht afhandelt, mislukt dit. Start de oplossing opnieuw en laten we eens kijken wat er gebeurt in de ontvanger.

Met onze minder betrouwbare PingHandler, mislukken al onze berichten. U ziet dat het beleid voor opnieuw proberen van kracht wordt voor deze berichten. De eerste keer dat een bericht mislukt, wordt het maximaal drie keer opnieuw geprobeerd:

Afbeelding van het beleid voor onmiddellijke nieuwe pogingen waarmee berichten maximaal 3 keer opnieuw worden geprobeerd

Het blijft natuurlijk mislukken, dus wanneer de drie onmiddellijke nieuwe pogingen zijn verbruikt, wordt het vertraagde beleid voor opnieuw proberen gestart en wordt het bericht 5 seconden uitgesteld:

Afbeelding van het beleid voor vertraagde nieuwe pogingen dat de berichten in stappen van 5 seconden uitstelt voordat een nieuwe ronde van onmiddellijke nieuwe pogingen wordt uitgevoerd

Nadat deze 5 seconden zijn verstreken, wordt het bericht nog drie keer opnieuw geprobeerd (dat wil gezegd, een andere herhaling van het beleid voor onmiddellijke nieuwe pogingen). Deze zullen ook mislukken en NServiceBus zal het bericht opnieuw vertragen, deze keer gedurende 10 seconden, voordat u het opnieuw probeert.

Als PingHandler het nog steeds niet lukt na het uitvoeren van het volledige beleid voor opnieuw proberen, wordt het bericht in een gecentraliseerde foutwachtrij geplaatst, met de naam error, zoals gedefinieerd door de aanroep van SendFailedMessagesTo.

Afbeelding van het mislukte bericht

Het concept van een gecentraliseerde foutwachtrij verschilt van het mechanisme voor onbestelbare letters in Azure Service Bus, dat een wachtrij met onbestelbare berichten heeft voor elke verwerkingswachtrij. Met NServiceBus fungeren de wachtrijen met onbestelbare berichten in Azure Service Bus als echte gifberichtenwachtrijen, terwijl berichten die in de gecentraliseerde foutwachtrij terechtkomen, indien nodig op een later tijdstip opnieuw kunnen worden verwerkt.

Het beleid voor opnieuw proberen helpt bij het oplossen van verschillende typen fouten die vaak tijdelijk of semi-tijdelijk van aard zijn. Dat wil gezegd, fouten die tijdelijk zijn en vaak verdwijnen als het bericht na een korte vertraging gewoon opnieuw wordt verwerkt. Voorbeelden zijn netwerkfouten, databasevergrendelingen en API-storingen van derden.

Zodra een bericht zich in de foutenwachtrij bevindt, kunt u de berichtdetails bekijken in het hulpprogramma van uw keuze en vervolgens bepalen wat u ermee wilt doen. Met behulp van ServicePulse, een bewakingsprogramma van bepaalde software, kunnen we bijvoorbeeld de berichtdetails en de reden voor de fout bekijken:

Afbeelding van ServicePulse, van bepaalde software

Nadat u de details hebt bekeken, kunt u het bericht terugsturen naar de oorspronkelijke wachtrij voor verwerking. U kunt het bericht ook bewerken voordat u dit doet. Als er meerdere berichten in de foutwachtrij staan die om dezelfde reden zijn mislukt, kunnen ze allemaal als batch worden teruggestuurd naar de oorspronkelijke bestemmingen.

Nu is het tijd om erachter te komen waar we onze oplossing in Azure kunnen implementeren.

Waar moet ik de services hosten in Azure?

In dit voorbeeld zijn de eindpunten Afzender en Ontvanger geconfigureerd om te worden uitgevoerd als consoletoepassingen. Ze kunnen ook worden gehost in verschillende Azure-services, waaronder Azure Functions, Azure-app Services, Azure Container Instances, Azure Kubernetes Services en Azure-VM's. Hier ziet u bijvoorbeeld hoe het eindpunt van de afzender kan worden geconfigureerd om te worden uitgevoerd als een Azure-functie:

[assembly: FunctionsStartup(typeof(Startup))]
[assembly: NServiceBusEndpointName("Sender")]

public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        builder.UseNServiceBus(() =>
        {
            var configuration = new ServiceBusTriggeredEndpointConfiguration("Sender");
            var transport = configuration.AdvancedConfiguration.Transport;
            transport.Routing().RouteToEndpoint(typeof(Ping), "Receiver");

            return configuration;
        });
    }
}

Zie Azure Functions met Azure Service Bus in de documentatie over NServiceBus voor meer informatie over het gebruik van NServiceBus met Functions.

Volgende stappen

Zie de volgende artikelen voor meer informatie over het gebruik van NServiceBus met Azure-services: