Események küldése és fogadása az Azure Event Hubson a Java használatával

Ez a rövid útmutató bemutatja, hogyan küldhet eseményeket egy eseményközpontba, és hogyan fogadhat eseményeket az azure-messaging-eventhubs Java-csomag használatával.

Tipp.

Ha az Azure Event Hubs-erőforrásokkal dolgozik egy Spring-alkalmazásban, javasoljuk, hogy fontolja meg a Spring Cloud Azure-t alternatívaként. A Spring Cloud Azure egy nyílt forráskódú projekt, amely zökkenőmentes Spring-integrációt biztosít az Azure-szolgáltatásokkal. Ha többet szeretne megtudni a Spring Cloud Azure-ról, és az Event Hubs használatával szeretne egy példát látni, tekintse meg a Spring Cloud Streamet az Azure Event Hubsszal.

Előfeltételek

Ha még nem ismerkedik az Azure Event Hubs szolgáltatásokkal, a rövid útmutató előtt tekintse meg az Event Hubs áttekintését .

A rövid útmutató elvégzéséhez a következő előfeltételekre van szüksége:

  • Microsoft Azure-előfizetés. Az Azure-szolgáltatások, köztük az Azure Event Hubs használatához előfizetésre van szükség. Ha nem rendelkezik meglévő Azure-fiókkal, regisztrálhat egy ingyenes próbaverzióra, vagy használhatja az MSDN-előfizetői előnyöket a fiók létrehozásakor.
  • Java fejlesztői környezet. Ez a rövid útmutató az Eclipse-t használja. A 8-es vagy újabb verziójú Java Development Kit (JDK) használatához szükség van.
  • Hozzon létre egy Event Hubs-névteret és egy eseményközpontot. Első lépésként az Azure Portalon hozzon létre egy Event Hubs típusú névteret, és szerezze be az alkalmazása és az eseményközpont közötti kommunikációhoz szükséges felügyeleti hitelesítő adatokat. Névtér és eseményközpont létrehozásához kövesse az ebben a cikkben ismertetett eljárást. Ezután szerezze be az Event Hubs-névtér kapcsolati sztring a következő cikk utasításait követve: Kapcsolati sztring lekérése. A rövid útmutató későbbi részében a kapcsolati sztring használhatja.

Események küldése

Ez a szakasz bemutatja, hogyan hozhat létre Java-alkalmazást események eseményközpontba küldéséhez.

Hivatkozás hozzáadása az Azure Event Hubs-kódtárhoz

Először hozzon létre egy új Maven-projektet egy konzol-/rendszerhéj-alkalmazáshoz a kedvenc Java fejlesztői környezetében. Frissítse a fájlt az pom.xml alábbiak szerint. Az Event Hubs Java-ügyfélkódtára a Maven Central-adattárban érhető el.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Feljegyzés

Frissítse a verziót a Maven-adattárban közzétett legújabb verzióra.

Az alkalmazás hitelesítése az Azure-ban

Ez a rövid útmutató az Azure Event Hubshoz való csatlakozás két módját mutatja be: jelszó nélküli és kapcsolati sztring. Az első lehetőség bemutatja, hogyan csatlakozhat egy Event Hubs-névtérhez a Microsoft Entra-azonosítóban és a szerepköralapú hozzáférés-vezérlésben (RBAC). Nem kell aggódnia amiatt, hogy a kódban, a konfigurációs fájlban vagy egy biztonságos tárolóban, például az Azure Key Vaultban található kapcsolati sztring. A második lehetőség bemutatja, hogyan lehet kapcsolati sztring csatlakozni egy Event Hubs-névtérhez. Ha még nem ismerkedik az Azure-sal, könnyebben követheti a kapcsolati sztring lehetőséget. A jelszó nélküli beállítást a valós alkalmazásokban és éles környezetekben javasoljuk. További információ: Hitelesítés és engedélyezés. A jelszó nélküli hitelesítésről az áttekintési oldalon olvashat bővebben.

Szerepkörök hozzárendelése a Microsoft Entra-felhasználóhoz

Helyi fejlesztéskor győződjön meg arról, hogy az Azure Event Hubshoz csatlakozó felhasználói fiók rendelkezik a megfelelő engedélyekkel. Az üzenetek küldéséhez és fogadásához szüksége lesz az Azure Event Hubs adattulajdonosi szerepkörére. A szerepkör hozzárendeléséhez szüksége lesz a Felhasználói hozzáférés Rendszergazda istrator szerepkörre, vagy egy másik szerepkörre, amely tartalmazza a Microsoft.Authorization/roleAssignments/write műveletet. Azure RBAC-szerepköröket rendelhet egy felhasználóhoz az Azure Portal, az Azure CLI vagy az Azure PowerShell használatával. További információ a szerepkör-hozzárendelések elérhető hatóköreiről a hatókör áttekintési oldalán.

Az alábbi példa hozzárendeli a szerepkört a Azure Event Hubs Data Owner felhasználói fiókjához, amely teljes hozzáférést biztosít az Azure Event Hubs-erőforrásokhoz. Egy valós forgatókönyvben kövesse a Minimális jogosultság elvét, hogy a felhasználók csak a biztonságosabb éles környezethez szükséges minimális engedélyeket adják meg a felhasználóknak.

Azure beépített szerepkörök az Azure Event Hubshoz

Az Azure Event Hubs esetében a névterek és az összes kapcsolódó erőforrás kezelése az Azure Portalon és az Azure Resource Management API-val már védett az Azure RBAC-modellel. Az Azure az alábbi beépített Azure-szerepköröket biztosítja az Event Hubs-névtérhez való hozzáférés engedélyezéséhez:

  • Azure Event Hubs-adattulajdonos: Adathozzáférést tesz lehetővé az Event Hubs-névtérhez és annak entitásaihoz (üzenetsorokhoz, témakörökhöz, előfizetésekhez és szűrőkhöz)
  • Azure Event Hubs-adatküldő: Ezzel a szerepkörrel hozzáférést adhat a feladónak az Event Hubs-névtérhez és annak entitásaihoz.
  • Azure Event Hubs-adat fogadó: Ezzel a szerepkörrel hozzáférést adhat a fogadónak az Event Hubs-névtérhez és annak entitásaihoz.

Ha egyéni szerepkört szeretne létrehozni, tekintse meg az Event Hubs-műveletekhez szükséges jogosultságokat.

Fontos

A legtöbb esetben egy-két percig tart, amíg a szerepkör-hozzárendelés propagálása az Azure-ban megtörténik. Ritkán akár nyolc percig is eltarthat. Ha hitelesítési hibákat kap a kód első futtatásakor, várjon néhány percet, és próbálkozzon újra.

  1. Az Azure Portalon keresse meg az Event Hubs-névteret a fő keresősáv vagy a bal oldali navigációs sáv használatával.

  2. Az áttekintési lapon válassza a Hozzáférés-vezérlés (IAM) lehetőséget a bal oldali menüben.

  3. A Hozzáférés-vezérlés (IAM) lapon válassza a Szerepkör-hozzárendelések lapot.

  4. Válassza a +Hozzáadás lehetőséget a felső menüből, majd a szerepkör-hozzárendelés hozzáadása lehetőséget az eredményül kapott legördülő menüből.

    A screenshot showing how to assign a role.

  5. A keresőmezővel szűrheti az eredményeket a kívánt szerepkörre. Ebben a példában keresse meg Azure Event Hubs Data Owner és válassza ki az egyező eredményt. Ezután válassza a Tovább gombot.

  6. A Hozzáférés hozzárendelése területen válassza a Felhasználó, csoport vagy szolgáltatásnév lehetőséget, majd válassza a + Tagok kijelölése lehetőséget.

  7. A párbeszédpanelen keresse meg a Microsoft Entra-felhasználónevet (általában a user@domain e-mail-címét), majd válassza a Párbeszédpanel alján található Kiválasztás lehetőséget.

  8. Válassza a Véleményezés + hozzárendelés lehetőséget a végső lapra való ugráshoz, majd a folyamat befejezéséhez a Véleményezés + hozzárendelés lehetőséget.

Kód írása az üzenetek eseményközpontba való küldésére

Adjon hozzá egy osztályt, Senderés adja hozzá a következő kódot az osztályhoz:

Fontos

  • Frissítsen <NAMESPACE NAME> az Event Hubs-névtér nevével.
  • Frissítsen <EVENT HUB NAME> az eseményközpont nevével.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Hozza létre a programot, és győződjön meg arról, hogy nincsenek hibák. Ezt a programot a fogadóprogram futtatása után fogja futtatni.

Események fogadása

Az oktatóanyagban szereplő kód a GitHub EventProcessorClient-mintáján alapul, amelyet megvizsgálva megtekintheti a teljes működő alkalmazást.

Kövesse az alábbi javaslatokat az Azure Blob Storage ellenőrzőpont-tárolóként való használatakor:

  • Minden fogyasztói csoporthoz használjon külön tárolót. Használhatja ugyanazt a tárfiókot, de csoportonként egy tárolót.
  • Ne használja a tárolót semmi máshoz, és ne használja a tárfiókot semmi máshoz.
  • A tárfióknak ugyanabban a régióban kell lennie, amelyben az üzembe helyezett alkalmazás található. Ha az alkalmazás helyszíni, próbálja meg kiválasztani a lehető legközelebbi régiót.

Az Azure Portal Tárfiók lapján, a Blob szolgáltatás szakaszában győződjön meg arról, hogy a következő beállítások le vannak tiltva.

  • Hierarchikus névtér
  • Blob áltörlése
  • Verziókezelés

Azure Storage és blobtároló létrehozása

Ebben a rövid útmutatóban az Azure Storage-t (pontosabban a Blob Storage-t) használja ellenőrzőpont-tárolóként. A checkpointing egy folyamat, amellyel egy eseményfeldolgozó megjelöli vagy véglegesíti az utolsó sikeresen feldolgozott esemény pozícióját egy partíción belül. Az ellenőrzőpont megjelölése általában az eseményeket feldolgozó függvényen belül történik. Az ellenőrzőpontokkal kapcsolatos további információkért tekintse meg az Eseményfeldolgozót.

Kövesse az alábbi lépéseket egy Azure Storage-fiók létrehozásához.

  1. Azure Storage-fiók létrehozása
  2. Blobtároló létrehozása
  3. Hitelesítés a blobtárolóban

Helyi fejlesztéskor győződjön meg arról, hogy a blobadatokhoz hozzáférő felhasználói fiók rendelkezik a megfelelő engedélyekkel. A blobadatok olvasásához és írásához tárolóblobadatok közreműködője szükséges. A szerepkör hozzárendeléséhez hozzá kell rendelnie a Felhasználói hozzáférés Rendszergazda istrator szerepkört, vagy egy másik szerepkört, amely tartalmazza a Microsoft.Authorization/roleAssignments/write műveletet. Azure RBAC-szerepköröket rendelhet egy felhasználóhoz az Azure Portal, az Azure CLI vagy az Azure PowerShell használatával. A szerepkör-hozzárendelések elérhető hatóköreiről a hatókör áttekintési oldalán tudhat meg többet.

Ebben a forgatókönyvben engedélyeket rendel hozzá a felhasználói fiókjához, amely a tárfiókra terjed ki, hogy kövesse a minimális jogosultság elvét. Ez a gyakorlat csak a minimálisan szükséges engedélyeket biztosítja a felhasználóknak, és biztonságosabb éles környezeteket hoz létre.

Az alábbi példa a Storage Blob Data Contributor szerepkört rendeli hozzá a felhasználói fiókjához, amely olvasási és írási hozzáférést biztosít a tárfiók blobadataihoz.

Fontos

A szerepkör-hozzárendelés propagálása a legtöbb esetben egy-két percet vesz igénybe az Azure-ban, de ritkán akár nyolc percet is igénybe vehet. Ha hitelesítési hibákat kap a kód első futtatásakor, várjon néhány percet, és próbálkozzon újra.

  1. Az Azure Portalon keresse meg a tárfiókot a fő keresősávon vagy a bal oldali navigációs sávon.

  2. A tárfiók áttekintési lapján válassza a Hozzáférés-vezérlés (IAM) lehetőséget a bal oldali menüben.

  3. A Hozzáférés-vezérlés (IAM) lapon válassza a Szerepkör-hozzárendelések lapot.

  4. Válassza a +Hozzáadás lehetőséget a felső menüből, majd a szerepkör-hozzárendelés hozzáadása lehetőséget az eredményül kapott legördülő menüből.

    A screenshot showing how to assign a storage account role.

  5. A keresőmezővel szűrheti az eredményeket a kívánt szerepkörre. Ebben a példában keresse meg a Storage Blob-adatszolgáltatót, és válassza ki a megfelelő eredményt, majd válassza a Tovább gombot.

  6. A Hozzáférés hozzárendelése területen válassza a Felhasználó, csoport vagy szolgáltatásnév lehetőséget, majd válassza a + Tagok kijelölése lehetőséget.

  7. A párbeszédpanelen keresse meg a Microsoft Entra-felhasználónevet (általában a user@domain e-mail-címét), majd válassza a Párbeszédpanel alján található Kiválasztás lehetőséget.

  8. Válassza a Véleményezés + hozzárendelés lehetőséget a végső lapra való ugráshoz, majd a folyamat befejezéséhez a Véleményezés + hozzárendelés lehetőséget.

Event Hubs-kódtárak hozzáadása a Java-projekthez

Adja hozzá a következő függőségeket a pom.xml fájlhoz.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Adja hozzá a következő import utasításokat a Java-fájl tetején.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Hozzon létre egy osztályt, Receiverés adja hozzá a következő sztringváltozókat az osztályhoz. Cserélje le a helyőrzőket a megfelelő értékekre.

    Fontos

    Cserélje le a helyőrzőket a megfelelő értékekre.

    • <NAMESPACE NAME> az Event Hubs-névtér nevével.
    • <EVENT HUB NAME> az eseményközpont nevével a névtérben.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Adja hozzá a következő main metódust az osztályhoz.

    Fontos

    Cserélje le a helyőrzőket a megfelelő értékekre.

    • <STORAGE ACCOUNT NAME> az Azure Storage-fiók nevével.
    • <CONTAINER NAME> a blobtároló nevével a tárfiókban
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Adja hozzá az eseményeket és hibákat feldolgozó két segédmetórát (PARTITION_PROCESSOR és ERROR_HANDLER) az Receiver osztályhoz.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Hozza létre a programot, és győződjön meg arról, hogy nincsenek hibák.

Az alkalmazások futtatása

  1. Először futtassa a Fogadó alkalmazást.

  2. Ezután futtassa a Feladó alkalmazást.

  3. A Fogadó alkalmazás ablakban győződjön meg arról, hogy a Küldő alkalmazás által közzétett eseményeket látja.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Az alkalmazás leállításához nyomja le az ENTER billentyűt a fogadóalkalmazás ablakában.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Következő lépések

Tekintse meg a következő mintákat a GitHubon: