Använda Java för att skicka händelser till eller ta emot händelser från Azure Event Hubs

Den här snabbstarten visar hur du skickar händelser till och tar emot händelser från en händelsehubb med java-paketet azure-messaging-eventhubs .

Dricks

Om du arbetar med Azure Event Hubs-resurser i ett Spring-program rekommenderar vi att du överväger Spring Cloud Azure som ett alternativ. Spring Cloud Azure är ett projekt med öppen källkod som ger sömlös Spring-integrering med Azure-tjänster. Mer information om Spring Cloud Azure och ett exempel med eventhubbar finns i Spring Cloud Stream med Azure Event Hubs.

Förutsättningar

Om du inte har använt Azure Event Hubs tidigare kan du läsa Översikt över Event Hubs innan du gör den här snabbstarten.

För att slutföra den här snabbstarten, behöver du följande förhandskrav:

  • Microsoft Azure-prenumeration. Om du vill använda Azure-tjänster, inklusive Azure Event Hubs, behöver du en prenumeration. Om du inte har ett befintligt Azure-konto kan du registrera dig för en kostnadsfri utvärderingsversion eller använda dina MSDN-prenumerantförmåner när du skapar ett konto.
  • En Java-utvecklingsmiljö. Den här snabbstarten använder Eclipse. Java Development Kit (JDK) med version 8 eller senare krävs.
  • Skapa ett Event Hubs-namnområde och en händelsehubb. Det första steget är att använda Azure Portal till att skapa ett namnområde av typen Event Hubs och hämta de autentiseringsuppgifter för hantering som programmet behöver för att kommunicera med händelsehubben. Om du behöver skapa ett namnområde och en händelsehubb följer du anvisningarna i den här artikeln. Hämta sedan anslutningssträng för Event Hubs-namnområdet genom att följa anvisningarna i artikeln: Hämta anslutningssträng. Du använder anslutningssträng senare i den här snabbstarten.

Skicka händelser

Det här avsnittet visar hur du skapar ett Java-program för att skicka händelser till en händelsehubb.

Lägga till referens till Azure Event Hubs-biblioteket

Skapa först ett nytt Maven-projekt för ett konsol-/shell-program i din java-favoritmiljö. Uppdatera filen på pom.xml följande sätt. Java-klientbiblioteket för Event Hubs är tillgängligt på den centrala Maven-lagringsplatsen.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Kommentar

Uppdatera versionen till den senaste versionen som publicerats till Maven-lagringsplatsen.

Autentisera appen till Azure

Den här snabbstarten visar två sätt att ansluta till Azure Event Hubs: lösenordslösa och anslutningssträng. Det första alternativet visar hur du använder ditt säkerhetsobjekt i Microsoft Entra-ID och rollbaserad åtkomstkontroll (RBAC) för att ansluta till ett Event Hubs-namnområde. Du behöver inte bekymra dig om att ha hårdkodade anslutningssträng i koden eller i en konfigurationsfil eller i en säker lagring som Azure Key Vault. Det andra alternativet visar hur du använder en anslutningssträng för att ansluta till ett Event Hubs-namnområde. Om du inte har använt Azure tidigare kan det vara lättare att följa anslutningssträng alternativet. Vi rekommenderar att du använder det lösenordslösa alternativet i verkliga program och produktionsmiljöer. Mer information finns i Autentisering och auktorisering. Du kan också läsa mer om lösenordslös autentisering på översiktssidan.

Tilldela roller till din Microsoft Entra-användare

När du utvecklar lokalt kontrollerar du att användarkontot som ansluter till Azure Event Hubs har rätt behörigheter. Du behöver rollen Azure Event Hubs-dataägare för att kunna skicka och ta emot meddelanden. Om du vill tilldela dig själv den här rollen behöver du rollen Administratör för användaråtkomst eller en annan roll som innehåller åtgärden Microsoft.Authorization/roleAssignments/write . Du kan tilldela Azure RBAC-roller till en användare med hjälp av Azure-portalen, Azure CLI eller Azure PowerShell. Läs mer om tillgängliga omfång för rolltilldelningar på översiktssidan för omfång .

I följande exempel tilldelas Azure Event Hubs Data Owner rollen till ditt användarkonto, vilket ger fullständig åtkomst till Azure Event Hubs-resurser. I ett verkligt scenario följer du principen om lägsta behörighet för att ge användarna endast de minsta behörigheter som krävs för en säkrare produktionsmiljö.

Inbyggda Azure-roller för Azure Event Hubs

För Azure Event Hubs skyddas redan hanteringen av namnområden och alla relaterade resurser via Azure-portalen och AZURE-resurshanterings-API:et med hjälp av Azure RBAC-modellen. Azure tillhandahåller de inbyggda Azure-rollerna nedan för att auktorisera åtkomst till ett Event Hubs-namnområde:

Om du vill skapa en anpassad roll läser du Rättigheter som krävs för Event Hubs-åtgärder.

Viktigt!

I de flesta fall tar det en minut eller två innan rolltilldelningen sprids i Azure. I sällsynta fall kan det ta upp till åtta minuter. Om du får autentiseringsfel när du först kör koden väntar du en stund och försöker igen.

  1. Leta upp event hubs-namnområdet i Azure-portalen med hjälp av huvudsökfältet eller det vänstra navigeringsfältet.

  2. På översiktssidan väljer du Åtkomstkontroll (IAM) på den vänstra menyn.

  3. På sidan Åtkomstkontroll (IAM) väljer du fliken Rolltilldelningar .

  4. Välj + Lägg till på den översta menyn och sedan Lägg till rolltilldelning från den resulterande nedrullningsbara menyn.

    A screenshot showing how to assign a role.

  5. Använd sökrutan för att filtrera resultatet till önskad roll. I det här exemplet söker Azure Event Hubs Data Owner du efter och väljer matchande resultat. Välj sedan Nästa.

  6. Under Tilldela åtkomst till väljer du Användare, grupp eller tjänstens huvudnamn och sedan + Välj medlemmar.

  7. I dialogrutan söker du efter ditt Microsoft Entra-användarnamn (vanligtvis din user@domain e-postadress) och väljer sedan Välj längst ned i dialogrutan.

  8. Välj Granska + tilldela för att gå till den sista sidan och sedan Granska + tilldela igen för att slutföra processen.

Skriva kod för att skicka meddelanden till händelsehubben

Lägg till en klass med namnet Senderoch lägg till följande kod i klassen:

Viktigt!

  • Uppdatera <NAMESPACE NAME> med namnet på ditt Event Hubs-namnområde.
  • Uppdatera <EVENT HUB NAME> med namnet på din händelsehubb.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Skapa programmet och se till att det inte finns några fel. Du kör det här programmet när du har kört mottagarprogrammet.

Ta emot händelser

Koden i den här självstudien baseras på EventProcessorClient-exemplet på GitHub, som du kan undersöka för att se det fullständiga fungerande programmet.

Följ dessa rekommendationer när du använder Azure Blob Storage som kontrollpunktslager:

  • Använd en separat container för varje konsumentgrupp. Du kan använda samma lagringskonto, men använda en container per grupp.
  • Använd inte containern för något annat och använd inte lagringskontot för något annat.
  • Lagringskontot ska finnas i samma region som det distribuerade programmet finns i. Om programmet är lokalt kan du försöka välja den region som är närmast.

På sidan Lagringskonto i Azure-portalen i avsnittet Blob Service kontrollerar du att följande inställningar är inaktiverade.

  • Hierarkisk namnrymd
  • Mjuk borttagning av blob
  • Versionshantering

Skapa en Azure Storage och en blobcontainer

I den här snabbstarten använder du Azure Storage (särskilt Blob Storage) som kontrollpunktsarkiv. Kontrollpunkter är en process där en händelseprocessor markerar eller checkar in positionen för den senast bearbetade händelsen i en partition. En kontrollpunkt markeras vanligtvis i den funktion som bearbetar händelserna. Mer information om kontrollpunkter finns i Händelseprocessor.

Följ de här stegen för att skapa ett Azure Storage-konto.

  1. Skapa ett Azure Storage-konto
  2. Skapa en blobcontainer
  3. Autentisera till blobcontainern

När du utvecklar lokalt kontrollerar du att användarkontot som har åtkomst till blobdata har rätt behörigheter. Du behöver Storage Blob Data-deltagare för att läsa och skriva blobdata. Om du vill tilldela dig själv den här rollen måste du tilldelas rollen Administratör för användaråtkomst eller en annan roll som innehåller åtgärden Microsoft.Authorization/roleAssignments/write . Du kan tilldela Azure RBAC-roller till en användare med hjälp av Azure-portalen, Azure CLI eller Azure PowerShell. Du kan lära dig mer om tillgängliga omfång för rolltilldelningar på översiktssidan för omfång .

I det här scenariot tilldelar du behörigheter till ditt användarkonto, begränsat till lagringskontot, för att följa principen om lägsta behörighet. Den här metoden ger användarna endast de minsta behörigheter som krävs och skapar säkrare produktionsmiljöer.

I följande exempel tilldelas rollen Storage Blob Data Contributor till ditt användarkonto, vilket ger både läs- och skrivåtkomst till blobdata i ditt lagringskonto.

Viktigt!

I de flesta fall tar det en minut eller två för rolltilldelningen att spridas i Azure, men i sällsynta fall kan det ta upp till åtta minuter. Om du får autentiseringsfel när du först kör koden väntar du en stund och försöker igen.

  1. Leta upp ditt lagringskonto i Azure-portalen med hjälp av huvudsökfältet eller det vänstra navigeringsfältet.

  2. På översiktssidan för lagringskontot väljer du Åtkomstkontroll (IAM) på den vänstra menyn.

  3. På sidan Åtkomstkontroll (IAM) väljer du fliken Rolltilldelningar .

  4. Välj + Lägg till på den översta menyn och sedan Lägg till rolltilldelning från den resulterande nedrullningsbara menyn.

    A screenshot showing how to assign a storage account role.

  5. Använd sökrutan för att filtrera resultatet till önskad roll. I det här exemplet söker du efter Storage Blob Data Contributor och väljer matchande resultat och väljer sedan Nästa.

  6. Under Tilldela åtkomst till väljer du Användare, grupp eller tjänstens huvudnamn och sedan + Välj medlemmar.

  7. I dialogrutan söker du efter ditt Microsoft Entra-användarnamn (vanligtvis din user@domain e-postadress) och väljer sedan Välj längst ned i dialogrutan.

  8. Välj Granska + tilldela för att gå till den sista sidan och sedan Granska + tilldela igen för att slutföra processen.

Lägga till Event Hubs-bibliotek i ditt Java-projekt

Lägg till följande beroenden i filen pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Lägg till följande import instruktioner överst i Java-filen.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Skapa en klass med namnet Receiveroch lägg till följande strängvariabler i klassen. Ersätt platshållarna med rätt värden.

    Viktigt!

    Ersätt platshållarna med rätt värden.

    • <NAMESPACE NAME> med namnet på Event Hubs-namnrymden.
    • <EVENT HUB NAME> med namnet på händelsehubben i namnområdet.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Lägg till följande main metod i klassen.

    Viktigt!

    Ersätt platshållarna med rätt värden.

    • <STORAGE ACCOUNT NAME> med namnet på ditt Azure Storage-konto.
    • <CONTAINER NAME> med namnet på blobcontainern i lagringskontot
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Lägg till de två hjälpmetoderna (PARTITION_PROCESSOR och ERROR_HANDLER) som bearbetar händelser och fel i Receiver klassen.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Skapa programmet och se till att det inte finns några fel.

Kör programmen

  1. Kör mottagarprogrammet först.

  2. Kör sedan avsändarprogrammet.

  3. I fönstret Mottagarprogram bekräftar du att du ser de händelser som har publicerats av avsändarprogrammet.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Tryck på RETUR i mottagarprogrammets fönster för att stoppa programmet.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Nästa steg

Se följande exempel på GitHub: