Použití Javy k odesílání událostí do služby Azure Event Hubs (azure-messaging-eventhubs)

V tomto rychlém startu se dozvíte, jak odesílat události do centra událostí a přijímat je z centra událostí pomocí balíčku java azure-messaging-eventhubs .

Požadavky

Pokud s Azure Event Hubs teprve začínáte, přečtěte si před provedením tohoto rychlého startu přehled služby Event Hubs.

K dokončení tohoto rychlého startu potřebujete následující požadavky:

  • Microsoft předplatného Azure. Pokud chcete používat služby Azure, včetně Azure Event Hubs, potřebujete předplatné. Pokud ještě nemáte účet Azure, můžete si zaregistrovat bezplatnou zkušební verzi nebo využít výhody předplatitele MSDN při vytváření účtu.
  • Vývojové prostředí Java. V tomto rychlém startu se používá Eclipse. Vyžaduje se sada Java Development Kit (JDK) verze 8 nebo vyšší.
  • Vytvořte obor názvů služby Event Hubs a centrum událostí. Prvním krokem je použití Azure Portal k vytvoření oboru názvů typu Event Hubs a získání přihlašovacích údajů pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku. Pak získejte připojovací řetězec pro obor názvů služby Event Hubs podle pokynů v článku Získání připojovacího řetězce. Připojovací řetězec použijete později v tomto rychlém startu.

Odesílání událostí

V této části se dozvíte, jak vytvořit aplikaci v Javě pro odesílání událostí do centra událostí.

Přidání odkazu na knihovnu Azure Event Hubs

Nejprve vytvořte nový projekt Maven pro konzolovou nebo shellovou aplikaci ve svém oblíbeném vývojovém prostředí Java. pom.xml Aktualizujte soubor pomocí následující závislosti. Klientská knihovna Java pro službu Event Hubs je k dispozici v centrálním úložišti Maven.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.7.0</version>
</dependency>

Poznámka

Aktualizujte verzi na nejnovější verzi publikovanou v úložišti Maven.

Napsání kódu pro odesílání zpráv do centra událostí

Přidejte třídu s názvem Sendera do třídy přidejte následující kód:

Důležité

Aktualizujte <Event Hubs namespace connection string> připojovací řetězec k oboru názvů služby Event Hubs. Aktualizujte <Event hub name> na název vašeho centra událostí v oboru názvů .

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

public class Sender {
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";

    public static void main(String[] args) {
        publishEvents();
    }
}

Přidání kódu pro publikování událostí do centra událostí

Do třídy přidejte metodu Sender s názvem publishEvents :

    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a producer client
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }

Sestavte program a ujistěte se, že nedošlo k žádným chybám. Tento program spustíte po spuštění přijímacího programu.

Příjem událostí

Kód v tomto kurzu je založený na ukázce EventProcessorClient na GitHubu, kterou můžete prozkoumat a zobrazit plně funkční aplikaci.

Upozornění

Pokud tento kód spustíte ve službě Azure Stack Hub, dojde k chybám za běhu, pokud nezaměříte na konkrétní verzi rozhraní API služby Storage. Je to proto, že sada Event Hubs SDK používá nejnovější dostupné rozhraní API služby Azure Storage dostupné v Azure, které nemusí být na vaší platformě Azure Stack Hub dostupné. Azure Stack Hub může podporovat jinou verzi sady Azure Blob Storage SDK, než která je obvykle dostupná v Azure. Pokud jako úložiště kontrolních bodů používáte Azure Blob Storage, zkontrolujte podporovanou verzi rozhraní API služby Azure Storage pro sestavení služby Azure Stack Hub a cílovou verzi v kódu.

Pokud například používáte Azure Stack Hub verze 2005, je nejvyšší dostupná verze pro službu Storage verze 2019-02-02. Ve výchozím nastavení používá klientská knihovna sady Event Hubs SDK nejvyšší dostupnou verzi v Azure (7. 7. 2019 v době vydání sady SDK). V takovém případě budete muset kromě postupu v této části přidat také kód, který bude cílit na rozhraní API služby Storage verze 2019-02-02-02. Příklad cílení na konkrétní verzi rozhraní API služby Storage najdete v této ukázce na GitHubu.

Vytvoření služby Azure Storage a kontejneru objektů blob

V tomto rychlém startu použijete jako úložiště kontrolních bodů Azure Storage (konkrétně Blob Storage). Vytváření kontrolních bodů je proces, kterým procesor událostí označí nebo potvrdí pozici poslední úspěšně zpracované události v rámci oddílu. Označení kontrolního bodu se obvykle provádí v rámci funkce, která zpracovává události. Další informace o vytváření kontrolních bodů najdete v tématu Procesor událostí.

Podle těchto kroků vytvořte účet Azure Storage.

  1. Vytvoření účtu Azure Storage

  2. Vytvoření kontejneru objektů blob

  3. Získání připojovacího řetězce k účtu úložiště

    Poznamenejte si připojovací řetězec a název kontejneru. Použijete je v příjmovém kódu.

Přidání knihoven Event Hubs do projektu Java

Do souboru pom.xml přidejte následující závislosti.

<dependencies>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.7.0</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
        <version>1.6.0</version>
    </dependency>
</dependencies>
  1. Na začátek souboru Java přidejte následující příkazy importu .

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
  2. Vytvořte třídu s názvem Receivera přidejte do třídy následující proměnné řetězce. Zástupné symboly nahraďte správnými hodnotami.

    Důležité

    Zástupné symboly nahraďte správnými hodnotami.

    • <Event Hubs namespace connection string> pomocí připojovacího řetězce k vašemu oboru názvů služby Event Hubs. Aktualizace
    • <Event hub name> s názvem vašeho centra událostí v oboru názvů.
    • <Storage connection string> pomocí připojovacího řetězce k vašemu účtu úložiště Azure.
    • <Storage container name> názvem kontejneru ve službě Azure Blob Storage.
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";
    private static final String storageConnectionString = "<Storage connection string>";
    private static final String storageContainerName = "<Storage container name>";
    
  3. Do třídy přidejte následující main metodu.

    public static void main(String[] args) throws Exception {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(storageConnectionString)
            .containerName(storageContainerName)
            .buildAsyncClient();
    
        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(connectionString, eventHubName)
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR)
            .processError(ERROR_HANDLER)
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
    
        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
        System.out.println("Starting event processor");
        eventProcessorClient.start();
    
        System.out.println("Press enter to stop.");
        System.in.read();
    
        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");
    
        System.out.println("Exiting process");
    }
    
  4. Přidejte do Receiver třídy dvě pomocné metody (PARTITION_PROCESSOR a ERROR_HANDLER), které zpracovávají události a chyby.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  5. Sestavte program a ujistěte se, že nedošlo k žádným chybám.

Spuštění aplikací

  1. Nejprve spusťte aplikaci Receiver .

  2. Pak spusťte aplikaci Sender .

  3. V okně Aplikace příjemce ověřte, že se zobrazují události publikované aplikací Odesílatel.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Stisknutím klávesy ENTER v okně aplikace příjemce aplikaci zastavte.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Další kroky

Projděte si následující ukázky na GitHubu: