Megosztás a következőn keresztül:


Oktatóanyag: Az Apache Kafka Producer és Consumer API-k használata

Ismerkedjen meg az Apache Kafka Producer és Consumer API-k a Kafka on HDInsighttal való használatával.

A Kafka Producer API segítségével az alkalmazások adatstreameket küldhetnek a Kafka-fürtnek. A Kafka Consumer API segítségével az alkalmazások adatstreameket olvashatnak be a fürtből.

Eben az oktatóanyagban az alábbiakkal fog megismerkedni:

  • Előfeltételek
  • A kód értelmezése
  • Az alkalmazás létrehozása és üzembe helyezése
  • Az alkalmazás futtatása a fürtön

Az API-król további információkat az Apache dokumentációjának Producer API-val és Consumer API-val foglalkozó részeiben talál.

Előfeltételek

A kód értelmezése

A példaalkalmazás helye: https://github.com/Azure-Samples/hdinsight-kafka-java-get-started (Producer-Consumer alkönyvtár). Ha az Enterprise Security Package (ESP) kompatibilis Kafka-fürtöt használja, az alkönyvtárban DomainJoined-Producer-Consumer található alkalmazásverziót kell használnia.

Az alkalmazás elsődlegesen négy fájlból áll:

  • pom.xml: Ez a fájl határozza meg a projektfüggőségeket, a Java-verziót és a csomagolási módszereket.
  • Producer.java: Ez a fájl véletlenszerű mondatokat küld a Kafkának a Producer API használatával.
  • Consumer.java: Ez a fájl a Consumer API-t használatával olvas ki adatokat a Kafkából, és az STDOUT-ba küldi el azokat.
  • AdminClientWrapper.java: Ez a fájl a rendszergazdai API-t használja a Kafka-témakörök létrehozásához, leírásához és törléséhez.
  • Run.java: A Producer- és Consumer-kód futtatásához a rendszer a parancssori felületet használja.

Pom.xml

A pom.xml fájl fontosabb elemei a következők:

  • Függőségek: Ez a projekt a Kafka Producer és Consumer API-jaira támaszkodik, amelyeket a kafka-clients csomag tartalmaz. Ezt a függőséget a következő XML-kód határozza meg:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    A ${kafka.version} bejegyzés a pom.xml fájl <properties>..</properties> szakaszában van meghatározva, és a HDInsight-fürt Kafka-verziójához van konfigurálva.

  • Beépülő modulok: A Maven beépülő modulok különböző képességekkel rendelkeznek. Ebben a projektben a következő beépülő modulokat használjuk:

    • maven-compiler-plugin: Ezzel állítható a projekt által használt Java-verzió a 8-as verzióra. A HDInsight 3.6 ezt a Java-verziót használja.
    • maven-shade-plugin: Ez az alkalmazást és az esetleges függőségeket tartalmazó uber JAR-fájl létrehozására használható. Az alkalmazás belépési pontjának beállítására is használható, így közvetlenül futtathatja a JAR-fájlt anélkül, hogy a fő osztályt meg kellene határoznia.

Producer.java

A Producer a Kafka közvetítő gazdagépeivel (feldolgozó csomópontjaival) kommunikál, és adatok küld egy Kafka-témakörnek. A következő kódrészlet a GitHub-adattárProducer.java fájljából származik, és bemutatja, hogyan állíthatja be az előállító tulajdonságait. Az Enterprise Security Enabled fürtök esetében egy további tulajdonságot kell hozzáadni a következőhöz: "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.java

A Consumer a Kafka közvetítő gazdagépeivel (feldolgozó csomópontjaival) kommunikál, majd egy ciklusban beolvassa az adatokat. A Consumer.java fájl következő kódrészlete állítja be a fogyasztó tulajdonságait. Az Enterprise Security Enabled fürtök esetében egy további tulajdonságot kell hozzáadni a következőhöz: "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

Ebben a kódban a Consumer úgy van beállítva, hogy a beolvasást a témakör elejétől kezdje (az auto.offset.reset beállítása: earliest).

Run.java

A Run.java fájl egy parancssori felületet biztosít, amely az előállító vagy a fogyasztói kódot futtatja. Paraméterként a Kafka közvetítő gazdagépének adatait kell megadni. Megadhat egy csoportazonosító-értéket is, amelyet a fogyasztói folyamat használ. Ha több fogyasztói példányt hoz létre ugyanazzal a csoportazonosítóval, azok terheléselosztást fognak biztosítani a témakörből származó olvasáshoz.

A példa létrehozása és üzembe helyezése

Előre elkészített JAR-fájlok használata

Töltse le a jars-fájlokat a Kafka Get Started Azure-mintából. Ha a fürt engedélyezve van az Enterprise Security Package (ESP) szolgáltatásban, használja a kafka-producer-consumer-esp.jar fájlt. Az alábbi paranccsal másolja a jars-fájlokat a fürtbe.

scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar

JAR-fájlok létrehozása kódból

Ha ki szeretné hagyni ezt a lépést, az előre összeállított jar-fájlok letölthetők az Prebuilt-Jars alkönyvtárból. Töltse le a kafka-producer-consumer.jar fájlt. Ha a fürt engedélyezve van az Enterprise Security Package (ESP) szolgáltatásban, használja a kafka-producer-consumer-esp.jar fájlt. A 3. lépés végrehajtásával másolja a jar-t a HDInsight-fürtbe.

  1. Töltse le és bontsa ki a példákat a fájlból https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Állítsa be az aktuális könyvtárat a könyvtár helyére hdinsight-kafka-java-get-started\Producer-Consumer . Ha az Enterprise Security Package (ESP) kompatibilis Kafka-fürtöt használja, a helyet alkönyvtárra DomainJoined-Producer-Consumerkell állítania. Az alkalmazás létrehozásához használja a következő parancsot:

    mvn clean package
    

    A parancs létrehozza a target nevű könyvtárat, amely a kafka-producer-consumer-1.0-SNAPSHOT.jar nevű fájlt tartalmazza. ESP-fürtök esetén a fájl a következő lesz: kafka-producer-consumer-esp-1.0-SNAPSHOT.jar

  3. Cserélje le az sshuser elemet a fürt SSH-felhasználójára, illetve a CLUSTERNAME elemet a fürt nevére. Írja be a következő parancsot a kafka-producer-consumer-1.0-SNAPSHOT.jar fájl HDInsight-fürtbe való másolásához. Ha a rendszer kéri, adja meg az SSH-felhasználó jelszavát.

    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

A példa futtatása

  1. Cserélje le az sshuser elemet a fürt SSH-felhasználójára, illetve a CLUSTERNAME elemet a fürt nevére. Nyisson meg egy SSH-kapcsolatot a fürthöz a következő parancs megadásával. Ha a rendszer kéri, adja meg az SSH-felhasználói fiók jelszavát.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. A Kafka-közvetítő gazdagépek lekéréséhez cserélje le az értékeket a és <password> értékre <clustername> a következő parancsban, és hajtsa végre. Használja ugyanazt a burkolatot, <clustername> mint a Azure Portal. Cserélje le a elemet <password> a fürt bejelentkezési jelszavára, majd hajtsa végre a következőt:

    sudo apt -y install jq
    export CLUSTER_NAME='<clustername>'
    export PASSWORD='<password>'
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Megjegyzés

    Ehhez a parancshoz Ambari-hozzáférés szükséges. Ha a fürt egy NSG mögött található, futtassa ezt a parancsot egy olyan gépről, amely hozzáfér az Ambarihoz.

  3. Hozza létre a Kafka-témakört a myTestkövetkező paranccsal:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  4. Az előállító futtatásához és az adatok témakörbe írásához használja az alábbi parancsot:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  5. Az előállító futásának befejeződését követően használja az alábbi parancsot a témakörből történő olvasáshoz:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    A rendszer megjeleníti a beolvasott rekordokat a rekordok számával együtt.

  6. Használja a Ctrl + C billentyűparancsot a fogyasztóból történő kilépéshez.

Több fogyasztó

A Kafka-fogyasztók egy fogyasztói csoportot használnak a rekordok olvasásakor. Ugyanazon csoport használata több fogyasztó esetén a terhelés szempontjából kiegyensúlyozott olvasást eredményez a témakörökből történő olvasáskor. A csoport mindegyik fogyasztója a rekordok egy részét kapja meg.

A fogyasztó alkalmazás elfogadja a csoportazonosítóként használt paramétert. Az alábbi parancs például elindít egy fogyasztót a myGroup csoportazonosítóval:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Használja a Ctrl + C billentyűparancsot a fogyasztóból történő kilépéshez.

Ha szeretné látni, hogyan zajlik a folyamat, használja az alábbi parancsot:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

Ez a parancs a tmux használatával két oszlopra osztja a terminált. Mindegyik oszlopban elindul egy-egy fogyasztó, amelyekhez azonos csoportazonosító-érték tartozik. Ha a fogyasztók befejezték a beolvasást, figyelje meg, hogy mindegyik csak a rekordok egy részét olvasta be. A Ctrl + C billentyűkombináció kétszeri lenyomásával lépjen ki a alkalmazásból tmux.

Az ugyanazon csoportban található ügyfelek általi felhasználás kezelése a témakör partícióinak használatával történik. Ebben a kódmintában a korábban létrehozott test témakör nyolc partícióval rendelkezik. Ha elindít nyolc fogyasztót, az egyes fogyasztók a témakör egyetlen partíciójából fognak rekordokat olvasni.

Fontos

A fogyasztói csoportban található fogyasztói példányok száma nem haladhatja meg a partíciók számát. Ebben a példában egy fogyasztói csoport legfeljebb nyolc fogyasztót tartalmazhat, mivel a témakörben ennyi partíció található. Emellett lehet több, legfeljebb nyolc fogyasztóval rendelkező fogyasztói csoportja is.

A Kafkában tárolt rekordok a partíción belüli fogadásuk sorrendjében vannak tárolva. Ha a rekordokat az érkezési sorrendben szeretné kézbesíteni egy partíción belül, hozzon létre egy fogyasztói csoportot, amelyben a fogyasztói példányok száma egyezik a partíciók számával. Ha a rekordokat az érkezési sorrendben szeretné kézbesíteni a témakörön belül, hozzon létre egy olyan fogyasztói csoportot, amely csak egyetlen fogyasztói példánnyal rendelkezik.

Gyakori problémák

  1. A témakör létrehozása sikertelen Ha a fürt engedélyezve van az Enterprise Security Pack csomagban, használja az előre összeállított JAR-fájlokat az előállító és a fogyasztó számára. Az ESP jar az alkönyvtárban lévőDomainJoined-Producer-Consumer kódból hozható létre. Az előállító és a fogyasztó tulajdonságai további tulajdonsággal CommonClientConfigs.SECURITY_PROTOCOL_CONFIG rendelkeznek az ESP-kompatibilis fürtökhöz.

  2. Esp-kompatibilis fürtök hibája: Ha a műveletek előállítása és felhasználása sikertelen, és ESP-kompatibilis fürtöt használ, ellenőrizze, hogy a felhasználó kafka jelen van-e az összes Ranger-házirendben. Ha nincs jelen, adja hozzá az összes Ranger-szabályzathoz.

Az erőforrások eltávolítása

Ha törölni szeretné a jelen oktatóanyag által létrehozott erőforrásokat, akkor törölje az erőforráscsoportot. Az erőforráscsoport törlésekor a kapcsolódó HDInsight-fürt, valamint az esetlegesen az erőforráscsoporthoz társított egyéb erőforrások is törlődnek.

Az erőforráscsoport eltávolítása az Azure Portallal:

  1. Az Azure Portalon bontsa ki a bal oldalon a szolgáltatásmenüt, és válassza az Erőforráscsoportok lehetőséget az erőforráscsoportok listájának megjelenítéséhez.
  2. Keresse meg a törölni kívánt erőforráscsoportot, és kattintson a jobb gombbal a lista jobb oldalán lévő Továbbiak gombra (...).
  3. Válassza az Erőforráscsoport törlése elemet, és erősítse meg a választását.

Következő lépések

Ebben a dokumentumban megtanulta, hogyan használhatja az Apache Kafka Producer and Consumer API-t a Kafkával a HDInsighton. Az alábbiak további információt biztosítanak a Kafka használatával kapcsolatban: