Oktatóanyag: Apache Kafka streams API használata az Azure HDInsightban
Megtudhatja, hogyan hozhat létre olyan alkalmazást, amely az Apache Kafka adatfolyamok API-t használja, és hogyan futtathatja a Kafkával a HDInsighton.
A jelen oktatóanyagban használt alkalmazás egy streamelési szószámláló. Szöveges adatokat olvas be egy Kafka-témakörből, kigyűjti az egyes szavakat, majd eltárolja a szavakat és azok számát egy másik Kafka-témakörben.
A Kafka-streamfeldolgozást gyakran az Apache Spark használatával végzik. A Kafka 2.1.1-es és 2.4.1-es verziója (a HDInsight 4.0-s és 5.0-s verziójában) támogatja a Kafka adatfolyamok API-t. Ez az API lehetővé teszi az adatstreamek a bemeneti és kimeneti témakörök közötti átalakítását.
A Kafka Streams megoldással kapcsolatos további információkért tekintse meg a Streams bevezető dokumentációját az Apache.org webhelyen.
Ebben az oktatóanyagban az alábbiakkal fog megismerkedni:
- A kód értelmezése
- Az alkalmazás létrehozása és üzembe helyezése
- Kafka-témakörök konfigurálása
- A kód futtatása
Előfeltételek
Kafka a HDInsight 4.0-s vagy 5.0-s fürtjén. Ha meg szeretné tudni, hogyan hozhat létre Kafkát a HDInsight-fürtön, tekintse meg a HDInsight-alapú Apache Kafka használatának megkezdését ismertető dokumentumot.
Hajtsa végre a lépéseket az Apache Kafka Consumer and Producer API-dokumentumban . A dokumentumban leírt lépések az ebben az oktatóanyagban létrehozott példaalkalmazást és -témaköröket használják.
A Java Developer Kit (JDK) 8-as vagy azzal egyenértékű verziója, például az OpenJDK.
Az Apache Maven megfelelően van telepítve az Apache szerint. A Maven egy Java-projektek projektépítési rendszere.
Egy SSH-ügyfél. For more information, see Connect to HDInsight (Apache Hadoop) using SSH.
A kód értelmezése
A példaalkalmazás helye: https://github.com/Azure-Samples/hdinsight-kafka-java-get-started (Streaming
alkönyvtár). Az alkalmazás két fájlt tartalmaz:
pom.xml
: Ez a fájl határozza meg a projektfüggőségeket, a Java-verziót és a csomagolási módszereket.Stream.java
: Ez a fájl valósítja meg a streamelési logikát.
Pom.xml
A pom.xml
fájl fontosabb elemei a következők:
Függőségek: A projekt a Kafka Streams API-jára épül, amelyet a
kafka-clients
csomag tartalmaz. Ezt a függőséget a következő XML-kód határozza meg:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
A
${kafka.version}
bejegyzés apom.xml
fájl<properties>..</properties>
szakaszában van meghatározva, és a HDInsight-fürt Kafka-verziójához van konfigurálva.Beépülő modulok: A Maven beépülő modulok különböző képességekkel rendelkeznek. Ebben a projektben a következő beépülő modulokat használjuk:
maven-compiler-plugin
: Ezzel állítható a projekt által használt Java-verzió a 8-as verzióra. A HDInsight 4.0 és 5.0 használatához Java 8 szükséges.maven-shade-plugin
: Az alkalmazást és a függőségeket tartalmazó uber jar létrehozásához használatos. Az alkalmazás belépési pontjának beállítására is használható, így közvetlenül futtathatja a Jar-fájlt anélkül, hogy meg kellene adnia a főosztályt.
Stream.java
A Stream.java fájl a Streams API-val implementálja a szószámláló alkalmazást. Adatokat olvas egy test
nevű Kafka-témakörből, és a szószámokat egy wordcounts
nevű témakörbe írja.
A következő kód definiálja a szószámláló alkalmazást:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
A példa létrehozása és üzembe helyezése
A projekt összeállításához és a Kafka on HDInsight-fürtön való üzembe helyezéséhez hajtsa végre a következő lépéseket:
Állítsa be az aktuális könyvtárat a
hdinsight-kafka-java-get-started-master\Streaming
könyvtár helyére, majd az alábbi paranccsal hozzon létre egy jar-csomagot:mvn clean package
Ez a parancs a
target/kafka-streaming-1.0-SNAPSHOT.jar
helyen hozza létre a csomagot.Cserélje le az
sshuser
elemet a fürt SSH-felhasználójára, illetve aclustername
elemet a fürt nevére. A következő paranccsal másolja a fájlt akafka-streaming-1.0-SNAPSHOT.jar
HDInsight-fürtbe. Ha a rendszer kéri, adja meg az SSH-felhasználói fiók jelszavát.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Apache Kafka-témakörök létrehozása
Cserélje le az
sshuser
elemet a fürt SSH-felhasználójára, illetve aCLUSTERNAME
elemet a fürt nevére. Nyisson meg egy SSH-kapcsolatot a fürthöz az alábbi paranccsal. Ha a rendszer kéri, adja meg az SSH-felhasználói fiók jelszavát.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Telepítse a jq parancssori JSON-processzort. A nyílt SSH-kapcsolaton a telepítéshez
jq
írja be a következő parancsot:sudo apt -y install jq
Jelszóváltozó beállítása. Cserélje le
PASSWORD
a fürt bejelentkezési jelszavára, majd írja be a következő parancsot:export PASSWORD='PASSWORD'
Bontsa ki a helyesen kisbetűs fürtnevet. A fürt nevének tényleges burkolata eltérhet a várttól, a fürt létrehozásának módjától függően. Ez a parancs lekérte a tényleges burkolatot, majd egy változóban tárolja. Írja be az alábbi parancsot:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Megjegyzés:
Ha ezt a folyamatot a fürtön kívülről végzi, a fürt nevének tárolására más eljárás is létezik. Kérje le a fürt nevét kisbetűvel az Azure Portalról. Ezután cserélje le a fürt nevét
<clustername>
a következő parancsra, és hajtsa végre:export clusterName='<clustername>'
.A Kafka-közvetítő-gazdagépek és az Apache Zookeeper-gazdagépek lekéréséhez használja az alábbi parancsokat. Ha a rendszer kéri, adja meg a fürt bejelentkezési (rendszergazdai) fiókjának jelszavát.
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Megjegyzés:
Ezek a parancsok Ambari-hozzáférést igényelnek. Ha a fürt egy NSG mögött található, futtassa ezeket a parancsokat egy olyan gépről, amely hozzáfér az Ambarihoz.
A streamelési művelet által használt témakörök létrehozásához használja az alábbi parancsokat:
Megjegyzés:
Hibaüzenetet kaphat, miszerint a
test
témakör már létezik. Ez rendben van, mivel lehetséges, hogy a témakör az előállítói és fogyasztói API-val foglalkozó oktatóanyagban már létre lett hozva./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
A témaköröket a következő célokra használjuk:
test
: A témakör, ahová a rekordok érkeznek. A streamelési alkalmazás innen olvassa az adatokat.wordcounts
: A témakör, amelyben a streamelési alkalmazás a kimenetet tárolja.RekeyedIntermediateTopic
: Ez a témakör szolgál az adatok újraparticionálására, ahogy acountByKey
operátor frissíti a számlálót.wordcount-example-Counts-changelog
: Ez a témakör egy, acountByKey
művelet által használt állapottároló.
A Kafka on HDInsight a témakörök automatikus létrehozására is konfigurálható. További információkért tekintse meg a témakörök automatikus létrehozásának konfigurálását ismertető dokumentumot.
A kód futtatása
A streamelési alkalmazás háttérfolyamatként való indításához használja a következő parancsot:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Figyelmeztetést kaphat az Apache-ról
log4j
. Ezt a figyelmeztetést figyelmen kívül hagyhatja.A bejegyzések a
test
témakörbe való küldéséhez az alábbi parancs használatával indítsa el az előállító alkalmazást:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
Miután az előállító végzett, az alábbi parancs használatával tekintheti meg a
wordcounts
témakörben tárolt adatokat:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
A
--property
paraméterek utasítják a konzolfogyasztót, hogy a kulcsot (szó) a számlálóval (érték) együtt írja ki. Ez a paraméter emellett az értékek a Kafkából való beolvasásához használandó deszerializálót is konfigurálja.A kimenet az alábbi szöveghez hasonló:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
A
--from-beginning
paraméter arra konfigurálja a fogyasztót, hogy a témakörben tárolt rekordok elején kezdjen. A számláló minden egyes szó beolvasásával nő, így a témakör az egyes szavakhoz több bejegyzést is tartalmaz, növekvő számlálóval.A Ctrl + C billentyűparanccsal zárhatja be az előállítót. A Ctrl + C billentyűparancs ismételt lenyomásával zárhatja be az alkalmazást és a fogyasztót is.
A streamelési művelet által használt témakörök törléséhez használja az alábbi parancsokat:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Clean up resources
Ha törölni szeretné a jelen oktatóanyag által létrehozott erőforrásokat, akkor törölje az erőforráscsoportot. Az erőforráscsoport törlésekor a kapcsolódó HDInsight-fürt, valamint az esetlegesen az erőforráscsoporthoz társított egyéb erőforrások is törlődnek.
Az erőforráscsoport eltávolítása az Azure Portallal:
- Az Azure Portalon bontsa ki a bal oldalon a szolgáltatásmenüt, és válassza az Erőforráscsoportok lehetőséget az erőforráscsoportok listájának megjelenítéséhez.
- Keresse meg a törölni kívánt erőforráscsoportot, és kattintson a jobb gombbal a lista jobb oldalán lévő Továbbiak gombra (...).
- Válassza az Erőforráscsoport törlése elemet, és erősítse meg a választását.
További lépések
Ebben a dokumentumban megtanulta, hogyan használhatja az Apache Kafka adatfolyamok API-t a HDInsighton futó Kafkával. Az alábbiakban további információt olvashat a Kafka használatáról.