Zelfstudie: Apache Kafka voor Event Hubs-gebeurtenissen verwerken met behulp van Stream Analytics

In dit artikel wordt beschreven hoe u gegevens streamt naar Event Hubs en hoe u deze verwerkt met Azure Stream Analytics. U wordt door de volgende stappen geleid:

  1. Maak een Event Hubs-naamruimte.
  2. Maak een Kafka-client die berichten naar de Event Hub verzendt.
  3. Maak een Stream Analytics-taak waarmee gegevens van de Event Hub naar een Azure Blob-opslag worden gekopieerd.

U hoeft uw protocolclients niet te wijzigen of uw eigen clusters uit te voeren wanneer u het Kafka-eindpunt gebruikt dat door een Event Hub wordt weergegeven. Azure Event Hubs ondersteunt Apache Kafka versie 1.0 en hoger.

Vereisten

Zorg ervoor dat u aan de volgende vereisten voldoet om deze snelstart uit te voeren:

Een Event Hubs-naamruimte maken

Wanneer u een Event Hubs-naamruimte maakt, wordt het Kafka-eindpunt voor de naamruimte automatisch ingeschakeld. U kunt gebeurtenissen streamen vanuit uw toepassingen die gebruikmaken van het Kafka-protocol naar Event Hubs. Volg de stapsgewijze instructies in Een Event Hub maken met behulp van Azure Portal om een Event Hubs-naamruimte te maken. Als u een toegewezen cluster gebruikt, raadpleegt u Een naamruimte en Event Hub maken in een toegewezen cluster.

Notitie

Event Hubs voor Kafka wordt niet ondersteund in de Basic-laag .

Berichten verzenden met Kafka in Event Hubs

  1. Kloon de Azure Event Hubs for Kafka-opslagplaats naar uw machine.

  2. Navigeer naar de map azure-event-hubs-for-kafka/quickstart/java/producer.

  3. Werk de configuratiegegevens voor de producer bij in src/main/resources/producer.config. Geef de naam en verbindingsreeks op voor de Event Hub-naamruimte.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. Navigeer naar azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/ en open het bestand TestDataReporter.java in een editor naar keuze.

  5. Voeg commentaar toe aan de volgende regel code:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. Voeg de volgende regel code toe in plaats van de code met commentaar:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    Deze code verzendt de gebeurtenisgegevens in JSON-indeling. Wanneer u invoer voor een Stream Analytics-taak configureert, geeft u JSON als de indeling voor de invoergegevens op.

  7. Voer de producer uit en stream naar Event Hubs. Schakel op een Windows-computer, wanneer u gebruikmaakt van een Node.js-opdrachtprompt, over naar de map azure-event-hubs-for-kafka/quickstart/java/producer voordat u deze opdrachten uitvoert.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

Controleren of Event Hub de gegevens ontvangt

  1. Selecteer Event Hubs onder ENTITEITEN. Controleer of er een Event Hub met de naam test wordt weergegeven.

    Event hub - test

  2. Controleer of er berichten worden weergegeven in de Event Hub.

    Event Hub - berichten

Gebeurtenisgegevens verwerken met een Stream Analytics-taak

In dit gedeelte maakt u een Azure Stream Analytics-taak. De Kafka-client verzendt gebeurtenissen naar de Event Hub. U maakt een Stream Analytics-taak waarmee gebeurtenisgegevens worden ingevoerd als invoer en naar een Azure Blob-opslag worden uitgevoerd. Als u geen Azure Storage-account hebt, maakt u er een.

De query in de Stream Analytics-taak geeft de gegevens door zonder analyses uit te voeren. U kunt een query maken die de invoergegevens transformeert om uitvoergegevens te produceren in een andere indeling of met verworven inzichten.

