Oktatóanyag: Az Apache Kafka Producer és Consumer API-k használata
Ismerkedjen meg az Apache Kafka Producer és Consumer API-k a Kafka on HDInsighttal való használatával.
A Kafka Producer API segítségével az alkalmazások adatstreameket küldhetnek a Kafka-fürtnek. A Kafka Consumer API segítségével az alkalmazások adatstreameket olvashatnak be a fürtből.
Eben az oktatóanyagban az alábbiakkal fog megismerkedni:
- Előfeltételek
- A kód értelmezése
- Az alkalmazás létrehozása és üzembe helyezése
- Az alkalmazás futtatása a fürtön
Az API-król további információkat az Apache dokumentációjának Producer API-val és Consumer API-val foglalkozó részeiben talál.
Előfeltételek
- Apache Kafka on HDInsight-fürt. A fürt létrehozásának módjáról a Start with Apache Kafka on HDInsight (Első lépések az Apache Kafka on HDInsight használatával) című témakörben olvashat.
- A Java Developer Kit (JDK) 8-as vagy azzal egyenértékű verziója, például az OpenJDK.
- Az Apache Maven megfelelően van telepítve az Apache szerint. A Maven egy Java-projektekhez készült projektépítési rendszer.
- Egy olyan SSH-ügyfél, mint Putty. További információért lásd: Csatlakozás a HDInsighthoz (Apache Hadoop) SSH-val.
A kód értelmezése
A példaalkalmazás helye: https://github.com/Azure-Samples/hdinsight-kafka-java-get-started (Producer-Consumer
alkönyvtár). Ha az Enterprise Security Package (ESP) kompatibilis Kafka-fürtöt használja, az alkönyvtárban DomainJoined-Producer-Consumer
található alkalmazásverziót kell használnia.
Az alkalmazás elsődlegesen négy fájlból áll:
-
pom.xml
: Ez a fájl határozza meg a projektfüggőségeket, a Java-verziót és a csomagolási módszereket. -
Producer.java
: Ez a fájl véletlenszerű mondatokat küld a Kafkának a Producer API használatával. -
Consumer.java
: Ez a fájl a Consumer API-t használatával olvas ki adatokat a Kafkából, és az STDOUT-ba küldi el azokat. -
AdminClientWrapper.java
: Ez a fájl a rendszergazdai API-t használja a Kafka-témakörök létrehozásához, leírásához és törléséhez. -
Run.java
: A Producer- és Consumer-kód futtatásához a rendszer a parancssori felületet használja.
Pom.xml
A pom.xml
fájl fontosabb elemei a következők:
Függőségek: Ez a projekt a Kafka Producer és Consumer API-jaira támaszkodik, amelyeket a
kafka-clients
csomag tartalmaz. Ezt a függőséget a következő XML-kód határozza meg:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
A
${kafka.version}
bejegyzés apom.xml
fájl<properties>..</properties>
szakaszában van meghatározva, és a HDInsight-fürt Kafka-verziójához van konfigurálva.Beépülő modulok: A Maven beépülő modulok különböző képességekkel rendelkeznek. Ebben a projektben a következő beépülő modulokat használjuk:
-
maven-compiler-plugin
: Ezzel állítható a projekt által használt Java-verzió a 8-as verzióra. A HDInsight 3.6 ezt a Java-verziót használja. -
maven-shade-plugin
: Ez az alkalmazást és az esetleges függőségeket tartalmazó uber JAR-fájl létrehozására használható. Az alkalmazás belépési pontjának beállítására is használható, így közvetlenül futtathatja a JAR-fájlt anélkül, hogy a fő osztályt meg kellene határoznia.
-
Producer.java
A Producer a Kafka közvetítő gazdagépeivel (feldolgozó csomópontjaival) kommunikál, és adatok küld egy Kafka-témakörnek. A következő kódrészlet a GitHub-adattárProducer.java fájljából származik, és bemutatja, hogyan állíthatja be az előállító tulajdonságait. Az Enterprise Security Enabled fürtök esetében egy további tulajdonságot kell hozzáadni a következőhöz: "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
A Consumer a Kafka közvetítő gazdagépeivel (feldolgozó csomópontjaival) kommunikál, majd egy ciklusban beolvassa az adatokat. A Consumer.java fájl következő kódrészlete állítja be a fogyasztó tulajdonságait. Az Enterprise Security Enabled fürtök esetében egy további tulajdonságot kell hozzáadni a következőhöz: "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
Ebben a kódban a Consumer úgy van beállítva, hogy a beolvasást a témakör elejétől kezdje (az auto.offset.reset
beállítása: earliest
).
Run.java
A Run.java fájl egy parancssori felületet biztosít, amely az előállító vagy a fogyasztói kódot futtatja. Paraméterként a Kafka közvetítő gazdagépének adatait kell megadni. Megadhat egy csoportazonosító-értéket is, amelyet a fogyasztói folyamat használ. Ha több fogyasztói példányt hoz létre ugyanazzal a csoportazonosítóval, azok terheléselosztást fognak biztosítani a témakörből származó olvasáshoz.
A példa létrehozása és üzembe helyezése
Előre elkészített JAR-fájlok használata
Töltse le a jars-fájlokat a Kafka Get Started Azure-mintából. Ha a fürt engedélyezve van az Enterprise Security Package (ESP) szolgáltatásban, használja a kafka-producer-consumer-esp.jar fájlt. Az alábbi paranccsal másolja a jars-fájlokat a fürtbe.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
JAR-fájlok létrehozása kódból
Ha ki szeretné hagyni ezt a lépést, az előre összeállított jar-fájlok letölthetők az Prebuilt-Jars
alkönyvtárból. Töltse le a kafka-producer-consumer.jar fájlt. Ha a fürt engedélyezve van az Enterprise Security Package (ESP) szolgáltatásban, használja a kafka-producer-consumer-esp.jar fájlt. A 3. lépés végrehajtásával másolja a jar-t a HDInsight-fürtbe.
Töltse le és bontsa ki a példákat a fájlból https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Állítsa be az aktuális könyvtárat a könyvtár helyére
hdinsight-kafka-java-get-started\Producer-Consumer
. Ha az Enterprise Security Package (ESP) kompatibilis Kafka-fürtöt használja, a helyet alkönyvtárraDomainJoined-Producer-Consumer
kell állítania. Az alkalmazás létrehozásához használja a következő parancsot:mvn clean package
A parancs létrehozza a
target
nevű könyvtárat, amely akafka-producer-consumer-1.0-SNAPSHOT.jar
nevű fájlt tartalmazza. ESP-fürtök esetén a fájl a következő lesz:kafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Cserélje le az
sshuser
elemet a fürt SSH-felhasználójára, illetve aCLUSTERNAME
elemet a fürt nevére. Írja be a következő parancsot akafka-producer-consumer-1.0-SNAPSHOT.jar
fájl HDInsight-fürtbe való másolásához. Ha a rendszer kéri, adja meg az SSH-felhasználó jelszavát.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
A példa futtatása
Cserélje le az
sshuser
elemet a fürt SSH-felhasználójára, illetve aCLUSTERNAME
elemet a fürt nevére. Nyisson meg egy SSH-kapcsolatot a fürthöz a következő parancs megadásával. Ha a rendszer kéri, adja meg az SSH-felhasználói fiók jelszavát.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
A Kafka-közvetítő gazdagépek lekéréséhez cserélje le az értékeket a és
<password>
értékre<clustername>
a következő parancsban, és hajtsa végre. Használja ugyanazt a burkolatot,<clustername>
mint a Azure Portal. Cserélje le a elemet<password>
a fürt bejelentkezési jelszavára, majd hajtsa végre a következőt:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Megjegyzés
Ehhez a parancshoz Ambari-hozzáférés szükséges. Ha a fürt egy NSG mögött található, futtassa ezt a parancsot egy olyan gépről, amely hozzáfér az Ambarihoz.
Hozza létre a Kafka-témakört a
myTest
következő paranccsal:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Az előállító futtatásához és az adatok témakörbe írásához használja az alábbi parancsot:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Az előállító futásának befejeződését követően használja az alábbi parancsot a témakörből történő olvasáshoz:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
A rendszer megjeleníti a beolvasott rekordokat a rekordok számával együtt.
Használja a Ctrl + C billentyűparancsot a fogyasztóból történő kilépéshez.
Több fogyasztó
A Kafka-fogyasztók egy fogyasztói csoportot használnak a rekordok olvasásakor. Ugyanazon csoport használata több fogyasztó esetén a terhelés szempontjából kiegyensúlyozott olvasást eredményez a témakörökből történő olvasáskor. A csoport mindegyik fogyasztója a rekordok egy részét kapja meg.
A fogyasztó alkalmazás elfogadja a csoportazonosítóként használt paramétert. Az alábbi parancs például elindít egy fogyasztót a myGroup
csoportazonosítóval:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Használja a Ctrl + C billentyűparancsot a fogyasztóból történő kilépéshez.
Ha szeretné látni, hogyan zajlik a folyamat, használja az alábbi parancsot:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Ez a parancs a tmux
használatával két oszlopra osztja a terminált. Mindegyik oszlopban elindul egy-egy fogyasztó, amelyekhez azonos csoportazonosító-érték tartozik. Ha a fogyasztók befejezték a beolvasást, figyelje meg, hogy mindegyik csak a rekordok egy részét olvasta be. A Ctrl + C billentyűkombináció kétszeri lenyomásával lépjen ki a alkalmazásból tmux
.
Az ugyanazon csoportban található ügyfelek általi felhasználás kezelése a témakör partícióinak használatával történik. Ebben a kódmintában a korábban létrehozott test
témakör nyolc partícióval rendelkezik. Ha elindít nyolc fogyasztót, az egyes fogyasztók a témakör egyetlen partíciójából fognak rekordokat olvasni.
Fontos
A fogyasztói csoportban található fogyasztói példányok száma nem haladhatja meg a partíciók számát. Ebben a példában egy fogyasztói csoport legfeljebb nyolc fogyasztót tartalmazhat, mivel a témakörben ennyi partíció található. Emellett lehet több, legfeljebb nyolc fogyasztóval rendelkező fogyasztói csoportja is.
A Kafkában tárolt rekordok a partíción belüli fogadásuk sorrendjében vannak tárolva. Ha a rekordokat az érkezési sorrendben szeretné kézbesíteni egy partíción belül, hozzon létre egy fogyasztói csoportot, amelyben a fogyasztói példányok száma egyezik a partíciók számával. Ha a rekordokat az érkezési sorrendben szeretné kézbesíteni a témakörön belül, hozzon létre egy olyan fogyasztói csoportot, amely csak egyetlen fogyasztói példánnyal rendelkezik.
Gyakori problémák
A témakör létrehozása sikertelen Ha a fürt engedélyezve van az Enterprise Security Pack csomagban, használja az előre összeállított JAR-fájlokat az előállító és a fogyasztó számára. Az ESP jar az alkönyvtárban lévő
DomainJoined-Producer-Consumer
kódból hozható létre. Az előállító és a fogyasztó tulajdonságai további tulajdonsággalCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
rendelkeznek az ESP-kompatibilis fürtökhöz.Esp-kompatibilis fürtök hibája: Ha a műveletek előállítása és felhasználása sikertelen, és ESP-kompatibilis fürtöt használ, ellenőrizze, hogy a felhasználó
kafka
jelen van-e az összes Ranger-házirendben. Ha nincs jelen, adja hozzá az összes Ranger-szabályzathoz.
Az erőforrások eltávolítása
Ha törölni szeretné a jelen oktatóanyag által létrehozott erőforrásokat, akkor törölje az erőforráscsoportot. Az erőforráscsoport törlésekor a kapcsolódó HDInsight-fürt, valamint az esetlegesen az erőforráscsoporthoz társított egyéb erőforrások is törlődnek.
Az erőforráscsoport eltávolítása az Azure Portallal:
- Az Azure Portalon bontsa ki a bal oldalon a szolgáltatásmenüt, és válassza az Erőforráscsoportok lehetőséget az erőforráscsoportok listájának megjelenítéséhez.
- Keresse meg a törölni kívánt erőforráscsoportot, és kattintson a jobb gombbal a lista jobb oldalán lévő Továbbiak gombra (...).
- Válassza az Erőforráscsoport törlése elemet, és erősítse meg a választását.
Következő lépések
Ebben a dokumentumban megtanulta, hogyan használhatja az Apache Kafka Producer and Consumer API-t a Kafkával a HDInsighton. Az alábbiak további információt biztosítanak a Kafka használatával kapcsolatban: