Kurz: Použití rozhraní API streamů Apache Kafka ve službě Azure HDInsight

Zjistěte, jak vytvořit aplikaci, která používá rozhraní Apache Kafka Toky API, a spustit ji se systémem Kafka ve službě HDInsight.

Aplikace použitá v tomto kurzu počítá slova v datovém proudu. Přečte textová data z tématu Kafka, extrahuje jednotlivá slova a pak uloží slova a jejich počet do jiného tématu Kafka.

Zpracování datových proudů Kafka se často provádí pomocí Apache Sparku. Kafka verze 2.1.1 a 2.4.1 (v HDInsight 4.0 a 5.0) podporuje rozhraní Kafka Toky API. Toto rozhraní API umožňuje transformovat datové proudy mezi vstupními a výstupními tématy.

Další informace o datových proudech Kafka najdete v úvodní dokumentaci k datovým proudům na webu Apache.org.

V tomto kurzu se naučíte:

  • Vysvětlení kódu
  • Sestavení a nasazení aplikace
  • Konfigurace témat Kafka
  • Spuštění kódu

Požadavky

Vysvětlení kódu

Ukázková aplikace se nachází na adrese https://github.com/Azure-Samples/hdinsight-kafka-java-get-started v podadresáři Streaming. Aplikace se skládá ze dvou souborů:

  • pom.xml: Tento soubor definuje závislosti projektu, verzi Javy a metody balení.
  • Stream.java: Tento soubor implementuje logiku streamování.

Pom.xml

V souboru pom.xml je důležité porozumět následujícímu:

  • Závislosti: Tento projekt spoléhá na rozhraní Kafka Streams API, které je součástí balíčku kafka-clients. Tuto závislost definuje následující kód XML:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    Položka ${kafka.version} se deklaruje v části <properties>..</properties> souboru pom.xml a je nakonfigurovaná na verzi systému Kafka v clusteru HDInsight.

  • Moduly plug-in: Moduly plug-in Mavenu poskytují různé funkce. V tomto projektu se používají následující moduly plug-in:

    • maven-compiler-plugin: Slouží k nastavení verze Javy, kterou projekt používá, na 8. HDInsight 4.0 a 5.0 vyžadují Javu 8.
    • maven-shade-plugin: Slouží k vygenerování souboru JAR uber, který obsahuje tuto aplikaci, a všech závislostí. Používá se také k nastavení vstupního bodu aplikace, abyste mohli přímo spustit soubor Jar, aniž byste museli zadávat hlavní třídu.

Stream.java

Soubor Stream.java pomocí rozhraní Streams API implementuje aplikaci počítání slov. Čte data z tématu Kafka test a zapisuje počty slov do tématu wordcounts.

Následující kód definuje aplikaci počítání slov:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Sestavení a nasazení příkladu

Pokud chcete sestavit a nasadit projekt do clusteru Kafka ve službě HDInsight, postupujte následovně:

  1. Nastavte aktuální adresář na umístění hdinsight-kafka-java-get-started-master\Streaming adresáře a pak pomocí následujícího příkazu vytvořte balíček JAR:

    mvn clean package
    

    Tento příkaz vytvoří balíček v umístění target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Místo sshuser použijte jméno uživatele SSH pro váš cluster a místo clustername zadejte název clusteru. Pomocí následujícího příkazu zkopírujte kafka-streaming-1.0-SNAPSHOT.jar soubor do clusteru HDInsight. Pokud se zobrazí výzva, zadejte heslo uživatelského účtu SSH.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Témata týkající se vytváření Apache Kafka

  1. Místo sshuser použijte jméno uživatele SSH pro váš cluster a místo CLUSTERNAME zadejte název clusteru. Otevřete připojení SSH ke clusteru zadáním následujícího příkazu. Pokud se zobrazí výzva, zadejte heslo uživatelského účtu SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Nainstalujte jq, procesor JSON příkazového řádku. Z otevřeného připojení SSH zadejte následující příkaz, který chcete nainstalovat jq:

    sudo apt -y install jq
    
  3. Nastavte proměnnou hesla. Nahraďte PASSWORD přihlašovacím heslem clusteru a pak zadejte příkaz:

    export PASSWORD='PASSWORD'
    
  4. Extrahujte název clusteru se správnými písmeny. Skutečná velikost výskytu názvu clusteru se může lišit od očekávání podle toho, jak byl cluster vytvořen. Tento příkaz získá skutečné velikostí a pak ho uloží do proměnné. Zadejte tento příkaz:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Poznámka:

    Pokud tento proces provádíte mimo cluster, existuje jiný postup pro uložení názvu clusteru. Název clusteru získáte v malých písmenech z webu Azure Portal. Potom nahraďte název <clustername> clusteru následujícím příkazem a spusťte ho: export clusterName='<clustername>'.

  5. K získání hostitelů zprostředkovatele Kafka a hostitelů Apache Zookeeper použijte následující příkazy. Po zobrazení výzvy zadejte heslo pro účet přihlášení clusteru (admin).

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Poznámka:

    Tyto příkazy vyžadují přístup Ambari. Pokud je váš cluster za skupinou zabezpečení sítě, spusťte tyto příkazy z počítače, který má přístup k Ambari.

  6. K vytvoření témat, která používá operace streamování, použijte následující příkazy:

    Poznámka:

    Možná se zobrazí chyba, protože téma test již existuje. To je v pořádku, protože se mohlo vytvořit v kurzu k rozhraní Producer and Consumer API.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    Témata slouží k následujícím účelům:

    • test: V tomto tématu se přijímají záznamy. Odtud čte data aplikace streamování.
    • wordcounts: Do tohoto tématu aplikace streamování ukládá výstup.
    • RekeyedIntermediateTopic: Toto téma slouží k opětovnému rozdělení dat při aktualizaci počtu pomocí operátoru countByKey.
    • wordcount-example-Counts-changelog: Toto téma používá operace countByKey jako úložiště stavu.

    Systém Kafka ve službě HDInsight je také možné nakonfigurovat tak, aby vytvářel témata automaticky. Další informace najdete v dokumentu Konfigurace automatického vytváření témat.

Spuštění kódu

  1. Pokud chcete aplikaci streamování spustit jako proces na pozadí, použijte následující příkaz:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    Může se zobrazit upozornění na Apache log4j. Toto upozornění můžete ignorovat.

  2. K odesílání záznamů do tématu test použijte následující příkaz, který spustí aplikaci producenta:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Jakmile bude producent hotový, pomocí následujícího příkazu zobrazte informace uložené v tématu wordcounts:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    Parametry --property říkají konzole konzumenta, aby vytiskla klíč (slovo) společně s počtem (hodnota). Tento parametr také konfiguruje deserializátor, který se použije při čtení těchto hodnot ze systému Kafka.

    Výstup se bude podobat následujícímu:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    Parametr --from-beginning konfiguruje konzumenta tak, aby začal číst záznamy uložené v tématu od začátku. Počet se zvýší při každém zjištění slova, takže téma obsahuje pro každé slovo několik záznamů s rostoucím počtem.

  4. Producenta ukončíte stisknutím Ctrl+C. Pokračujte a pomocí Ctrl + C ukončete aplikaci i konzumenta.

  5. Pokud chcete odstranit témata používaná operací streamování, použijte následující příkazy:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Vyčištění prostředků

Pokud chcete vyčistit prostředky vytvořené v tomto kurzu, můžete odstranit skupinu prostředků. Odstraněním skupiny prostředků odstraníte také přidružený cluster HDInsight a všechny další prostředky, které jsou k příslušné skupině prostředků přidružené.

Odebrání skupiny prostředků pomocí webu Azure Portal:

  1. Na webu Azure Portal rozbalením nabídky na levé straně otevřete nabídku služeb a pak zvolte Skupiny prostředků. Zobrazí se seznam skupin prostředků.
  2. Vyhledejte skupinu prostředků, kterou chcete odstranit, a klikněte pravým tlačítkem na tlačítko Další (...) na pravé straně seznamu.
  3. Vyberte Odstranit skupinu prostředků a potvrďte tuto akci.

Další kroky

V tomto dokumentu jste zjistili, jak používat rozhraní Apache Kafka Toky API se systémem Kafka ve službě HDInsight. Další informace o práci se systémem Kafka najdete v následujícím tématu.