Share via


Azure Event Hubs händelseprocessorklientbibliotek för .NET – version 5.8.1

Azure Event Hubs är en mycket skalbar publiceringsprenumereringstjänst som kan mata in miljontals händelser per sekund och strömma dem till flera konsumenter. På så sätt kan du bearbeta och analysera de enorma mängder data som produceras av dina anslutna enheter och program. När Event Hubs har samlat in data kan du hämta, transformera och lagra dem med hjälp av valfri realtidsanalysprovider eller med batchbearbetning/lagringskort. Om du vill veta mer om Azure Event Hubs kanske du vill granska: Vad är Event Hubs?

Klientbiblioteket för händelseprocessorn är ett komplement till Azure Event Hubs-klientbiblioteket, vilket ger en fristående klient för användning av händelser på ett robust, hållbart och skalbart sätt som är lämpligt för de flesta produktionsscenarier. En åsiktsimplementering som skapats med Azure Storage-blobar. Händelseprocessorn rekommenderas för:

  • Läsa och bearbeta händelser över alla partitioner i en händelsehubb i stor skala med motståndskraft mot tillfälliga fel och tillfälliga nätverksproblem.

  • Bearbeta händelser i samarbete, där flera processorer dynamiskt distribuerar och delar ansvaret i kontexten för en konsumentgrupp, och hanterar belastningen på ett smidigt sätt när processorer läggs till och tas bort från gruppen.

  • Hantera kontrollpunkter och tillstånd för bearbetning på ett hållbart sätt med hjälp av Azure Storage-blobar som underliggande datalager.

| Källkod Paket (NuGet) | API-referensdokumentation | Produktdokumentation | Felsökningsguide

Komma igång

Förutsättningar

  • Azure-prenumeration: Om du vill använda Azure-tjänster, inklusive Azure Event Hubs, behöver du en prenumeration. Om du inte har ett befintligt Azure-konto kan du registrera dig för en kostnadsfri utvärderingsversion eller använda dina Visual Studio-prenumerationsförmåner när du skapar ett konto.

  • Event Hubs-namnrymd med en händelsehubb: Om du vill interagera med Azure Event Hubs måste du också ha ett namnområde och en händelsehubb tillgänglig. Om du inte är bekant med att skapa Azure-resurser kanske du vill följa den stegvisa guiden för att skapa en händelsehubb med hjälp av Azure Portal. Där hittar du även detaljerade instruktioner för hur du använder Azure CLI-, Azure PowerShell- eller Azure Resource Manager-mallar (ARM) för att skapa en händelsehubb.

  • Azure Storage-konto med bloblagring: För att bevara kontrollpunkter och styra ägarskapet i Azure Storage måste du ha ett Azure Storage-konto med blobar tillgängliga. Om du inte är bekant med Azure Storage-konton kanske du vill följa den stegvisa guiden för att skapa ett lagringskonto med hjälp av Azure Portal. Där hittar du även detaljerade instruktioner för hur du använder Arm-mallar (Azure CLI, Azure PowerShell eller Azure Resource Manager) för att skapa lagringskonton.

  • Azure Storage-blobcontainer: Kontrollpunkts- och ägarskapsdata i Azure Storage skrivs till blobar i en specifik container. EventProcessorClient kräver en befintlig container och skapar inte implicit en för att skydda mot oavsiktlig felkonfiguration. Om du inte är bekant med Azure Storage-containrar kan du läsa dokumentationen om hur du hanterar containrar. Där hittar du detaljerade instruktioner för hur du använder .NET, Azure CLI eller Azure PowerShell för att skapa en container.

  • C# 8.0: Azure Event Hubs-klientbiblioteket använder nya funktioner som introducerades i C# 8.0. För att kunna dra nytta av C# 8.0-syntaxen rekommenderar vi att du kompilerar med hjälp av .NET Core SDK 3.0 eller senare med en språkversion av latest.

    Visual Studio-användare som vill dra full nytta av C# 8.0-syntaxen måste använda Visual Studio 2019 eller senare. Visual Studio 2019, inklusive den kostnadsfria Community Edition, kan laddas ned här. Användare av Visual Studio 2017 kan dra nytta av C# 8-syntaxen genom att använda NuGet-paketet Microsoft.Net.Compilers och ange språkversionen, även om redigeringsupplevelsen kanske inte är idealisk.

    Du kan fortfarande använda biblioteket med tidigare C#-språkversioner, men du måste hantera asynkrona uppräkningsbara och asynkrona engångsmedlemmar manuellt i stället för att dra nytta av den nya syntaxen. Du kan fortfarande använda alla ramverksversioner som stöds av .NET Core SDK, inklusive tidigare versioner av .NET Core eller .NET Framework. Mer information finns i: så här anger du målramverk.

    Viktigt meddelande: För att kunna skapa eller köra exemplen och exemplen utan ändringar är det nödvändigt att använda C# 11.0. Du kan fortfarande köra exemplen om du bestämmer dig för att justera dem för andra språkversioner.

Om du snabbt vill skapa de resurser som behövs i Azure och ta emot anslutningssträngar åt dem kan du distribuera vår exempelmall genom att klicka på:

Distribuera till Azure

Installera paketet

Installera klientbiblioteket för Azure Event Hubs händelseprocessor för .NET med hjälp av NuGet:

dotnet add package Azure.Messaging.EventHubs.Processor

Autentisera klienten

Hämta en Event Hubs-anslutningssträng

För att Event Hubs-klientbiblioteket ska kunna interagera med en händelsehubb måste det förstå hur man ansluter och auktoriserar med den. Det enklaste sättet att göra detta är att använda en anslutningssträng som skapas automatiskt när du skapar ett Event Hubs-namnområde. Om du inte är bekant med att använda anslutningssträngar med Event Hubs kanske du vill följa den stegvisa guiden för att hämta en Event Hubs-anslutningssträng.

Hämta en Azure Storage-anslutningssträng

För att händelseprocessorklienten ska kunna använda Azure Storage-blobar för kontrollpunkter måste den förstå hur man ansluter till ett lagringskonto och auktoriserar med det. Den enklaste metoden är att använda en anslutningssträng som genereras när lagringskontot skapas. Om du inte är bekant med auktorisering av anslutningssträngar för lagringskonton i Azure kanske du vill följa den stegvisa guiden för att konfigurera Azure Storage-anslutningssträngar.

Viktiga begrepp

  • En händelseprocessor är en konstruktion som är avsedd att hantera de ansvarsområden som är associerade med att ansluta till en viss händelsehubb och bearbeta händelser från var och en av dess partitioner, i kontexten för en specifik konsumentgrupp. Åtgärden att bearbeta händelser som lästs från partitionen och hantering av eventuella fel som inträffar delegeras av händelseprocessorn till kod som du anger, vilket gör att din logik kan koncentrera sig på att leverera affärsvärde medan processorn hanterar de uppgifter som är associerade med att läsa händelser, hantera partitionerna och tillåta att tillstånd bevaras i form av kontrollpunkter.

  • Kontrollpunkter är en process genom vilken läsarna markerar och bevarar sin position för händelser som har bearbetats för en partition. Kontrollpunkter är konsumentens ansvar och sker på en partition, vanligtvis i kontexten för en specifik konsumentgrupp. För innebär detta att processorn EventProcessorClientför en kombination av konsumentgrupper och partitioner måste hålla reda på sin aktuella position i händelseströmmen. Om du vill ha mer information kan du läsa om kontrollpunkter i produktdokumentationen för Event Hubs.

    När en händelseprocessor ansluter börjar den läsa händelser vid kontrollpunkten som tidigare sparades av den sista processorn för partitionen i den konsumentgruppen, om det finns en sådan. När en händelseprocessor läser och agerar på händelser i partitionen bör den regelbundet skapa kontrollpunkter för att både markera händelserna som "slutförda" av underordnade program och för att ge återhämtning om en händelseprocessor eller miljön som är värd för den misslyckas. Om det skulle vara nödvändigt är det möjligt att ombearbeta händelser som tidigare har markerats som "slutförda" genom att ange en tidigare förskjutning genom den här kontrollpunktsprocessen.

  • En partition är en ordnad sekvens av händelser som lagras i en händelsehubb. Partitioner är ett sätt att organisera data som är associerade med den parallellitet som krävs av händelsekonsumenter. Azure Event Hubs tillhandahåller meddelandeströmning via ett partitionerat konsumentmönster där varje konsument endast läser en viss delmängd, eller partition, av meddelandeströmmen. När nya händelser anländer läggs de till i slutet av denna sekvens. Antalet partitioner anges när en händelsehubb skapas och kan inte ändras.

  • En konsumentgrupp är en vy över en hel händelsehubb. Konsumentgrupper gör det möjligt för flera konsumerande program att ha en separat vy över händelseströmmen och att läsa dataströmmen oberoende av varandra i sin egen takt och från sin egen position. Det kan finnas högst 5 samtidiga läsare på en partition per konsumentgrupp. Vi rekommenderar dock att det bara finns en aktiv konsument för en viss partition och parkoppling av konsumentgrupper. Varje aktiv läsare tar emot alla händelser från partitionen. Om det finns flera läsare på samma partition får de duplicerade händelser.

Fler begrepp och djupare diskussion finns i: Event Hubs-funktioner.

Klientlivslängd

EventProcessorClient är säkert att cachelagrat och använda som en singleton under programmets livslängd, vilket är bästa praxis när händelser läss regelbundet. Klienterna ansvarar för effektiv hantering av nätverk, CPU och minnesanvändning, och arbetar för att hålla användningen låg under perioder av inaktivitet. Det krävs anrop StopProcessingAsync eller StopProcessing på processorn för att säkerställa att nätverksresurser och andra ohanterade objekt rensas korrekt.

Trådsäkerhet

Vi garanterar att alla klientinstansmetoder är trådsäkra och oberoende av varandra (riktlinje). Detta säkerställer att rekommendationen att återanvända klientinstanser alltid är säker, även över trådar.

Datamodelltyperna, till exempel EventData och EventDataBatch är inte trådsäkra. De bör inte delas mellan trådar eller användas samtidigt med klientmetoder.

Ytterligare begrepp

Klientalternativ | Händelsehanterare | Hantera fel | Diagnostik | Simulera (processor) | Simulera (klienttyper)

Exempel

Skapa en händelseprocessorklient

EventProcessorClient Eftersom har ett beroende av Azure Storage-blobar för beständighet för dess tillstånd, måste du ange en BlobContainerClient för processorn, som har konfigurerats för lagringskontot och containern som ska användas. Containern som används för att konfigurera EventProcessorClient måste finnas.

EventProcessorClient Eftersom har inget sätt att veta avsikten att ange en container som inte finns, kommer den inte implicit att skapa containern. Detta fungerar som ett skydd mot en felkonfigurerad container som gör att en obehörig processor inte kan dela ägarskap och störa andra processorer i konsumentgruppen.

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

Konfigurera händelse- och felhanterare

För att kunna använda EventProcessorClientmåste hanterare för händelsebearbetning och fel anges. Dessa hanterare anses vara självständiga och utvecklare ansvarar för att säkerställa att undantag i hanterarkoden redovisas.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

Starta och stoppa bearbetning

EventProcessorClient utför bearbetningen i bakgrunden när den uttryckligen har startats och fortsätter att göra det tills den uttryckligen har stoppats. Även om det gör att programkoden kan utföra andra uppgifter, lägger den också ansvaret för att säkerställa att processen inte avslutas under bearbetningen om det inte finns några andra uppgifter som utförs.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

Använda ett Active Directory-huvudnamn med händelsebearbetningsklienten

Azure Identity-biblioteket har stöd för Azure Active Directory-autentisering som kan användas för Azure-klientbiblioteken, inklusive Event Hubs och Azure Storage.

Om du vill använda ett Active Directory-huvudnamn anges en av de tillgängliga autentiseringsuppgifterna Azure.Identity från biblioteket när du skapar Event Hubs-klienten. Dessutom tillhandahålls det fullständigt kvalificerade Event Hubs-namnområdet och namnet på önskad händelsehubb i stället för Event Hubs-anslutningssträngen.

Om du vill använda ett Active Directory-huvudnamn med Azure Storage-blobcontainrar måste den fullständigt kvalificerade URL:en till containern anges när du skapar lagringsklienten. Information om giltiga URI-format för åtkomst till Blob Storage finns i Namngivning och referens av containrar, blobar och metadata.

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

När du använder Azure Active Directory med Event Hubs måste huvudkontot tilldelas en roll som tillåter läsning från Event Hubs, till exempel Azure Event Hubs Data Receiver rollen. Mer information om hur du använder Azure Active Directory-auktorisering med Event Hubs finns i den associerade dokumentationen.

När du använder Azure Active Directory med Azure Storage måste ditt huvudnamn tilldelas en roll som tillåter läs-, skriv- och borttagningsåtkomst till blobar, till exempel Storage Blob Data Contributor rollen. Mer information om hur du använder Active Directory-auktorisering med Azure Storage finns i den associerade dokumentationen och Azure Storage-auktoriseringsexemplet.

Felsökning

Detaljerad felsökningsinformation finns i felsökningsguiden för Event Hubs.

Undantagshantering

Klientfel för händelseprocessor

Händelseprocessorklienten gör alla försök att vara motståndskraftiga inför undantag och vidtar nödvändiga åtgärder för att fortsätta bearbetningen om det inte är omöjligt att göra det. Inga åtgärder från utvecklare krävs för att detta ska ske. det är inbyggt en del av processorns beteende.

För att göra det möjligt för utvecklare att inspektera och reagera på undantag som inträffar inom klientåtgärderna för händelseprocessorn visas de via ProcessError händelsen. Argumenten för den här händelsen innehåller information om undantaget och kontexten där det observerades. Utvecklare kan utföra normala åtgärder på händelseprocessorklienten inifrån den här händelsehanteraren, till exempel stoppa och/eller starta om den som svar på fel, men kanske inte på annat sätt påverkar processorns undantagsbeteende.

Ett grundläggande exempel på hur du implementerar felhanteraren finns i exemplet: Händelseprocessorhanterare.

Undantag i händelsehanterare

Eftersom händelseprocessorklienten saknar rätt kontext för att förstå allvarlighetsgraden för undantag inom de händelsehanterare som utvecklare tillhandahåller, kan den inte anta vilka åtgärder som skulle vara ett rimligt svar på dem. Därför anses utvecklare vara ansvariga för undantag som inträffar inom de händelsehanterare som de tillhandahåller med hjälp av try/catch block och andra standardspråkkonstruktioner.

Händelseprocessorklienten försöker inte identifiera undantag i utvecklarkod eller visa dem explicit. Det resulterande beteendet beror på processorns värdmiljö och kontexten där händelsehanteraren anropades. Eftersom detta kan variera mellan olika scenarier rekommenderar vi starkt att utvecklare kodar sina händelsehanterare defensivt och tar hänsyn till potentiella undantag.

Loggning och diagnostik

Klientbiblioteket för händelseprocessorn är fullständigt instrumenterat för loggningsinformation på olika detaljnivåer med hjälp av .NET EventSource för att generera information. Loggning utförs för varje åtgärd och följer mönstret för att markera startpunkten för åtgärden, slutförandet och eventuella undantag som påträffas. Ytterligare information som kan ge insikter loggas också i kontexten för den associerade åtgärden.

Klientloggarna för händelseprocessorn är tillgängliga för alla EventListener genom att välja källan "Azure-Messaging-EventHubs-Processor-EventProcessorClient" eller välja alla källor som har egenskapen "AzureEventSource". För att göra det enklare att samla in loggar från Azure-klientbiblioteken Azure.Core erbjuder biblioteket som används av Event Hubs en AzureEventSourceListener. Mer information finns i Samla in Event Hubs-loggar med hjälp av AzureEventSourceListener.

Event Processor-biblioteket är också instrumenterat för distribuerad spårning med Application Insights eller OpenTelemetry. Mer information finns i Azure.Core Diagnostics-exemplet.

Nästa steg

Utöver de scenarier som beskrivs erbjuder Azure Event Hubs Processor-biblioteket stöd för ytterligare scenarier för att dra nytta av den fullständiga funktionsuppsättningen i EventProcessorClient. För att hjälpa dig att utforska några av dessa scenarier erbjuder Event Hubs Processor-klientbiblioteket ett projekt med exempel som fungerar som en illustration för vanliga scenarier. Mer information finns i README-exemplen .

Bidra

Det här projektet välkomnar bidrag och förslag. Merparten av bidragen kräver att du godkänner ett licensavtal för bidrag, där du deklarerar att du har behörighet att bevilja oss rättigheten att använda ditt bidrag, och att du dessutom uttryckligen gör så. Mer information finns på https://cla.microsoft.com.

När du skickar en pull-förfrågan avgör en CLA-robot automatiskt om du måste tillhandahålla ett licensavtal för bidrag med lämplig PR (t.ex. etikett eller kommentar). Följ bara robotens anvisningar. Du behöver bara göra detta en gång för alla repor som använder vårt licensavtal för bidrag.

Det här projektet använder sig av Microsofts uppförandekod för öppen källkod. Mer information finns i Vanliga frågor och svar om uppförandekod eller kontakt opencode@microsoft.com med ytterligare frågor eller kommentarer.

Mer information finns i vår bidragsguide .

Visningar