Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Meer informatie over het gebruik van de Producer- en Consumer-API's van Apache Kafka met Kafka in HDInsight.
Met de Producer-API van Kafka kunnen toepassingen gegevensstromen verzenden naar het Kafka-cluster. Met de Kafka Consumer-API kunnen toepassingen stromen met gegevens uit het cluster lezen.
In deze zelfstudie leert u het volgende:
- Vereiste voorwaarden
- De code begrijpen
- De toepassing compileren en implementeren
- De toepassing uitvoeren op het cluster
Zie de Apache-documentatie over de Producer-API- en Consumer-APIvoor meer informatie over de API's.
Vereiste voorwaarden
- Apache Kafka in HDInsight-cluster. Zie Aan de slag met Apache Kafka in HDInsightvoor meer informatie over het maken van het cluster.
- Java Developer Kit (JDK) versie 8 of een equivalent, zoals OpenJDK.
- Apache Maven correct geïnstalleerd volgens Apache. Maven is een systeem voor het bouwen van Java-projecten.
- Een SSH-client zoals Putty. Zie voor meer informatie Verbinding maken met HDInsight (Apache Hadoop) via SSH.
De code begrijpen
De voorbeeldtoepassing bevindt zich in https://github.com/Azure-Samples/hdinsight-kafka-java-get-startedin de submap Producer-Consumer
. Als u een Kafka-cluster met ingeschakeld Enterprise Security Package (ESP) gebruikt, moet u de versie van de toepassing in submap DomainJoined-Producer-Consumer
gebruiken.
De toepassing bestaat voornamelijk uit vier bestanden:
-
pom.xml
: dit bestand definieert de projectafhankelijkheden, java-versie en verpakkingsmethoden. -
Producer.java
: dit bestand verzendt willekeurige zinnen naar Kafka met behulp van de producer-API. -
Consumer.java
: in dit bestand wordt de consumenten-API gebruikt om gegevens uit Kafka te lezen en naar STDOUT te verzenden. -
AdminClientWrapper.java
: dit bestand maakt gebruik van de beheer-API om Kafka-onderwerpen te maken, te beschrijven en te verwijderen. -
Run.java
: de opdrachtregelinterface die wordt gebruikt om de producent- en consumentencode uit te voeren.
Pom.xml
De belangrijkste dingen die u moet begrijpen in het bestand pom.xml
zijn:
Afhankelijkheden: dit project is afhankelijk van de Kafka-producent- en consumenten-API's, die worden geleverd door het
kafka-clients
-pakket. De volgende XML-code definieert deze afhankelijkheid:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
De
${kafka.version}
vermelding wordt gedeclareerd in de sectie<properties>..</properties>
vanpom.xml
en is geconfigureerd voor de Kafka-versie van het HDInsight-cluster.Invoegtoepassingen: Maven-invoegtoepassingen bieden verschillende mogelijkheden. In dit project worden de volgende invoegtoepassingen gebruikt:
-
maven-compiler-plugin
: wordt gebruikt om de Java-versie in te stellen die door het project wordt gebruikt op 8. Dit is de versie van Java die wordt gebruikt door HDInsight 3.6. -
maven-shade-plugin
: wordt gebruikt voor het genereren van een uber-jar die deze toepassing en eventuele afhankelijkheden bevat. Het wordt ook gebruikt om het toegangspunt van de toepassing in te stellen, zodat u het Jar-bestand rechtstreeks kunt uitvoeren zonder dat u de hoofdklasse hoeft op te geven.
-
Producer.java
De producent communiceert met de Kafka-brokerhosts (werkknooppunten) en verzendt gegevens naar een Kafka-onderwerp. Het volgende codefragment is afkomstig uit het Producer.java-bestand uit de GitHub-opslagplaats en laat zien hoe u de producereigenschappen instelt. Voor clusters met enterprisebeveiliging moet een extra eigenschap worden toegevoegd:properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
De consument communiceert met de Kafka-brokerhosts (werkknooppunten) en leest records in een lus. Met het volgende codefragment uit het bestand Consumer.java worden de eigenschappen van de consument ingesteld. Voor clusters met enterprisebeveiliging moet een extra eigenschap worden toegevoegd:properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
In deze code is de consument geconfigureerd om te lezen vanaf het begin van het onderwerp (auto.offset.reset
is ingesteld op earliest
.)
Run.java
Het Run.java-bestand biedt een opdrachtregelinterface waarmee de producent- of consumentencode wordt uitgevoerd. U moet de kafka-brokerhostgegevens opgeven als parameter. U kunt eventueel een groeps-id-waarde opnemen, die wordt gebruikt door het consumentenproces. Als u meerdere consumentenexemplaren maakt met dezelfde groep-ID, verdelen ze de leeslast van het onderwerp.
Het voorbeeld bouwen en implementeren
Vooraf gebouwde JAR-bestanden gebruiken
Download de JAR-bestanden uit het Azure Get Started-voorbeeld. Als uw cluster is Enterprise Security Package (ESP) ingeschakeld, gebruikt u kafka-producer-consumer-esp.jar. Gebruik de onderstaande opdracht om de JAR's naar uw cluster te kopiëren.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
De JAR-bestanden bouwen op basis van code
Als u deze stap wilt overslaan, kunnen vooraf gemaakte JAR's worden gedownload uit de submap Prebuilt-Jars
. Download de Kafka-Producer-Consumer.jar. Als uw cluster Enterprise Security Package (ESP) ingeschakeld is, gebruik kafka-producer-consumer-esp.jar. Voer stap 3 uit om het JAR-bestand naar uw HDInsight-cluster te kopiëren.
Download en pak de voorbeelden uit https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Stel uw huidige map in op de locatie van de
hdinsight-kafka-java-get-started\Producer-Consumer
map. Als u Enterprise Security Package (ESP) ingeschakeld Kafka-cluster gebruikt, moet u de locatie instellen opDomainJoined-Producer-Consumer
submap. Gebruik de volgende opdracht om de toepassing te bouwen:mvn clean package
Met deze opdracht maakt u een map met de naam
target
, die een bestand met de naamkafka-producer-consumer-1.0-SNAPSHOT.jar
bevat. Voor ESP-clusters wordt het bestandkafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Vervang
sshuser
door de SSH-gebruiker voor uw cluster en vervangCLUSTERNAME
door de naam van uw cluster. Voer de volgende opdracht in om hetkafka-producer-consumer-1.0-SNAPSHOT.jar
bestand naar uw HDInsight-cluster te kopiëren. Wanneer u hierom wordt gevraagd, voert u het wachtwoord voor de SSH-gebruiker in.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Het voorbeeld uitvoeren
Vervang
sshuser
door de SSH-gebruiker voor uw cluster en vervangCLUSTERNAME
door de naam van uw cluster. Open een SSH-verbinding met het cluster door de volgende opdracht in te voeren. Voer het wachtwoord voor het SSH-gebruikersaccount in als u hierom wordt gevraagd.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Als u de Kafka-brokerhosts wilt ophalen, vervangt u de waarden voor
<clustername>
en<password>
in de volgende opdracht en voert u deze uit. Gebruik dezelfde behuizing voor<clustername>
zoals wordt weergegeven in Azure Portal. Vervang<password>
door het aanmeldingswachtwoord voor het cluster en voer het volgende uit:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Notitie
Voor deze opdracht is toegang tot Ambari vereist. Als uw cluster zich achter een NSG bevindt, voert u deze opdracht uit vanaf een computer die toegang heeft tot Ambari.
Maak het Kafka-onderwerp
myTest
door het commando als volgt uit te voeren:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Gebruik de volgende opdracht om de producer uit te voeren en gegevens naar het topic te schrijven:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Zodra de producent klaar is, gebruikt u de volgende opdracht om het onderwerp te lezen:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
De gelezen records, samen met het aantal records, worden weergegeven.
Gebruik Ctrl + C om de consumententoepassing te beëindigen.
Meerdere consumenten
Kafka-consumenten gebruiken een consumentengroep bij het lezen van records. Het gebruik van dezelfde groep met meerdere consumenten resulteert in leesbewerkingen van een onderwerp met gelijke taakverdeling. Elke consument in de groep ontvangt een deel van de records.
De consumententoepassing accepteert een parameter die wordt gebruikt als de groeps-id. Met de volgende opdracht wordt bijvoorbeeld een consument gestart met behulp van een groeps-id van myGroup
:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Gebruik Ctrl + C om de client af te sluiten.
Gebruik de volgende opdracht om dit proces in actie te zien:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Met deze opdracht wordt tmux
gebruikt om de terminal in twee kolommen te splitsen. Een consument wordt in elke kolom gestart, met dezelfde groep-ID-waarde. Zodra de gebruikers klaar zijn met lezen, ziet u dat elk slechts een deel van de records heeft gelezen. Gebruik Ctrl + C tweemaal om tmux
af te sluiten.
Verbruik door clients binnen dezelfde groep wordt verwerkt via de partities voor het onderwerp. In dit codevoorbeeld heeft de eerder gemaakte test
-topic acht partities. Als u acht consumenten start, leest elke consument records uit één partitie voor het onderwerp.
Belangrijk
Er kunnen niet meer consumenteninstanties in een consumentengroep bestaan dan partities. In dit voorbeeld kan één consumentengroep maximaal acht consumenten bevatten, omdat dat het aantal partities in het onderwerp is. U kunt ook meerdere consumentengroepen hebben, elk met niet meer dan acht consumenten.
Records die zijn opgeslagen in Kafka, worden opgeslagen in de volgorde waarin ze binnen een partitie worden ontvangen. Als u in-geordende levering wilt bereiken voor records binnen een partitie, maakt u een consumentengroep waarin het aantal consumentenexemplaren overeenkomt met het aantal partities. Om in-geordende levering te bereiken voor records binnen de topic, maakt u een consumentengroep met slechts één consumentenexemplaar.
Veelvoorkomende problemen
onderwerp maken mislukt Als uw cluster Enterprise Security Pack is ingeschakeld, gebruikt u de vooraf gemaakte JAR-bestanden voor producenten en consumenten. De ESP-jar kan worden gebouwd op basis van de code in de
DomainJoined-Producer-Consumer
submap. De eigenschappen van de producent en consument hebben een extra eigenschapCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
voor clusters waarvoor ESP is ingeschakeld.fout in clusters met ESP-functionaliteit: als de productie- en verbruikbewerkingen mislukken en u een ESP-cluster gebruikt, controleert u of de gebruiker
kafka
aanwezig is in alle Ranger-beleidsregels. Als deze niet aanwezig is, voegt u deze toe aan alle Ranger-beleidsregels.
Hulpmiddelen opruimen
Als u de resources wilt opschonen die in deze handleiding zijn gemaakt, kunt u de resourcegroep verwijderen. Als u de resourcegroep verwijdert, worden ook het bijbehorende HDInsight-cluster en eventuele andere resources die aan de resourcegroep zijn gekoppeld, verwijderd.
Ga als volgt te werk om de resourcegroep te verwijderen in Azure Portal:
- Vouw het menu aan de linkerkant in Azure Portal uit om het menu met services te openen en kies Resourcegroepen om de lijst met resourcegroepen weer te geven.
- Zoek de resourcegroep die u wilt verwijderen en klik met de rechtermuisknop op de knop Meer (... ) aan de rechterkant van de vermelding.
- Selecteer Resourcegroep verwijderen en bevestig dit.
Volgende stappen
In dit document hebt u geleerd hoe u de Producer- en Consumer-API van Apache Kafka gebruikt met Kafka in HDInsight. Gebruik de volgende informatie voor meer informatie over het werken met Kafka: