Kurz: Použití rozhraní Apache Kafka Producer and Consumer API

Zjistěte, jak používat rozhraní Apache Kafka Producer and Consumer API se systémem Kafka ve službě HDInsight.

Rozhraní Kafka Producer API umožňuje aplikacím odesílat datové proudy do clusteru Kafka. Rozhraní Kafka Consumer API umožňuje aplikacím číst datové proudy z clusteru.

V tomto kurzu se naučíte:

  • Požadavky
  • Vysvětlení kódu
  • Sestavení a nasazení aplikace
  • Spuštění aplikace v clusteru

Další informace o rozhraních API najdete v dokumentaci k rozhraní Producer API a Consumer API na webu Apache.

Požadavky

Vysvětlení kódu

Ukázková aplikace se nachází na adrese https://github.com/Azure-Samples/hdinsight-kafka-java-get-started v podadresáři Producer-Consumer. Pokud používáte cluster Kafka s podporou balíčku enterprise security Package (ESP), měli byste použít verzi aplikace umístěnou DomainJoined-Producer-Consumer v podadresáři.

Aplikace se skládá primárně ze čtyř souborů:

  • pom.xml: Tento soubor definuje závislosti projektu, verzi Javy a metody balení.
  • Producer.java: Tento soubor pomocí rozhraní Producer API odesílá do systému Kafka náhodné věty.
  • Consumer.java: Tento soubor pomocí rozhraní Consumer API čte data ze systému Kafka a posílá je do výstupu STDOUT.
  • AdminClientWrapper.java: Tento soubor používá rozhraní API pro správu k vytváření, popisu a odstraňování témat Kafka.
  • Run.java: Rozhraní příkazového řádku, které slouží ke spuštění kódu producenta a konzumenta.

Pom.xml

V souboru pom.xml je důležité porozumět následujícímu:

  • Závislosti: Tento projekt spoléhá na rozhraní Kafka Producer and Consumer API, která jsou součástí balíčku kafka-clients. Tuto závislost definuje následující kód XML:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    Položka ${kafka.version} se deklaruje v části <properties>..</properties> souboru pom.xml a je nakonfigurovaná na verzi systému Kafka v clusteru HDInsight.

  • Moduly plug-in: Moduly plug-in Mavenu poskytují různé funkce. V tomto projektu se používají následující moduly plug-in:

    • maven-compiler-plugin: Slouží k nastavení verze Javy, kterou projekt používá, na 8. To je verze Javy, kterou používá HDInsight 3.6.
    • maven-shade-plugin: Slouží k vygenerování souboru JAR, který obsahuje tuto aplikaci i všechny závislosti. Používá se také k nastavení vstupního bodu aplikace, abyste mohli přímo spustit soubor JAR bez nutnosti zadávat hlavní třídu.

Producer.java

Producent komunikuje s hostiteli zprostředkovatelů Kafka (pracovní uzly) a odesílá data do tématu Kafka. Následující fragment kódu pochází ze souboru Producer.java z úložiště GitHub a ukazuje, jak nastavit vlastnosti producenta. Pro clustery s povoleným zabezpečením podniku je nutné přidat další vlastnost properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.java

Konzument komunikuje s hostiteli zprostředkovatelů Kafka (pracovní uzly) a ve smyčce čte záznamy. Následující fragment kódu ze souboru Consumer.java nastaví vlastnosti příjemce. Pro clustery s povoleným zabezpečením podniku je nutné přidat další vlastnost properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

V tomto kódu je konzument nakonfigurovaný tak, aby četl od začátku tématu (hodnota auto.offset.reset je nastavená na earliest).

Run.java

Soubor Run.java poskytuje rozhraní příkazového řádku, které spouští kód producenta nebo příjemce. Jako parametr je potřeba zadat informace o hostiteli zprostředkovatele Kafka. Volitelně můžete zahrnout hodnotu ID skupiny, kterou používá proces příjemce. Pokud vytvoříte více instancí příjemců pomocí stejného ID skupiny, budou vyrovnávat zatížení čtení z tématu.

Sestavení a nasazení příkladu

Použití předdefinovaných souborů JAR

Stáhněte si soubory JAR z ukázky Kafka Začínáme s Azure. Pokud je u vašeho clusteru povolený balíček zabezpečení podniku (ESP), použijte kafka-producer-consumer-esp.jar. Pomocí následujícího příkazu zkopírujte soubory JAR do clusteru.

scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar

Sestavení souborů JAR z kódu

Pokud chcete tento krok přeskočit, můžete si z Prebuilt-Jars podadresáře stáhnout předem připravené soubory JAR. Stáhněte soubor kafka-producer-consumer.jar. Pokud je u vašeho clusteru povolený balíček zabezpečení podniku (ESP), použijte kafka-producer-consumer-esp.jar. Spuštěním kroku 3 zkopírujte soubor JAR do clusteru HDInsight.

  1. Stáhněte a extrahujte příklady z https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Nastavte aktuální adresář na umístění hdinsight-kafka-java-get-started\Producer-Consumer adresáře. Pokud používáte cluster Kafka s podporou enterprise security package (ESP), měli byste umístění nastavit na DomainJoined-Producer-Consumerpodadresář. K sestavení aplikace použijte následující příkaz:

    mvn clean package
    

    Tento příkaz vytvoří adresář s názvem target, který bude obsahovat soubor s názvem kafka-producer-consumer-1.0-SNAPSHOT.jar. V případě clusterů ESP bude soubor kafka-producer-consumer-esp-1.0-SNAPSHOT.jar

  3. Místo sshuser použijte jméno uživatele SSH pro váš cluster a místo CLUSTERNAME zadejte název clusteru. Zadáním následujícího příkazu zkopírujte soubor do clusteru kafka-producer-consumer-1.0-SNAPSHOT.jar HDInsight. Po zobrazení výzvy zadejte heslo uživatele SSH.

    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

