Kurz: Zpracování událostí Apache Kafka pro event Hubs pomocí Stream Analytics

Tento článek ukazuje, jak streamovat data do služby Event Hubs a zpracovávat je pomocí Azure Stream Analytics. Provede vás následujícími kroky:

  1. Vytvořte obor názvů služby Event Hubs.
  2. Vytvořte klienta Kafka, který odesílá zprávy do centra událostí.
  3. Vytvořte úlohu Stream Analytics, která kopíruje data z centra událostí do úložiště objektů blob v Azure.

Pokud používáte koncový bod Kafka vystavený centrem událostí, nemusíte měnit klienty protokolu ani spouštět vlastní clustery. Azure Event Hubs podporuje Apache Kafka verze 1.0 a vyšší.

Požadavky

Abyste mohli absolvovat tento rychlý start, ujistěte se, že máte následující:

Vytvoření oboru názvů služby Event Hubs

Při vytváření oboru názvů služby Event Hubs se koncový bod Kafka pro obor názvů automaticky povolí. Události z aplikací, které používají protokol Kafka, můžete streamovat do služby Event Hubs. Podle podrobných pokynů v tématu Vytvoření centra událostí pomocí Azure Portal vytvořte obor názvů služby Event Hubs. Pokud používáte vyhrazený cluster, přečtěte si téma Vytvoření oboru názvů a centra událostí ve vyhrazeném clusteru.

Poznámka

Služba Event Hubs pro Kafka se na úrovni Basic nepodporuje.

Odesílání zpráv pomocí Kafka ve službě Event Hubs

  1. Naklonujte úložiště Azure Event Hubs pro Kafka do počítače.

  2. Přejděte do složky: azure-event-hubs-for-kafka/quickstart/java/producer.

  3. Aktualizujte podrobnosti o konfiguraci producenta v src/main/resources/producer.confignástroji . Zadejte název a připojovací řetězec pro obor názvů centra událostí.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. Přejděte na azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/a otevřete soubor TestDataReporter.java v editoru podle svého výběru.

  5. Zakomentujte následující řádek kódu:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. Místo zakomentovaného kódu přidejte následující řádek kódu:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    Tento kód odešle data události ve formátu JSON . Při konfiguraci vstupu pro úlohu Stream Analytics zadáte jako formát vstupních dat JSON.

  7. Spusťte producenta a streamujte do služby Event Hubs. Pokud na počítači s Windows používáte příkazový řádekNode.js, přepněte před spuštěním azure-event-hubs-for-kafka/quickstart/java/producer těchto příkazů do složky.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

Ověření, že centrum událostí přijímá data

  1. V části ENTITY vyberte Event Hubs. Ověřte, že se zobrazuje centrum událostí s názvem test.

    Centrum událostí – test

  2. Ověřte, že se do centra událostí zobrazují zprávy.

    Centrum událostí – zprávy

Zpracování dat událostí pomocí úlohy Stream Analytics

V této části vytvoříte úlohu Azure Stream Analytics. Klient Kafka odesílá události do centra událostí. Vytvoříte úlohu Stream Analytics, která přijímá data událostí jako vstup a vypíše je do úložiště objektů blob v Azure. Pokud účet Azure Storage nemáte, vytvořte si ho.

Dotaz v úloze Stream Analytics prochází data bez provádění analýz. Můžete vytvořit dotaz, který transformuje vstupní data a vytvoří výstupní data v jiném formátu nebo se získanými přehledy.

Vytvoření úlohy Stream Analytics

  1. V Azure Portal vyberte + Vytvořit prostředek.
  2. V nabídce Azure Marketplace vyberte Analytics a pak úloha Stream Analytics.
  3. Na stránce Nový Stream Analytics proveďte následující akce:
    1. Zadejte název úlohy.

    2. Vyberte své předplatné.

    3. Jako skupinu prostředků vyberte Vytvořit novou a zadejte název. Můžete také použít existující skupinu prostředků.

    4. Vyberte umístění pro úlohu.

    5. Vyberte Vytvořit a vytvořte úlohu.

      Nová úloha Stream Analytics

Konfigurace vstupu úlohy

  1. Ve zprávě s oznámením vyberte Přejít k prostředku , aby se zobrazila stránka úlohy Stream Analytics .

  2. V části TOPOLOGIE ÚLOHY v nabídce vlevo vyberte Vstupy.

  3. Vyberte Přidat vstup streamu a pak vyberte Centrum událostí.

    Přidání centra událostí jako vstupu

  4. Na stránce Konfigurace vstupu centra událostí proveďte následující akce:

    1. Zadejte alias pro vstup.

    2. Vyberte své předplatné Azure.

    3. Vyberte obor názvů centra událostí , který jste vytvořili dříve.

    4. Vyberte test pro centrum událostí.

    5. Vyberte Uložit.

      Konfigurace vstupu centra událostí

Konfigurace výstupu úlohy

  1. V části TOPOLOGIE ÚLOHY v nabídce vyberte Výstupy.
  2. Na panelu nástrojů vyberte + Přidat a pak vyberte Úložiště objektů blob.
  3. Na stránce Nastavení výstupu služby Blob Storage proveďte následující akce:
    1. Zadejte alias výstupu.

    2. Vyberte své předplatné Azure.

    3. Vyberte svůj účet Služby Azure Storage.

    4. Zadejte název kontejneru , který ukládá výstupní data z dotazu Stream Analytics.

    5. Vyberte Uložit.

      Konfigurace výstupu služby Blob Storage

Definování dotazu

Po dokončení nastavení úlohy Stream Analytics pro čtení příchozího datového streamu je dalším krokem vytvoření transformace, která analyzuje data v reálném čase. Transformační dotaz definujete pomocí jazyka Stream Analytics Query Language. V tomto návodu definujete dotaz, který prochází daty bez provedení transformace.

  1. Vyberte Dotaz.

  2. V okně dotazu nahraďte [YourOutputAlias] výstupním aliasem, který jste vytvořili dříve.

  3. Nahraďte [YourInputAlias] vstupním aliasem, který jste vytvořili dříve.

  4. Na panelu nástrojů vyberte Uložit.

    Snímek obrazovky ukazuje okno dotazu s hodnotami vstupních a výstupních proměnných.

Spuštění úlohy Stream Analytics

  1. V nabídce vlevo vyberte Přehled .

  2. Vyberte Spustit.

    Nabídka Start

  3. Na stránce Spustit úlohu vyberte Spustit.

    Stránka Spustit úlohu

  4. Počkejte, až se stav úlohy změní ze Spuštěno na Spuštěno.

    Stav úlohy – spuštěno

Otestovat scénář

  1. Znovu spusťte producenta Kafka , aby se události odesílaly do centra událostí.

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. Ověřte, že se ve službě Azure Blob Storage vygenerují výstupní data. V kontejneru se zobrazí soubor JSON se 100 řádky, které vypadají jako následující ukázkové řádky:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    Úloha Azure Stream Analytics v tomto scénáři přijala vstupní data z centra událostí a uložila je do úložiště objektů blob v Azure.

Další kroky

V tomto článku jste zjistili, jak streamovat do služby Event Hubs beze změny klientů protokolu nebo spouštění vlastních clusterů. Další informace o službě Event Hubs pro Apache Kafka najdete v příručce pro vývojáře Apache Kafka pro Azure Event Hubs.