Oktatóanyag: Az Apache Kafka for Event Hubs-események feldolgozása Stream Analytics használatával

Ez a cikk bemutatja, hogyan streamelhet adatokat az Event Hubsba, és hogyan dolgozhatja fel azokat az Azure Stream Analytics használatával. Végigvezeti a következő lépéseken:

  1. Event Hubs-névtér létrehozása.
  2. Hozzon létre egy Kafka-ügyfelet, amely üzeneteket küld az eseményközpontnak.
  3. Hozzon létre egy Stream Analytics-feladatot, amely adatokat másol az eseményközpontból egy Azure Blob Storage-ba.

Nem kell módosítania a protokollügyfeleket, és nem kell saját fürtöket futtatnia az eseményközpont által közzétett Kafka-végpont használatakor. Azure Event Hubs támogatja az Apache Kafka 1.0-s és újabb verzióját.

Előfeltételek

A rövid útmutató elvégzéséhez győződjön meg arról, hogy teljesülnek az alábbi előfeltételek:

Event Hubs-névtér létrehozása

Event Hubs-névtér létrehozásakor a névtér Kafka-végpontja automatikusan engedélyezve lesz. A Kafka protokollt használó alkalmazások eseményeit eseményközpontokba streamelheti. Az Event Hubs-névtér létrehozásához kövesse az Eseményközpont létrehozása Azure Portal használatával című cikk részletes utasításait. Ha dedikált fürtöt használ, tekintse meg a Névtér és eseményközpont létrehozása dedikált fürtön című témakört.

Megjegyzés

A Kafkához készült Event Hubs nem támogatott az alapszinten .

Üzenetek küldése a Kafkával az Event Hubsban

  1. Klónozza a Kafka-adattár Azure Event Hubs a gépére.

  2. Lépjen a következő mappába: azure-event-hubs-for-kafka/quickstart/java/producer.

  3. Frissítse az előállító konfigurációs adatait a fájlban src/main/resources/producer.config. Adja meg az eseményközpont névteréneknevét és kapcsolati sztring.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. Nyissa meg a fájlt, azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/és nyissa meg a TestDataReporter.java fájlt egy tetszőleges szerkesztőben.

  5. Tegye megjegyzésbe a következő kódsort:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. Adja hozzá a következő kódsort a megjegyzésbe írt kód helyett:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    Ez a kód JSON formátumban küldi el az eseményadatokat. Amikor bemenetet konfigurál egy Stream Analytics-feladathoz, a JSON-t kell megadnia a bemeneti adatok formátumaként.

  7. Futtassa az előállítót , és streamelje az Event Hubsba. Windows rendszerű gépen, Node.js parancssor használatakor váltson a mappára a azure-event-hubs-for-kafka/quickstart/java/producer parancsok futtatása előtt.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

Ellenőrizze, hogy az eseményközpont megkapja-e az adatokat

  1. Az ENTITÁSOK területen válassza az Event Hubs lehetőséget. Ellenőrizze, hogy megjelenik-e egy test nevű eseményközpont.

    Eseményközpont – teszt

  2. Győződjön meg arról, hogy az eseményközpontba érkező üzeneteket látja.

    Eseményközpont – üzenetek

Eseményadatok feldolgozása Stream Analytics-feladattal

Ebben a szakaszban egy Azure Stream Analytics-feladatot hoz létre. A Kafka-ügyfél eseményeket küld az eseményközpontba. Létrehozhat egy Stream Analytics-feladatot, amely bemenetként veszi az eseményadatokat, és egy Azure Blob Storage-ba nyitja meg azokat. Ha nem rendelkezik Azure Storage-fiókkal, hozzon létre egyet.

A Stream Analytics-feladat lekérdezése elemzés nélkül továbbítja az adatokat. Létrehozhat egy lekérdezést, amely átalakítja a bemeneti adatokat, hogy a kimeneti adatokat más formátumban vagy a megszerzett megállapításokkal hozza létre.

Stream Analytics-feladat létrehozása

  1. Válassza a + Erőforrás létrehozása lehetőséget a Azure Portal.
  2. A Azure Marketplace menüben válassza az Elemzés, majd a Stream Analytics-feladat lehetőséget.
  3. Az Új Stream Analytics lapon hajtsa végre a következő műveleteket:
    1. Adja meg a feladat nevét .

    2. Válassza ki előfizetését.

    3. Válassza az Új létrehozása lehetőséget az erőforráscsoporthoz , és adja meg a nevet. Meglévő erőforráscsoportot is használhat .

    4. Válassza ki a feladat helyét .

    5. A feladat létrehozásához válassza a Létrehozás lehetőséget.

      Új Stream Analytics-feladat

Feladatbemenet konfigurálása

  1. Az értesítési üzenetben válassza az Erőforrás megnyitása lehetőséget a Stream Analytics-feladat oldalának megtekintéséhez.

  2. A bal oldali menü FELADATTOPOLÓGIA szakaszában válassza a Bemenetek lehetőséget.

  3. Válassza a Streambemenet hozzáadása, majd az Event Hub lehetőséget.

    Eseményközpont hozzáadása bemenetként

  4. Az Event Hub bemeneti konfigurációs oldalán hajtsa végre a következő műveleteket:

    1. Adja meg a bemenet aliasát .

    2. Válassza ki az Azure-előfizetését.

    3. Válassza ki a korábban létrehozott eseményközpont-névteret .

    4. Válassza ki az eseményközponttesztelését.

    5. Válassza a Mentés lehetőséget.

      Eseményközpont bemeneti konfigurációja

Feladatkimenet konfigurálása

  1. A menü FELADATTOPOLÓGIA szakaszában válassza a Kimenetek lehetőséget.
  2. Válassza a + Hozzáadás lehetőséget az eszköztáron, majd válassza a Blob Storage lehetőséget
  3. A Blob Storage kimeneti beállításai lapon hajtsa végre a következő műveleteket:
    1. Adja meg a kimenet aliasát .

    2. Válassza ki az Azure-előfizetését.

    3. Válassza ki az Azure Storage-fiókját.

    4. Adja meg annak a tárolónak a nevét , amely a Stream Analytics-lekérdezésből származó kimeneti adatokat tárolja.

    5. Válassza a Mentés lehetőséget.

      Blob Storage kimeneti konfigurációja

Lekérdezés definiálása

Miután sikeresen beállította a Stream Analytics-feladatot a beérkező adatfolyam olvasására, a következő lépés egy átalakítás létrehozása, amely valós időben elemzi az adatokat. Az átalakítási lekérdezés definiálásához használja a Stream Analytics lekérdezési nyelvet. Ebben az útmutatóban egy olyan lekérdezést határoz meg, amely átalakítás nélkül halad át az adatokon.

  1. Válassza a Lekérdezés lehetőséget.

  2. A lekérdezési ablakban cserélje le a elemet [YourOutputAlias] a korábban létrehozott kimeneti aliasra.

  3. Cserélje le a elemet [YourInputAlias] a korábban létrehozott bemeneti aliasra.

  4. Válassza az eszköztár Save (Mentés) elemét.

    A képernyőfelvétel a lekérdezési ablakot jeleníti meg a bemeneti és kimeneti változók értékeivel.

Stream Analytics-feladat futtatása

  1. Válassza a bal oldali menü Áttekintés elemét.

  2. Válassza az Indítás elemet.

    Start menü

  3. A Feladat indítása lapon válassza a Start gombot.

    Feladat indítása lap

  4. Várja meg, amíg a feladat állapota Indításrólfutásra változik.

    Feladat állapota – fut

A forgatókönyv tesztelése

  1. Futtassa újra a Kafka-előállítót , hogy eseményeket küldjön az eseményközpontba.

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. Győződjön meg arról, hogy a kimeneti adatok az Azure Blob Storage-ban jönnek létre. A tárolóban egy 100 sort tartalmazó JSON-fájl jelenik meg, amely az alábbi mintasorhoz hasonlóan néz ki:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    Az Azure Stream Analytics-feladat bemeneti adatokat kapott az eseményközponttól, és ebben a forgatókönyvben az Azure Blob Storage-ban tárolta azokat.

Következő lépések

Ebben a cikkben megtanulta, hogyan streamelhet az Event Hubsba a protokollügyfelek módosítása vagy saját fürtök futtatása nélkül. További információ az Apache Kafkához készült Event Hubsról: Apache Kafka fejlesztői útmutató Azure Event Hubs.