Spuštění ukázky

  1. Místo sshuser použijte jméno uživatele SSH pro váš cluster a místo CLUSTERNAME zadejte název clusteru. Zadáním následujícího příkazu otevřete připojení SSH ke clusteru. Pokud se zobrazí výzva, zadejte heslo uživatelského účtu SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Pokud chcete získat hostitele zprostředkovatele Kafka, v následujícím příkazu nahraďte hodnoty a <clustername><password> a spusťte je. Pro použijte stejná písmena<clustername>, jako je znázorněno na Azure Portal. Nahraďte <password> přihlašovacím heslem clusteru a pak spusťte:

    sudo apt -y install jq
    export CLUSTER_NAME='<clustername>'
    export PASSWORD='<password>'
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Poznámka

    Tento příkaz vyžaduje přístup Ambari. Pokud je váš cluster za skupinou zabezpečení sítě, spusťte tento příkaz z počítače, který má přístup k Ambari.

  3. Zadáním následujícího příkazu vytvořte téma myTestKafka:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  4. Pokud chcete spustit producenta a zapsat data do tématu, použijte následující příkaz:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  5. Jakmile bude producent hotový, pomocí následujícího příkazu zahajte čtení z tématu:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    Zobrazí se počet načtených záznamů spolu s celkovým počtem.

  6. Konzumenta ukončíte stisknutím Ctrl+C.

Víc současných konzumentů

Konzumenti Kafka při čtení záznamů používají skupiny konzumentů. Výsledkem použití skupiny s více konzumenty je vyvážení zatížení při čtení záznamů z tématu. Každý konzument ze skupiny obdrží určitou část záznamů.

Aplikace konzumenta přijímá parametr, který se použije jako ID skupiny. Například následující příkaz spustí konzumenta s použitím ID skupiny myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Konzumenta ukončíte stisknutím Ctrl+C.

Pokud chcete vidět tento proces v akci, použijte následující příkaz:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

Tento příkaz pomocí tmux rozdělí terminál do dvou sloupců. V obou sloupcích je spuštěný konzument se stejnou hodnotou ID skupiny. Jakmile konzumenti dokončí čtení, všimněte si, že oba přečetli pouze část záznamů. Stisknutím kombinace kláves Ctrl+C ukončete tmux.

Konzumace klienty ze stejné skupiny se realizuje rozdělením tématu na oddíly. V tomto vzorovém kódu má dříve vytvořené téma test osm oddílů. Pokud spustíte osm konzumentů, každý z nich bude číst záznamy z jednoho oddílu tématu.

Důležité

Ve skupině příjemců nemůže být víc instancí konzumentů než má téma oddílů. V tomto příkladu může skupina konzumentů obsahovat až osm konzumentů, protože to je počet oddílů tématu. Nebo můžete mít více skupin konzumentů, každou s maximálně osmi konzumenty.

Záznamy uložené v systému Kafka se ukládají v pořadí, v jakém jsou přijaty v rámci oddílu. Pro dosažení doručování záznamů ve správném pořadí v rámci oddílu vytvořte skupinu příjemců, ve které bude počet instancí konzumentů odpovídat počtu oddílů. Pro dosažení doručování záznamů ve správném pořadí v rámci tématu vytvořte skupinu obsahující pouze jednu instanci konzumenta.

Běžné problémy

  1. Selhání vytváření tématu Pokud je v clusteru povolená sada Zabezpečení podniku, použijte předem připravené soubory JAR pro producenta a příjemce. Soubor JAR ESP lze sestavit z kódu v DomainJoined-Producer-Consumer podadresáři. Vlastnosti producenta a příjemce mají pro clustery s podporou ESP další vlastnost CommonClientConfigs.SECURITY_PROTOCOL_CONFIG .

  2. Selhání v clusterech s podporou ESP: Pokud operace vytváření a využívání selžou a používáte cluster s podporou ESP, zkontrolujte, jestli je uživatel kafka přítomný ve všech zásadách Ranger. Pokud není k dispozici, přidejte ji do všech zásad Ranger.

Vyčištění prostředků

Pokud chcete vyčistit prostředky vytvořené v tomto kurzu, můžete odstranit skupinu prostředků. Odstraněním skupiny prostředků odstraníte také přidružený cluster HDInsight a všechny další prostředky, které jsou k příslušné skupině prostředků přidružené.

Odebrání skupiny prostředků pomocí webu Azure Portal:

  1. Na webu Azure Portal rozbalením nabídky na levé straně otevřete nabídku služeb a pak zvolte Skupiny prostředků. Zobrazí se seznam skupin prostředků.
  2. Vyhledejte skupinu prostředků, kterou chcete odstranit, a klikněte pravým tlačítkem na tlačítko Další (...) na pravé straně seznamu.
  3. Vyberte Odstranit skupinu prostředků a potvrďte tuto akci.

Další kroky

V tomto dokumentu jste zjistili, jak používat rozhraní Apache Kafka Producer and Consumer API se systémem Kafka ve službě HDInsight. Další informace o práci s platformou Kafka najdete v těchto zdrojích: