Események küldése és fogadása az Azure Event Hubson a Java használatával
Ez a rövid útmutató bemutatja, hogyan küldhet eseményeket egy eseményközpontba, és hogyan fogadhat eseményeket az azure-messaging-eventhubs Java-csomag használatával.
Tipp
Ha egy Spring-alkalmazásban Azure Event Hubs erőforrásokkal dolgozik, javasoljuk, hogy alternatívaként tekintse a Spring Cloud Azure-t. A Spring Cloud Azure egy nyílt forráskódú projekt, amely zökkenőmentes Spring-integrációt biztosít az Azure-szolgáltatásokkal. Ha többet szeretne megtudni a Spring Cloud Azure-ról, és szeretne egy példát látni az Event Hubs használatával, olvassa el a Spring Cloud Stream with Azure Event Hubs című témakört.
Előfeltételek
Ha még nem ismerkedik a Azure Event Hubs, a rövid útmutató előtt tekintse meg az Event Hubs áttekintését.
A rövid útmutató elvégzéséhez a következő előfeltételekre van szüksége:
- Microsoft Azure-előfizetés. Az Azure-szolgáltatások , köztük a Azure Event Hubs használatához előfizetésre van szüksége. Ha nem rendelkezik meglévő Azure-fiókkal, regisztrálhat egy ingyenes próbaverzióra , vagy használhatja az MSDN-előfizetői előnyöket a fiók létrehozásakor.
- Java-fejlesztési környezet. Ez a rövid útmutató az Eclipse-t használja. A 8-es vagy újabb verziójú Java Development Kit (JDK) használatához szükség van.
- Hozzon létre egy Event Hubs-névteret és egy eseményközpontot. Első lépésként az Azure Portal használatával hozzon létre egy Event Hubs típusú névteret, és szerezze be az alkalmazásnak az eseményközponttal való kommunikációhoz szükséges felügyeleti hitelesítő adatokat. Névtér és eseményközpont létrehozásához kövesse az ebben a cikkben ismertetett eljárást. Ezután szerezze be az Event Hubs-névtér kapcsolati sztring a következő cikk utasításait követve: Kapcsolati sztring lekérése. A rövid útmutató későbbi részében a kapcsolati sztring fogja használni.
Események küldése
Ez a szakasz bemutatja, hogyan hozhat létre Egy Java-alkalmazást az események eseményközpontba küldéséhez.
Hivatkozás hozzáadása Azure Event Hubs tárhoz
Először hozzon létre egy új Maven-projektet egy konzol-/rendszerhéj-alkalmazáshoz a kedvenc Java-fejlesztési környezetében. Frissítse a fájlt az pom.xml
alábbiak szerint. Az Event Hubs Java-ügyfélkódtára elérhető a Maven Central-adattárban.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
Megjegyzés
Frissítse a verziót a Maven-adattárban közzétett legújabb verzióra.
Az alkalmazás hitelesítése az Azure-ban
Ez a rövid útmutató két módszert mutat be a Azure Event Hubs való csatlakozásra: jelszó nélküli és kapcsolati sztring. Az első lehetőség bemutatja, hogyan használhatja a rendszerbiztonsági tagot az Azure Active Directoryban és a szerepköralapú hozzáférés-vezérlést (RBAC) egy Event Hubs-névtérhez való csatlakozáshoz. Nem kell aggódnia amiatt, hogy a kódban, konfigurációs fájlban vagy egy biztonságos tárolóban, például az Azure Key Vault tartalmaz rögzített kapcsolati sztringeket. A második lehetőség bemutatja, hogyan lehet kapcsolati sztring használni egy Event Hubs-névtérhez való csatlakozáshoz. Ha még nem ismerkedik az Azure-sal, könnyebben követheti a kapcsolati sztring lehetőséget. A jelszó nélküli beállítást a valós alkalmazásokban és éles környezetekben javasoljuk. További információ: Hitelesítés és engedélyezés. A jelszó nélküli hitelesítésről az áttekintési oldalon olvashat bővebben.
Szerepkörök hozzárendelése a Azure AD felhasználóhoz
Helyi fejlesztéskor győződjön meg arról, hogy a Azure Event Hubs csatlakozó felhasználói fiók rendelkezik a megfelelő engedélyekkel. Az üzenetek küldéséhez és fogadásához szüksége lesz az Azure Event Hubs Adattulajdonos szerepkörre. A szerepkör hozzárendeléséhez felhasználói hozzáférés-rendszergazdai szerepkörre vagy egy másik szerepkörre lesz szüksége, amely tartalmazza a Microsoft.Authorization/roleAssignments/write
műveletet. Azure RBAC-szerepköröket rendelhet egy felhasználóhoz a Azure Portal, az Azure CLI vagy Azure PowerShell használatával. További információ a szerepkör-hozzárendelések elérhető hatóköreiről a hatókör áttekintési oldalán.
Az alábbi példa hozzárendeli a szerepkört a Azure Event Hubs Data Owner
felhasználói fiókhoz, amely teljes hozzáférést biztosít Azure Event Hubs erőforrásokhoz. Egy valós forgatókönyvben kövesse a Minimális jogosultság elve alapján , hogy a felhasználók csak a biztonságosabb éles környezethez szükséges minimális engedélyeket adják meg a felhasználóknak.
Beépített Azure-szerepkörök Azure Event Hubs
A Azure Event Hubs esetében a névterek és az összes kapcsolódó erőforrás kezelése a Azure Portal és az Azure erőforrás-kezelési API-val már védett az Azure RBAC-modellel. Az Azure az alábbi beépített Azure-szerepköröket biztosítja az Event Hubs-névtérhez való hozzáférés engedélyezéséhez:
- Azure Event Hubs Adattulajdonos: Adathozzáférést tesz lehetővé az Event Hubs-névtérhez és annak entitásaihoz (üzenetsorokhoz, témakörökhöz, előfizetésekhez és szűrőkhöz)
- Azure Event Hubs adatküldő: Ezzel a szerepkörrel hozzáférést adhat a feladónak az Event Hubs-névtérhez és annak entitásaihoz.
- Azure Event Hubs adatfogadó: Ezzel a szerepkörrel hozzáférést adhat a fogadónak az Event Hubs-névtérhez és annak entitásaihoz.
Ha egyéni szerepkört szeretne létrehozni, tekintse meg az Event Hubs-műveletekhez szükséges jogosultságokat.
Fontos
A legtöbb esetben egy-két percig tart, amíg a szerepkör-hozzárendelés propagálódik az Azure-ban. Ritka esetekben akár nyolc percig is eltarthat. Ha hitelesítési hibákat kap a kód első futtatásakor, várjon néhány percet, és próbálkozzon újra.
Az Azure Portal keresse meg az Event Hubs-névteret a fő keresősávon vagy a bal oldali navigációs sávon.
Az áttekintési lapon válassza a hozzáférés-vezérlés (IAM) lehetőséget a bal oldali menüben.
A Hozzáférés-vezérlés (IAM) lapon válassza a Szerepkör-hozzárendelések lapot.
Válassza a + Hozzáadás lehetőséget a felső menüből, majd a Szerepkör-hozzárendelés hozzáadása lehetőséget az eredményül kapott legördülő menüből.
A keresőmezővel szűrheti az eredményeket a kívánt szerepkörre. Ebben a példában keresse meg
Azure Event Hubs Data Owner
és válassza ki az egyező eredményt. Ezután válassza a Tovább gombot.A Hozzáférés hozzárendelése területen válassza a Felhasználó, csoport vagy szolgáltatásnév lehetőséget, majd válassza a + Tagok kiválasztása lehetőséget.
A párbeszédpanelen keresse meg a Azure AD felhasználónevét (általában a user@domain e-mail-címét), majd válassza a párbeszédpanel alján található Kiválasztás lehetőséget.
Válassza a Véleményezés + hozzárendelés lehetőséget a végső lapra való ugráshoz, majd a Felülvizsgálat + hozzárendelés lehetőséget a folyamat befejezéséhez.
Kód írása az üzenetek eseményközpontba való küldésére
Adjon hozzá egy nevű Sender
osztályt, és adja hozzá a következő kódot az osztályhoz:
Fontos
- Frissítsen
<NAMESPACE NAME>
az Event Hubs-névtér nevével. - Frissítsen
<EVENT HUB NAME>
az eseményközpont nevével.
package ehubquickstart;
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
import com.azure.identity.*;
public class SenderAAD {
// replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
// Example: private static final String namespaceName = "contosons.servicebus.windows.net";
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
// Replace <EVENT HUB NAME> with the name of your event hug.
// Example: private static final String eventHubName = "ordersehub";
private static final String eventHubName = "<EVENT HUB NAME>";
public static void main(String[] args) {
publishEvents();
}
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a token using the default Azure credential
DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
.authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
.build();
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace(namespaceName)
.eventHubName(eventHubName)
.credential(credential)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
}
Hozza létre a programot, és győződjön meg arról, hogy nincsenek hibák. Ezt a programot a fogadóprogram futtatása után fogja futtatni.
Események fogadása
Az oktatóanyagban szereplő kód a GitHubon található EventProcessorClient-mintán alapul, amelyet megvizsgálva megtekintheti a teljes működő alkalmazást.
Figyelmeztetés
Ha ezt a kódot az Azure Stack Hubon futtatja, futásidejű hibákat fog tapasztalni, hacsak nem egy adott Storage API-verziót céloz meg. Ennek az az oka, hogy az Event Hubs SDK az Azure-ban elérhető legújabb Elérhető Azure Storage API-t használja, amely nem feltétlenül érhető el az Azure Stack Hub platformon. Az Azure Stack Hub a Azure Blob Storage SDK egy másik verzióját is támogathatja, mint az Azure-ban általában elérhető. Ha ellenőrzőpont-tárolóként Azure Blob Storage használ, ellenőrizze az Azure Stack Hub buildjének támogatott Azure Storage API-verzióját, és célként adja meg azt a verziót a kódban.
Ha például az Azure Stack Hub 2005-ös verzióján fut, a Storage szolgáltatás legmagasabb elérhető verziója a 2019-02-02-es verzió. Alapértelmezés szerint az Event Hubs SDK ügyfélkódtár az Azure-ban elérhető legmagasabb elérhető verziót használja (2019-07-07 az SDK kiadásának időpontjában). Ebben az esetben az ebben a szakaszban ismertetett lépéseken kívül kóddal is meg kell céloznia a Storage-szolgáltatás API 2019-02-02-es verzióját. Egy adott Storage API-verzió megcélzására vonatkozó példaért tekintse meg ezt a mintát a GitHubon.
Azure Storage és blobtároló létrehozása
Ebben a rövid útmutatóban az Azure Storage -t (pontosabban a Blob Storage-t) használja ellenőrzőpont-tárolóként. Az ellenőrzőpont-beállítás olyan folyamat, amellyel egy eseményfeldolgozó megjelöli vagy véglegesíti az utolsó sikeresen feldolgozott esemény pozícióját egy partíción belül. Az ellenőrzőpontok megjelölése általában az eseményeket feldolgozó függvényen belül történik. Az ellenőrzőpontokkal kapcsolatos további információkért lásd: Eseményfeldolgozó.
Kövesse az alábbi lépéseket egy Azure Storage-fiók létrehozásához.
- Azure Storage-fiók létrehozása
- Blobtároló létrehozása
- Hitelesítés a blobtárolóban
Helyi fejlesztéskor győződjön meg arról, hogy a blobadatokhoz hozzáférő felhasználói fiók rendelkezik a megfelelő engedélyekkel. A blobadatok olvasásához és írásához tárolóblobadatok közreműködője szükséges. A szerepkör hozzárendeléséhez hozzá kell rendelnie a Felhasználói hozzáférés rendszergazdája szerepkört, vagy egy másik szerepkört, amely tartalmazza a Microsoft.Authorization/roleAssignments/write műveletet. Azure RBAC-szerepköröket rendelhet egy felhasználóhoz a Azure Portal, az Azure CLI vagy Azure PowerShell használatával. A szerepkör-hozzárendelések elérhető hatóköreiről a hatókör áttekintési oldalán talál további információt.
Ebben a forgatókönyvben engedélyeket rendel a felhasználói fiókjához, amely a tárfiókra terjed ki, hogy kövesse a minimális jogosultság elvét. Ez a gyakorlat csak a minimálisan szükséges engedélyeket biztosítja a felhasználóknak, és biztonságosabb éles környezeteket hoz létre.
Az alábbi példa a Storage-blobadatok közreműködője szerepkört rendeli hozzá a felhasználói fiókhoz, amely olvasási és írási hozzáférést biztosít a tárfiók blobadataihoz.
Fontos
A szerepkör-hozzárendelés propagálása a legtöbb esetben egy-két percet vesz igénybe az Azure-ban, de ritka esetekben akár nyolc percet is igénybe vehet. Ha hitelesítési hibákat kap a kód első futtatásakor, várjon néhány percet, és próbálkozzon újra.
A Azure Portal keresse meg a tárfiókot a fő keresősávon vagy a bal oldali navigációs sávon.
A tárfiók áttekintési oldalán válassza a hozzáférés-vezérlés (IAM) lehetőséget a bal oldali menüben.
A Hozzáférés-vezérlés (IAM) lapon válassza a Szerepkör-hozzárendelések lapot.
Válassza a + Hozzáadás lehetőséget a felső menüből, majd a Szerepkör-hozzárendelés hozzáadása lehetőséget az eredményül kapott legördülő menüből.
A keresőmezővel szűrheti az eredményeket a kívánt szerepkörre. Ebben a példában keresse meg a Storage-blobadatok közreműködője kifejezést, és válassza ki az egyező eredményt, majd kattintson a Tovább gombra.
A Hozzáférés hozzárendelése területen válassza a Felhasználó, csoport vagy szolgáltatásnév lehetőséget, majd válassza a + Tagok kiválasztása lehetőséget.
A párbeszédpanelen keresse meg a Azure AD felhasználónevét (általában a user@domain e-mail-címét), majd válassza a párbeszédpanel alján található Kiválasztás lehetőséget.
Válassza a Véleményezés + hozzárendelés lehetőséget a végső lapra való ugráshoz, majd a Felülvizsgálat + hozzárendelés lehetőséget a folyamat befejezéséhez.
Event Hubs-kódtárak hozzáadása a Java-projekthez
Adja hozzá a következő függőségeket a pom.xml fájlhoz.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
Adja hozzá a következő
import
utasításokat a Java-fájl tetején.import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer; import com.azure.identity.*;
Hozzon létre egy nevű osztályt
Receiver
, és adja hozzá a következő sztringváltozókat az osztályhoz. Cserélje le a helyőrzőket a megfelelő értékekre.Fontos
Cserélje le a helyőrzőket a megfelelő értékekre.
<NAMESPACE NAME>
az Event Hubs-névtér nevére.<EVENT HUB NAME>
az eseményközpont nevével a névtérben.
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net"; private static final String eventHubName = "<EVENT HUB NAME>";
Adja hozzá a következő
main
metódust az osztályhoz.Fontos
Cserélje le a helyőrzőket a megfelelő értékekre.
<STORAGE ACCOUNT NAME>
az Azure Storage-fiók nevével.<CONTAINER NAME>
a blobtároló nevével a tárfiókban
// create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD) .build(); // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .credential(credential) .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net") .containerName("<CONTAINER NAME>") .buildAsyncClient(); // Create an event processor client to receive and process events and errors. EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder() .fullyQualifiedNamespace(namespaceName) .eventHubName(eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .credential(credential) .buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process");
Adja hozzá az eseményeket és hibákat feldolgozó két segédmet
PARTITION_PROCESSOR
( ésERROR_HANDLER
) aReceiver
osztályhoz.public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };
Hozza létre a programot, és győződjön meg arról, hogy nincsenek hibák.
Az alkalmazások futtatása
Először futtassa a Fogadó alkalmazást.
Ezután futtassa a Feladó alkalmazást.
A Fogadó alkalmazás ablakban ellenőrizze, hogy látja-e a Feladó alkalmazás által közzétett eseményeket.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar
Az alkalmazás leállításához nyomja le az ENTER billentyűt a fogadó alkalmazásablakban.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
Következő lépések
Tekintse meg a következő mintákat a GitHubon: