Share via


Oktatóanyag: Apache Kafka streams API használata az Azure HDInsightban

Megtudhatja, hogyan hozhat létre olyan alkalmazást, amely az Apache Kafka adatfolyamok API-t használja, és hogyan futtathatja a Kafkával a HDInsighton.

A jelen oktatóanyagban használt alkalmazás egy streamelési szószámláló. Szöveges adatokat olvas be egy Kafka-témakörből, kigyűjti az egyes szavakat, majd eltárolja a szavakat és azok számát egy másik Kafka-témakörben.

A Kafka-streamfeldolgozást gyakran az Apache Spark használatával végzik. A Kafka 2.1.1-es és 2.4.1-es verziója (a HDInsight 4.0-s és 5.0-s verziójában) támogatja a Kafka adatfolyamok API-t. Ez az API lehetővé teszi az adatstreamek a bemeneti és kimeneti témakörök közötti átalakítását.

A Kafka Streams megoldással kapcsolatos további információkért tekintse meg a Streams bevezető dokumentációját az Apache.org webhelyen.

Ebben az oktatóanyagban az alábbiakkal fog megismerkedni:

  • A kód értelmezése
  • Az alkalmazás létrehozása és üzembe helyezése
  • Kafka-témakörök konfigurálása
  • A kód futtatása

Előfeltételek

A kód értelmezése

A példaalkalmazás helye: https://github.com/Azure-Samples/hdinsight-kafka-java-get-started (Streaming alkönyvtár). Az alkalmazás két fájlt tartalmaz:

  • pom.xml: Ez a fájl határozza meg a projektfüggőségeket, a Java-verziót és a csomagolási módszereket.
  • Stream.java: Ez a fájl valósítja meg a streamelési logikát.

Pom.xml

A pom.xml fájl fontosabb elemei a következők:

  • Függőségek: A projekt a Kafka Streams API-jára épül, amelyet a kafka-clients csomag tartalmaz. Ezt a függőséget a következő XML-kód határozza meg:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    A ${kafka.version} bejegyzés a pom.xml fájl <properties>..</properties> szakaszában van meghatározva, és a HDInsight-fürt Kafka-verziójához van konfigurálva.

  • Beépülő modulok: A Maven beépülő modulok különböző képességekkel rendelkeznek. Ebben a projektben a következő beépülő modulokat használjuk:

    • maven-compiler-plugin: Ezzel állítható a projekt által használt Java-verzió a 8-as verzióra. A HDInsight 4.0 és 5.0 használatához Java 8 szükséges.
    • maven-shade-plugin: Az alkalmazást és a függőségeket tartalmazó uber jar létrehozásához használatos. Az alkalmazás belépési pontjának beállítására is használható, így közvetlenül futtathatja a Jar-fájlt anélkül, hogy meg kellene adnia a főosztályt.

Stream.java

A Stream.java fájl a Streams API-val implementálja a szószámláló alkalmazást. Adatokat olvas egy test nevű Kafka-témakörből, és a szószámokat egy wordcounts nevű témakörbe írja.

A következő kód definiálja a szószámláló alkalmazást:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

A példa létrehozása és üzembe helyezése

A projekt összeállításához és a Kafka on HDInsight-fürtön való üzembe helyezéséhez hajtsa végre a következő lépéseket:

  1. Állítsa be az aktuális könyvtárat a hdinsight-kafka-java-get-started-master\Streaming könyvtár helyére, majd az alábbi paranccsal hozzon létre egy jar-csomagot:

    mvn clean package
    

    Ez a parancs a target/kafka-streaming-1.0-SNAPSHOT.jar helyen hozza létre a csomagot.

  2. Cserélje le az sshuser elemet a fürt SSH-felhasználójára, illetve a clustername elemet a fürt nevére. A következő paranccsal másolja a fájlt a kafka-streaming-1.0-SNAPSHOT.jar HDInsight-fürtbe. Ha a rendszer kéri, adja meg az SSH-felhasználói fiók jelszavát.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Apache Kafka-témakörök létrehozása

  1. Cserélje le az sshuser elemet a fürt SSH-felhasználójára, illetve a CLUSTERNAME elemet a fürt nevére. Nyisson meg egy SSH-kapcsolatot a fürthöz az alábbi paranccsal. Ha a rendszer kéri, adja meg az SSH-felhasználói fiók jelszavát.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Telepítse a jq parancssori JSON-processzort. A nyílt SSH-kapcsolaton a telepítéshez jqírja be a következő parancsot:

    sudo apt -y install jq
    
  3. Jelszóváltozó beállítása. Cserélje le PASSWORD a fürt bejelentkezési jelszavára, majd írja be a következő parancsot:

    export PASSWORD='PASSWORD'
    
  4. Bontsa ki a helyesen kisbetűs fürtnevet. A fürt nevének tényleges burkolata eltérhet a várttól, a fürt létrehozásának módjától függően. Ez a parancs lekérte a tényleges burkolatot, majd egy változóban tárolja. Írja be az alábbi parancsot:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Megjegyzés:

    Ha ezt a folyamatot a fürtön kívülről végzi, a fürt nevének tárolására más eljárás is létezik. Kérje le a fürt nevét kisbetűvel az Azure Portalról. Ezután cserélje le a fürt nevét <clustername> a következő parancsra, és hajtsa végre: export clusterName='<clustername>'.

  5. A Kafka-közvetítő-gazdagépek és az Apache Zookeeper-gazdagépek lekéréséhez használja az alábbi parancsokat. Ha a rendszer kéri, adja meg a fürt bejelentkezési (rendszergazdai) fiókjának jelszavát.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Megjegyzés:

    Ezek a parancsok Ambari-hozzáférést igényelnek. Ha a fürt egy NSG mögött található, futtassa ezeket a parancsokat egy olyan gépről, amely hozzáfér az Ambarihoz.

  6. A streamelési művelet által használt témakörök létrehozásához használja az alábbi parancsokat:

    Megjegyzés:

    Hibaüzenetet kaphat, miszerint a test témakör már létezik. Ez rendben van, mivel lehetséges, hogy a témakör az előállítói és fogyasztói API-val foglalkozó oktatóanyagban már létre lett hozva.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    A témaköröket a következő célokra használjuk:

    • test: A témakör, ahová a rekordok érkeznek. A streamelési alkalmazás innen olvassa az adatokat.
    • wordcounts: A témakör, amelyben a streamelési alkalmazás a kimenetet tárolja.
    • RekeyedIntermediateTopic: Ez a témakör szolgál az adatok újraparticionálására, ahogy a countByKey operátor frissíti a számlálót.
    • wordcount-example-Counts-changelog: Ez a témakör egy, a countByKey művelet által használt állapottároló.

    A Kafka on HDInsight a témakörök automatikus létrehozására is konfigurálható. További információkért tekintse meg a témakörök automatikus létrehozásának konfigurálását ismertető dokumentumot.

A kód futtatása

  1. A streamelési alkalmazás háttérfolyamatként való indításához használja a következő parancsot:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    Figyelmeztetést kaphat az Apache-ról log4j. Ezt a figyelmeztetést figyelmen kívül hagyhatja.

  2. A bejegyzések a test témakörbe való küldéséhez az alábbi parancs használatával indítsa el az előállító alkalmazást:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Miután az előállító végzett, az alábbi parancs használatával tekintheti meg a wordcounts témakörben tárolt adatokat:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    A --property paraméterek utasítják a konzolfogyasztót, hogy a kulcsot (szó) a számlálóval (érték) együtt írja ki. Ez a paraméter emellett az értékek a Kafkából való beolvasásához használandó deszerializálót is konfigurálja.

    A kimenet az alábbi szöveghez hasonló:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    A --from-beginning paraméter arra konfigurálja a fogyasztót, hogy a témakörben tárolt rekordok elején kezdjen. A számláló minden egyes szó beolvasásával nő, így a témakör az egyes szavakhoz több bejegyzést is tartalmaz, növekvő számlálóval.

  4. A Ctrl + C billentyűparanccsal zárhatja be az előállítót. A Ctrl + C billentyűparancs ismételt lenyomásával zárhatja be az alkalmazást és a fogyasztót is.

  5. A streamelési művelet által használt témakörök törléséhez használja az alábbi parancsokat:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Clean up resources

Ha törölni szeretné a jelen oktatóanyag által létrehozott erőforrásokat, akkor törölje az erőforráscsoportot. Az erőforráscsoport törlésekor a kapcsolódó HDInsight-fürt, valamint az esetlegesen az erőforráscsoporthoz társított egyéb erőforrások is törlődnek.

Az erőforráscsoport eltávolítása az Azure Portallal:

  1. Az Azure Portalon bontsa ki a bal oldalon a szolgáltatásmenüt, és válassza az Erőforráscsoportok lehetőséget az erőforráscsoportok listájának megjelenítéséhez.
  2. Keresse meg a törölni kívánt erőforráscsoportot, és kattintson a jobb gombbal a lista jobb oldalán lévő Továbbiak gombra (...).
  3. Válassza az Erőforráscsoport törlése elemet, és erősítse meg a választását.

További lépések

Ebben a dokumentumban megtanulta, hogyan használhatja az Apache Kafka adatfolyamok API-t a HDInsighton futó Kafkával. Az alábbiakban további információt olvashat a Kafka használatáról.