Een Stream Analytics-taak maken

  1. Selecteer Een resource maken in de Azure-portal.
  2. Selecteer Analytics in het menu Azure Marketplace en selecteer Stream Analytics-taak.
  3. Voer op de pagina Nieuwe Stream Analytics de volgende acties uit:
    1. Voer een naam voor de taak in.

    2. Selecteer uw abonnement.

    3. Selecteer Nieuwe maken voor de resourcegroep en voer de naam in. U kunt ook een bestaande resourcegroep gebruiken.

    4. Selecteer een locatie voor de taak.

    5. Selecteer Maken om de taak te maken.

      Nieuwe Stream Analytics-taak

Taakinvoer configureren

  1. Selecteer in het meldingsbericht Ga naar resource om de pagina Stream Analytics-taak te bekijken.

  2. Selecteer Invoer in het gedeelte TAAKTOPOLOGIE in het menu aan de linkerkant.

  3. Selecteer Stroominvoer toevoegen en selecteer vervolgens Event Hub.

    Event Hub toevoegen als invoer

  4. Voer de volgende handelingen uit op de configuratiepagina Event Hub-invoer:

    1. Geef een alias op voor de invoer.

    2. Selecteer uw Azure-abonnement.

    3. Selecteer de Event Hub-naamruimte die u eerder hebt gemaakt.

    4. Selecteer test voor de Event Hub.

    5. Selecteer Opslaan.

      Configuratie van Event Hub-invoer

Taakuitvoer configureren

  1. Selecteer Uitvoer in het gedeelte TAAKTOPOLOGIE in het menu.
  2. Selecteer + Toevoegen op de werkbalk en selecteer Blob-opslag
  3. Voer de volgende handelingen uit op de pagina met uitvoerinstellingen voor de Blob-opslag:
    1. Geef een alias op voor de uitvoer.

    2. Selecteer uw Azure-abonnement.

    3. Selecteer uw Azure Storage-account.

    4. Voer een naam in voor de container waarin de uitvoergegevens van de Stream Analytics-query worden opgeslagen.

    5. Selecteer Opslaan.

      Configuratie van Blob Storage-uitvoer

Een query definiƫren

Nadat u een Stream Analytics-taak zodanig hebt ingesteld dat een binnenkomende gegevensstroom kan worden gelezen, is de volgende stap het maken van een transformatie waarmee gegevens in real-time worden geanalyseerd. U definieert de transformatie-query met Stream Analytics Query Language. In dit overzicht definieert u een query die de gegevens doorgeeft zonder transformatie uit te voeren.

  1. Selecteer Query.

  2. Vervang in het queryvenster [YourOutputAlias] door de uitvoeralias die u eerder hebt gemaakt.

  3. Vervang [YourInputAlias] door de invoeralias die u eerder hebt gemaakt.

  4. Selecteer Opslaan op de werkbalk.

    Schermopname van het queryvenster met waarden voor invoer- en uitvoervariabelen.

De Stream Analytics-taak uitvoeren

  1. Selecteer Overzicht in het menu aan de linkerkant.

  2. Selecteer Starten.

    Startmenu

  3. Selecteer op de pagina Taak starten de optie Starten.

    De pagina Taak starten

  4. Wacht tot de status van de taak is gewijzigd van Wordt gestart in Wordt uitgevoerd.

    Taakstatus - wordt uitgevoerd

Het scenario testen

  1. Voer de Kafka-producer opnieuw uit om gebeurtenissen naar de Event Hub te verzenden.

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. Controleer of uitvoergegevens zijn gegenereerd in de Azure Blob-opslag. U ziet een JSON-bestand in de container met 100 rijen die er uitzien als de volgende voorbeeldrijen:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    In dit scenario heeft de Azure Stream Analytics-taak invoergegevens ontvangen van de Event Hub en opgeslagen in de Azure Blob-opslag.

Volgende stappen

In dit artikel hebt u geleerd u hoe u kunt streamen naar Event Hubs zonder uw protocolclients te wijzigen of uw eigen clusters uit te voeren. Zie Ontwikkelaarshandleiding van Apache Kafka voor Azure Event Hubs voor meer informatie over Event Hubs voor Apache Kafka.