Azure Event Hubs Event Processor ügyfélkódtár a .NET-hez – 5.8.1-es verzió

Azure Event Hubs egy nagymértékben skálázható közzétételi-feliratkozási szolgáltatás, amely másodpercenként több millió eseményt képes beemelni, és több fogyasztónak streamelni. Ez lehetővé teszi a csatlakoztatott eszközök és alkalmazások által létrehozott nagy mennyiségű adat feldolgozását és elemzését. Miután az Event Hubs összegyűjtötte az adatokat, bármely valós idejű elemzési szolgáltatóval vagy kötegelési/tárolási adapterekkel lekérheti, átalakíthatja és tárolhatja azokat. Ha többet szeretne megtudni a Azure Event Hubs, tekintse át a következőt: Mi az az Event Hubs?

Az Eseményfeldolgozó ügyfélkódtár a Azure Event Hubs ügyfélkódtár kiegészítője, amely egy különálló ügyfelet biztosít az események robusztus, tartós és méretezhető felhasználásához, amely az éles forgatókönyvek többségéhez megfelelő. Az Azure Storage-blobok használatával létrehozott, véleményezett implementáció az eseményfeldolgozót a következő célokra ajánljuk:

  • Események olvasása és feldolgozása egy eseményközpont összes partícióján nagy léptékben, az átmeneti hibákkal és időszakos hálózati problémákkal szembeni rugalmassággal.

  • Az események együttműködésen alapuló feldolgozása, ahol több processzor dinamikusan osztja el és osztja meg a felelősséget egy fogyasztói csoport kontextusában, a terhelést zökkenőmentesen kezeli, mivel a processzorokat hozzáadják és eltávolítják a csoportból.

  • Ellenőrzőpontok és állapotok tartós feldolgozása az Azure Storage-blobok háttéradattárként való használatával.

Forráskód | Csomag (NuGet) | API-referenciadokumentáció | Termékdokumentáció | Hibaelhárítási útmutató

Első lépések

Előfeltételek

  • Azure-előfizetés: Az Azure-szolgáltatások, köztük a Azure Event Hubs használatához előfizetésre lesz szüksége. Ha nem rendelkezik meglévő Azure-fiókkal, regisztrálhat egy ingyenes próbaverzióra , vagy használhatja a Visual Studio-előfizetés előnyeit a fiók létrehozásakor.

  • Event Hubs-névtér eseményközponttal: A Azure Event Hubs használatához egy névtérnek és egy eseményközpontnak is rendelkezésre kell állnia. Ha nem ismeri az Azure-erőforrások létrehozását, érdemes lehet követnie az eseményközpontok Azure Portal használatával történő létrehozásának részletes útmutatóját. Itt részletes útmutatást talál az Azure CLI, a Azure PowerShell vagy az Azure Resource Manager (ARM) sablonok event hub létrehozásához való használatához.

  • Azure Storage-fiók blobtárolóval: Az ellenőrzőpontok megőrzéséhez és az Azure Storage tulajdonjogának szabályozásához rendelkeznie kell egy elérhető blobokkal rendelkező Azure Storage-fiókkal. Ha nem ismeri az Azure Storage-fiókokat, érdemes lehet követnie a tárfiókok Azure Portal használatával történő létrehozásához szükséges részletes útmutatót. Itt részletes útmutatást talál az Azure CLI, a Azure PowerShell vagy az Azure Resource Manager (ARM) sablonok tárfiókok létrehozásához való használatához.

  • Azure Storage-blobtároló: Az Azure Storage ellenőrzőpont- és tulajdonjogi adatai egy adott tárolóban lévő blobokba lesznek írva. A EventProcessorClient meglévő tárolót igényel, és nem hoz létre implicit módon egyet a véletlen helytelen konfiguráció elleni védelem érdekében. Ha nem ismeri az Azure Storage-tárolókat, tekintse meg a tárolók kezelésével kapcsolatos dokumentációt. Itt részletes útmutatást talál a .NET, az Azure CLI vagy a Azure PowerShell használatával egy tároló létrehozásához.

  • C# 8.0: A Azure Event Hubs ügyfélkódtár a C# 8.0-ban bevezetett új funkciókat használja. A C# 8.0 szintaxis kihasználása érdekében javasoljuk, hogy a .NET Core SDK 3.0-s vagy újabb verzióját használja a nyelvi verziójávallatest.

    A C# 8.0 szintaxist teljes mértékben kihasználni kívánó Visual Studio-felhasználóknak a Visual Studio 2019-et vagy újabb verziót kell használniuk. A Visual Studio 2019, beleértve az ingyenes Community kiadást is, itt tölthető le. A Visual Studio 2017 felhasználói a Microsoft.Net.Compilers NuGet-csomag használatával és a nyelvi verzió beállításával kihasználhatják a C# 8 szintaxis előnyeit, bár a szerkesztési élmény nem feltétlenül ideális.

    A kódtárat továbbra is használhatja a korábbi C#-nyelvi verziókkal, de az új szintaxis használata helyett manuálisan kell kezelnie az aszinkron enumerálható és aszinkron eldobható tagokat. Továbbra is használhatja a .NET Core SDK által támogatott keretrendszerverziókat, beleértve a .NET Core vagy a .NET-keretrendszer korábbi verzióit is. További információ: cél-keretrendszerek megadása.

    Fontos megjegyzés: A példák és a minták módosítás nélküli létrehozásához vagy futtatásához a C# 11.0 használata szükséges. A mintákat továbbra is futtathatja, ha úgy dönt, hogy más nyelvi verziókhoz módosítja őket.

Ha gyorsan létre szeretné hozni a szükséges erőforrásokat az Azure-ban, és meg szeretné kapni a hozzájuk tartozó kapcsolati sztringeket, a mintasablont a következő gombra kattintva helyezheti üzembe:

Üzembe helyezés az Azure-ban

A csomag telepítése

Telepítse a .NET-hez készült Azure Event Hubs Event Processor ügyfélkódtárat a NuGet használatával:

dotnet add package Azure.Messaging.EventHubs.Processor

Az ügyfél hitelesítése

Event Hubs-kapcsolati sztring beszerzése

Ahhoz, hogy az Event Hubs ügyféloldali kódtára kommunikáljon egy eseményközponttal, meg kell értenie, hogyan csatlakozhat hozzá, és hogyan engedélyezheti azt. Ennek legegyszerűbb módja egy kapcsolati sztring használata, amely automatikusan jön létre az Event Hubs-névtér létrehozásakor. Ha nem ismeri az Event Hubs kapcsolati sztringjeinek használatát, érdemes lehet követnie a részletes útmutatót az Event Hubs kapcsolati sztring beszerzéséhez.

Azure Storage-kapcsolati sztring beszerzése

Ahhoz, hogy az eseményfeldolgozó ügyfél az Azure Storage-blobokat használja az ellenőrzőpontokhoz, meg kell értenie, hogyan csatlakozhat egy tárfiókhoz, és hogyan engedélyezheti azt. Ennek legegyszerűbb módja egy kapcsolati sztring használata, amely a tárfiók létrehozásakor jön létre. Ha nem ismeri a tárfiók kapcsolati sztring engedélyezését az Azure-ban, érdemes lehet követnie a részletes útmutatót az Azure Storage kapcsolati sztringjeinek konfigurálásához.

Fő fogalmak

  • Az eseményfeldolgozó egy adott eseményközponthoz való csatlakozással és az egyes partíciók eseményeinek feldolgozásával kapcsolatos felelősségek kezelésére szolgáló szerkezet, egy adott fogyasztói csoport kontextusában. A partícióról beolvasott események feldolgozásának és az esetleges hibák kezelésének műveletét az eseményfeldolgozó delegálja az Ön által megadott kódba, így a logika az üzleti érték biztosítására koncentrálhat, miközben a feldolgozó kezeli az olvasási eseményekhez kapcsolódó feladatokat, kezeli a partíciókat, és lehetővé teszi az állapotok ellenőrzőpontok formájában történő megőrzését.

  • Az ellenőrzőpont-beállítás olyan folyamat, amellyel az olvasók megjelölik és megőrzik pozíciójukat a partícióhoz feldolgozott eseményekhez. Az ellenőrzőpont-készítés a fogyasztó felelőssége, és partíciónként történik, jellemzően egy adott fogyasztói csoport kontextusában. EventProcessorClientA esetében ez azt jelenti, hogy egy fogyasztói csoport és partíciókombináció esetében a processzornak nyomon kell követnie az aktuális pozícióját az eseménystreamben. Ha további információra van szüksége, tekintse meg az ellenőrzőpont-készítést az Event Hubs termékdokumentációjában.

    Amikor egy eseményfeldolgozó csatlakozik, elkezdi olvasni azokat az eseményeket az ellenőrzőponton, amelyet korábban a partíció utolsó feldolgozója őrizett meg az adott fogyasztói csoportban, ha van ilyen. Amikor az eseményfeldolgozó beolvassa és végrehajtja a partíció eseményeit, rendszeres időközönként létre kell hoznia ellenőrzőpontokat, hogy az eseményeket az alsóbb rétegbeli alkalmazások "befejezettként" jelöljék meg, és rugalmasságot biztosítsanak, ha egy eseményfeldolgozó vagy az azt üzemeltető környezet meghibásodik. Szükség esetén újra feldolgozhatók a korábban "készként" megjelölt események, ha egy korábbi eltolást ad meg ezen az ellenőrzőpont-ellenőrzési folyamaton keresztül.

  • A partíció egy eseményközpontban tárolt események rendezett sorozata. A partíciók az eseményfelhasználók által megkövetelt párhuzamossághoz kapcsolódó adatszervezési eszközök. Azure Event Hubs egy particionált fogyasztói mintán keresztül biztosítja az üzenetek streamelésének folyamatát, amelyben minden fogyasztó csak az üzenetstream egy adott részhalmazát vagy partícióját olvassa be. Ha új esemény érkezik, az a sorozat végére kerül. A partíciók száma az eseményközpont létrehozásakor van megadva, és nem módosítható.

  • A fogyasztói csoport egy teljes eseményközpont nézete. A fogyasztói csoportok lehetővé teszik, hogy több fogyasztó alkalmazás külön nézetet biztosítson az eseménystreamről, és a streamet a saját tempójukban és a saját helyzetüktől függetlenül olvassák. Fogyasztói csoportonként legfeljebb 5 egyidejű olvasó lehet egy partíción; azonban ajánlott egy adott partíció- és fogyasztóicsoport-párosításhoz csak egy aktív fogyasztót létrehozni. Minden aktív olvasó megkapja az összes eseményt a partíciójáról; ha több olvasó is található ugyanazon a partíción, akkor ismétlődő eseményeket kapnak.

További fogalmakért és részletesebb ismertetésért lásd: Event Hubs-szolgáltatások.

Ügyfélélettartam

A EventProcessorClient biztonságosan gyorsítótárazható és használható egyszeriként az alkalmazás teljes élettartama során, ami az ajánlott eljárás az események rendszeres olvasása esetén. Az ügyfelek felelősek a hálózat, a CPU és a memóriahasználat hatékony felügyeletéért, és azon dolgoznak, hogy a használat alacsony maradjon inaktivitási időszakokban. A hálózati erőforrások és más nem felügyelt objektumok megfelelő törlésének biztosításához hívásra StopProcessingAsync vagy StopProcessing a processzorra van szükség.

Menetbiztonság

Garantáljuk, hogy minden ügyfélpéldány-metódus szálbiztos és független egymástól (útmutató). Ez biztosítja, hogy az ügyfélpéldányok újrafelhasználására vonatkozó javaslat mindig biztonságos legyen, még a szálak között is.

Az adatmodellek típusai, például EventData a és EventDataBatch a nem szálbiztosak. Nem oszthatók meg a szálak között, és nem használhatók egyidejűleg az ügyfélmetókkal.

További fogalmak

Ügyfélbeállítások | Eseménykezelők | Hibák | kezelése Diagnosztika | Utánzás (processzor) | Utánzás (ügyféltípusok)

Példák

Eseményfeldolgozó ügyfél létrehozása

Mivel a EventProcessorClient az állapotának megőrzése érdekében függ az Azure Storage-bloboktól, meg kell adnia egy BlobContainerClient processzort, amely konfigurálva lett a használni kívánt tárfiókhoz és tárolóhoz. A konfiguráláshoz EventProcessorClient használt tárolónak léteznie kell.

Mivel a EventProcessorClient nem létezik tároló megadásának szándékát nem ismeri, nem fogja implicit módon létrehozni a tárolót. Ez védelmet nyújt a hibásan konfigurált tárolók ellen, ami azt eredményezi, hogy egy hibás processzor nem tudja megosztani a tulajdonjogot, és nem tud zavarni a fogyasztói csoport többi processzorával.

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

Az esemény- és hibakezelők konfigurálása

A használatához az EventProcessorClienteseményfeldolgozáshoz és a hibákhoz kezelőket kell megadni. Ezek a kezelők önállónak minősülnek, és a fejlesztők felelősek annak biztosításáért, hogy a kezelőkódon belüli kivételek el legyenek számolva.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

Feldolgozás indítása és leállítása

A EventProcessorClient a háttérben végzi el a feldolgozást, miután az explicit módon elindult, és addig folytatja, amíg explicit módon le nem áll. Bár ez lehetővé teszi, hogy az alkalmazáskód más feladatokat is végrehajtson, az is a feladata, hogy biztosítsa, hogy a folyamat ne fejeződjön be a feldolgozás során, ha más feladatokat nem hajtanak végre.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

Active Directory-rendszerbiztonsági tag használata az eseményfeldolgozó ügyféllel

Az Azure Identity-kódtár Azure Active Directory-hitelesítési támogatást nyújt, amely az Azure-ügyfélkódtárakhoz használható, beleértve az Event Hubsot és az Azure Storage-ot is.

Az Active Directory-rendszerbiztonsági tag használatához meg kell adnia a Azure.Identity tárban elérhető hitelesítő adatok egyikét az Event Hubs-ügyfél létrehozásakor. Emellett a teljes Event Hubs-névtér és a kívánt eseményközpont neve is meg van adva az Event Hubs kapcsolati sztring helyett.

Ahhoz, hogy az Active Directory-rendszernevet azure storage-blobtárolókkal lehessen használni, meg kell adni a tároló teljes URL-címét a tárolóügyfél létrehozásakor. A Blob Storage eléréséhez használható érvényes URI-formátumokkal kapcsolatos részletekért lásd: Tárolók, blobok és metaadatok elnevezése és hivatkozása.

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

Ha az Azure Active Directoryt az Event Hubs szolgáltatással használja, a rendszerbiztonsági taghoz hozzá kell rendelni egy olyan szerepkört, amely lehetővé teszi az Event Hubsból való olvasást, például a Azure Event Hubs Data Receiver szerepkört. Az Azure Active Directory-engedélyezés event hubokkal való használatával kapcsolatos további információkért tekintse meg a kapcsolódó dokumentációt.

Ha az Azure Active Directoryt az Azure Storage-ral használja, az egyszerű felhasználónak hozzá kell rendelnie egy szerepkört, amely lehetővé teszi a blobokhoz, például a Storage Blob Data Contributor szerepkörhöz való olvasást, írást és törlést. Az Active Directory-engedélyezés Azure Storage-ral való használatáról további információt a kapcsolódó dokumentációban és az Azure Storage engedélyezési mintájában talál.

Hibaelhárítás

Részletes hibaelhárítási információkért tekintse meg az Event Hubs hibaelhárítási útmutatót.

Kivételkezelés

Eseményfeldolgozó ügyfél kivételei

Az Eseményfeldolgozó ügyfél minden olyan kísérletet megtesz, hogy rugalmas legyen a kivételekkel szemben, és megteszi a szükséges lépéseket a feldolgozás folytatásához, kivéve, ha ez lehetetlen. Ehhez nincs szükség a fejlesztők részéről tett lépésekre ; natív módon része a processzor viselkedésének.

Annak érdekében, hogy a fejlesztők megvizsgálhassák és reagálhassanak az eseményfeldolgozó ügyfélműveleteiben előforduló kivételekre, az ProcessError eseményen keresztül kerülnek felszínre. Az esemény argumentumai részletesen ismertetik a kivételt és a megfigyelt környezetet. A fejlesztők normál műveleteket hajthatnak végre az eseményfeldolgozó ügyfélen az eseménykezelőn belül, például leállíthatják és/vagy újraindíthatják azt a hibákra válaszul, de más módon nem befolyásolhatják a processzor kivételének viselkedését.

A hibakezelő implementálásának alapszintű példájáért tekintse meg a következő mintát: Eseményfeldolgozó-kezelők.

Kivételek az eseménykezelőkben

Mivel az eseményfeldolgozó ügyfél nem rendelkezik a megfelelő kontextussal a fejlesztők által megadott eseménykezelők kivételeinek súlyosságának megértéséhez, nem feltételezheti, hogy milyen műveletek lennének ésszerű válaszaik. Ennek eredményeképpen a fejlesztők felelősek a blokkok és más szabványos nyelvi szerkezetek használatával try/catch biztosított eseménykezelőkben előforduló kivételekért.

Az Eseményfeldolgozó ügyfél nem kísérli meg észlelni a fejlesztői kód kivételeinek észlelését, és nem teszi őket explicit módon felszínre. Az eredményül kapott viselkedés a processzor üzemeltetési környezetétől és az eseménykezelő meghívásának környezetétől függ. Mivel ez különböző forgatókönyvekben eltérő lehet, erősen ajánlott, hogy a fejlesztők védelmi módon kódolják az eseménykezelőket, és vegyék figyelembe a lehetséges kivételeket.

Naplózás és diagnosztika

Az Eseményfeldolgozó ügyfélkódtár teljes mértékben az információk naplózására szolgál különböző részletességi szinteken a .NET EventSource használatával az információk kibocsátásához. A naplózás minden művelethez történik, és követi a művelet kezdőpontjának, befejezésének és a tapasztalt kivételeknek a megjelölésére szolgáló mintát. A rendszer további információkat is naplóz, amelyek betekintést nyújthatnak a társított művelet kontextusában.

Az eseményfeldolgozó ügyfélnaplói bárki EventListener számára elérhetők az "Azure-Messaging-EventHubs-Processor-EventProcessorClient" nevű forrás kiválasztásával, vagy az "AzureEventSource" tulajdonságú összes forrás kiválasztásával. A naplók Azure-ügyfélkódtárakból való rögzítésének megkönnyítése érdekében az Azure.Core Event Hubs által használt kódtár egy AzureEventSourceListener. További információ: Event Hubs-naplók rögzítése az AzureEventSourceListener használatával.

Az Eseményfeldolgozó kódtár az Application Insights vagy az OpenTelemetry használatával történő elosztott nyomkövetéshez is használható. További információ az Azure.Core Diagnostics-mintában található.

Következő lépések

A tárgyalt forgatókönyveken túl a Azure Event Hubs processzortár további forgatókönyveket is támogat, amelyek segítenek kihasználni a teljes funkciókészletet.EventProcessorClient Ezen forgatókönyvek megismeréséhez az Event Hubs Processzor ügyféloldali kódtára egy mintaprojektet kínál, amely a gyakori forgatókönyvek szemléltetésére szolgál. A részletekért tekintse meg a README mintákat.

Közreműködés

A projektben szívesen fogadjuk a hozzájárulásokat és a javaslatokat. A legtöbb hozzájáruláshoz el kell fogadnia egy Közreműködői licencszerződést (CLA-t), amelyben kijelenti, hogy jogosult arra, hogy ránk ruházza hozzájárulása felhasználási jogát, és ezt ténylegesen meg is teszi. További részletekért lásd: https://cla.microsoft.com.

A lekéréses kérelmek elküldésekor egy CLA-robot automatikusan meghatározza, hogy kell-e biztosítania CLA-t, és megfelelően kitölti a lekéréses kérelmet (például címke, megjegyzés). Egyszerűen csak kövesse a robot által megadott utasításokat. Ezt csak egyszer kell elvégeznie az összes olyan tárházban, amely a CLA-t használja.

A projekt a Microsoft nyílt forráskódú projekteket szabályozó etikai kódexe, a Microsoft Open Source Code of Conduct hatálya alá esik. További információkért lásd a viselkedési szabályzattal kapcsolatos gyakori kérdéseket , vagy vegye fel a kapcsolatot opencode@microsoft.com az esetleges további kérdésekkel vagy megjegyzésekkel.

További információért tekintse meg a közreműködői útmutatónkat .

Megjelenések