Oktatóanyag: Az Apache Kafka for Event Hubs-események feldolgozása Stream Analytics használatával
Ez a cikk bemutatja, hogyan streamelhet adatokat az Event Hubsba, és hogyan dolgozhatja fel azokat az Azure Stream Analytics használatával. Végigvezeti a következő lépéseken:
- Event Hubs-névtér létrehozása.
- Hozzon létre egy Kafka-ügyfelet, amely üzeneteket küld az eseményközpontnak.
- Hozzon létre egy Stream Analytics-feladatot, amely adatokat másol az eseményközpontból egy Azure Blob Storage-ba.
Nem kell módosítania a protokollügyfeleket, és nem kell saját fürtöket futtatnia az eseményközpont által közzétett Kafka-végpont használatakor. Azure Event Hubs támogatja az Apache Kafka 1.0-s és újabb verzióját.
Előfeltételek
A rövid útmutató elvégzéséhez győződjön meg arról, hogy teljesülnek az alábbi előfeltételek:
- Azure-előfizetés. Ha még nincs előfizetése, hozzon létre egy ingyenes fiókot, mielőtt hozzákezd.
- Java fejlesztői készlet (JDK) 1.7+.
- Töltsön le és telepítsen egy Maven bináris archívumot.
- Git
- Egy Azure Storage-fiók. Ha még nem rendelkezik ilyen eszközzel, hozzon létre egyet a továbblépés előtt. Az útmutatóban szereplő Stream Analytics-feladat egy Azure Blob Storage-tárolóban tárolja a kimeneti adatokat.
Event Hubs-névtér létrehozása
Event Hubs-névtér létrehozásakor a névtér Kafka-végpontja automatikusan engedélyezve lesz. A Kafka protokollt használó alkalmazások eseményeit eseményközpontokba streamelheti. Az Event Hubs-névtér létrehozásához kövesse az Eseményközpont létrehozása Azure Portal használatával című cikk részletes utasításait. Ha dedikált fürtöt használ, tekintse meg a Névtér és eseményközpont létrehozása dedikált fürtön című témakört.
Megjegyzés
A Kafkához készült Event Hubs nem támogatott az alapszinten .
Üzenetek küldése a Kafkával az Event Hubsban
Klónozza a Kafka-adattár Azure Event Hubs a gépére.
Lépjen a következő mappába:
azure-event-hubs-for-kafka/quickstart/java/producer
.Frissítse az előállító konfigurációs adatait a fájlban
src/main/resources/producer.config
. Adja meg az eseményközpont névteréneknevét és kapcsolati sztring.bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
Nyissa meg a fájlt,
azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/
és nyissa meg a TestDataReporter.java fájlt egy tetszőleges szerkesztőben.Tegye megjegyzésbe a következő kódsort:
//final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
Adja hozzá a következő kódsort a megjegyzésbe írt kód helyett:
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");
Ez a kód JSON formátumban küldi el az eseményadatokat. Amikor bemenetet konfigurál egy Stream Analytics-feladathoz, a JSON-t kell megadnia a bemeneti adatok formátumaként.
Futtassa az előállítót , és streamelje az Event Hubsba. Windows rendszerű gépen, Node.js parancssor használatakor váltson a mappára a
azure-event-hubs-for-kafka/quickstart/java/producer
parancsok futtatása előtt.mvn clean package mvn exec:java -Dexec.mainClass="TestProducer"
Ellenőrizze, hogy az eseményközpont megkapja-e az adatokat
Az ENTITÁSOK területen válassza az Event Hubs lehetőséget. Ellenőrizze, hogy megjelenik-e egy test nevű eseményközpont.
Győződjön meg arról, hogy az eseményközpontba érkező üzeneteket látja.
Eseményadatok feldolgozása Stream Analytics-feladattal
Ebben a szakaszban egy Azure Stream Analytics-feladatot hoz létre. A Kafka-ügyfél eseményeket küld az eseményközpontba. Létrehozhat egy Stream Analytics-feladatot, amely bemenetként veszi az eseményadatokat, és egy Azure Blob Storage-ba nyitja meg azokat. Ha nem rendelkezik Azure Storage-fiókkal, hozzon létre egyet.
A Stream Analytics-feladat lekérdezése elemzés nélkül továbbítja az adatokat. Létrehozhat egy lekérdezést, amely átalakítja a bemeneti adatokat, hogy a kimeneti adatokat más formátumban vagy a megszerzett megállapításokkal hozza létre.
Stream Analytics-feladat létrehozása
- Válassza a + Erőforrás létrehozása lehetőséget a Azure Portal.
- A Azure Marketplace menüben válassza az Elemzés, majd a Stream Analytics-feladat lehetőséget.
- Az Új Stream Analytics lapon hajtsa végre a következő műveleteket:
Adja meg a feladat nevét .
Válassza ki előfizetését.
Válassza az Új létrehozása lehetőséget az erőforráscsoporthoz , és adja meg a nevet. Meglévő erőforráscsoportot is használhat .
Válassza ki a feladat helyét .
A feladat létrehozásához válassza a Létrehozás lehetőséget.
Feladatbemenet konfigurálása
Az értesítési üzenetben válassza az Erőforrás megnyitása lehetőséget a Stream Analytics-feladat oldalának megtekintéséhez.
A bal oldali menü FELADATTOPOLÓGIA szakaszában válassza a Bemenetek lehetőséget.
Válassza a Streambemenet hozzáadása, majd az Event Hub lehetőséget.
Az Event Hub bemeneti konfigurációs oldalán hajtsa végre a következő műveleteket:
Adja meg a bemenet aliasát .
Válassza ki az Azure-előfizetését.
Válassza ki a korábban létrehozott eseményközpont-névteret .
Válassza ki az eseményközponttesztelését.
Válassza a Mentés lehetőséget.
Feladatkimenet konfigurálása
- A menü FELADATTOPOLÓGIA szakaszában válassza a Kimenetek lehetőséget.
- Válassza a + Hozzáadás lehetőséget az eszköztáron, majd válassza a Blob Storage lehetőséget
- A Blob Storage kimeneti beállításai lapon hajtsa végre a következő műveleteket:
Adja meg a kimenet aliasát .
Válassza ki az Azure-előfizetését.
Válassza ki az Azure Storage-fiókját.
Adja meg annak a tárolónak a nevét , amely a Stream Analytics-lekérdezésből származó kimeneti adatokat tárolja.
Válassza a Mentés lehetőséget.
Lekérdezés definiálása
Miután sikeresen beállította a Stream Analytics-feladatot a beérkező adatfolyam olvasására, a következő lépés egy átalakítás létrehozása, amely valós időben elemzi az adatokat. Az átalakítási lekérdezés definiálásához használja a Stream Analytics lekérdezési nyelvet. Ebben az útmutatóban egy olyan lekérdezést határoz meg, amely átalakítás nélkül halad át az adatokon.
Válassza a Lekérdezés lehetőséget.
A lekérdezési ablakban cserélje le a elemet
[YourOutputAlias]
a korábban létrehozott kimeneti aliasra.Cserélje le a elemet
[YourInputAlias]
a korábban létrehozott bemeneti aliasra.Válassza az eszköztár Save (Mentés) elemét.
Stream Analytics-feladat futtatása
Válassza a bal oldali menü Áttekintés elemét.
Válassza az Indítás elemet.
A Feladat indítása lapon válassza a Start gombot.
Várja meg, amíg a feladat állapota Indításrólfutásra változik.
A forgatókönyv tesztelése
Futtassa újra a Kafka-előállítót , hogy eseményeket küldjön az eseményközpontba.
mvn exec:java -Dexec.mainClass="TestProducer"
Győződjön meg arról, hogy a kimeneti adatok az Azure Blob Storage-ban jönnek létre. A tárolóban egy 100 sort tartalmazó JSON-fájl jelenik meg, amely az alábbi mintasorhoz hasonlóan néz ki:
{"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
Az Azure Stream Analytics-feladat bemeneti adatokat kapott az eseményközponttól, és ebben a forgatókönyvben az Azure Blob Storage-ban tárolta azokat.
Következő lépések
Ebben a cikkben megtanulta, hogyan streamelhet az Event Hubsba a protokollügyfelek módosítása vagy saját fürtök futtatása nélkül. További információ az Apache Kafkához készült Event Hubsról: Apache Kafka fejlesztői útmutató Azure Event Hubs.