Samouczek: korzystanie z interfejsów API producentów i odbiorców platformy Apache Kafka
Informacje o sposobie korzystania z interfejsów API producentów i odbiorców platformy Apache Kafka w usłudze HDInsight.
Interfejs API producenta platformy Kafka umożliwia aplikacjom wysyłanie strumieni danych do klastra Kafka. Interfejs API odbiorcy platformy Kafka umożliwia aplikacjom odczytywanie strumieni danych z klastra.
Ten samouczek zawiera informacje na temat wykonywania następujących czynności:
- Wymagania wstępne
- Zrozumienie kodu
- Kompilowanie i wdrażanie aplikacji
- Uruchamianie aplikacji w klastrze
Aby uzyskać więcej informacji o tych interfejsach API, zobacz dokumentację platformy Apache dotyczącą interfejsu API producenta i interfejsu API odbiorcy.
Wymagania wstępne
- Platforma Apache Kafka w klastrze usługi HDInsight. Aby dowiedzieć się, jak utworzyć klaster, zobacz Rozpoczynanie pracy z platformą Apache Kafka w usłudze HDInsight.
- Zestaw Java Developer Kit (JDK) w wersji 8 lub odpowiednik, taki jak OpenJDK.
- Narzędzie Apache Maven prawidłowo zainstalowane zgodnie z apache. Maven to system kompilacji projektu dla projektów Java.
- Klient SSH, taki jak Putty. Aby uzyskać więcej informacji, zobacz Łączenie się z usługą HDInsight (Apache Hadoop) przy użyciu protokołu SSH.
Zrozumienie kodu
Przykładowa aplikacja znajduje się pod adresem https://github.com/Azure-Samples/hdinsight-kafka-java-get-started w podkatalogu Producer-Consumer
. Jeśli używasz klastra kafka z włączonym pakietem Enterprise Security (ESP ), należy użyć wersji aplikacji znajdującej się w podkatalogu DomainJoined-Producer-Consumer
.
Ta aplikacja składa się zasadniczo z czterech plików:
-
pom.xml
: w tym pliku są definiowane zależności projektu, wersja języka Java i metody pakowania. -
Producer.java
: ten plik wysyła losowe zdania do platformy Kafka przy użyciu interfejsu API producenta. -
Consumer.java
: ten plik korzysta z interfejsu API odbiorcy do odczytywania danych z platformy Kafka i przekazywania ich do wyjścia STDOUT. -
AdminClientWrapper.java
: ten plik używa interfejsu API administratora do tworzenia, opisywania i usuwania tematów platformy Kafka. -
Run.java
: interfejs wiersza polecenia używany do uruchamiania kodu producenta i odbiorcy.
Pom.xml
Należy zrozumieć następujące ważne kwestie dotyczące pliku pom.xml
:
Zależności: ten projekt bazuje na interfejsach API producenta i odbiorcy platformy Kafka, które są udostępniane w pakiecie
kafka-clients
. Ta zależność jest definiowana przez następujący kod XML:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
Wpis
${kafka.version}
jest zadeklarowany w sekcji<properties>..</properties>
plikupom.xml
i jest skonfigurowany zgodnie z wersją platformy Kafka znajdującą się w klastrze usługi HDInsight.Wtyczki: wtyczki Maven zapewniają różne możliwości. W tym projekcie są używane następujące wtyczki:
-
maven-compiler-plugin
: służy do ustawiania wersji 8 języka Java używanej przez projekt. Jest to wersja języka Java używana przez usługę HDInsight 3.6. -
maven-shade-plugin
: służy do generowania pełnego pliku jar zawierającego tę aplikację, a także wszelkie zależności. Jest ona również używana do ustawiania punktu wejścia aplikacji, dzięki czemu można bezpośrednio uruchamiać plik Jar bez konieczności określania klasy głównej.
-
Producer.java
Producent komunikuje się z hostami brokera platformy Kafka (węzłami procesu roboczego) i wysyła dane do tematu platformy Kafka. Poniższy fragment kodu pochodzi z pliku Producer.java z repozytorium GitHub i pokazuje, jak ustawić właściwości producenta. W przypadku klastrów z obsługą zabezpieczeń przedsiębiorstwa należy dodać dodatkową właściwość "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Odbiorca komunikuje się z hostami brokera platformy Kafka (węzłami procesu roboczego) i odczytuje rekordy w pętli. Poniższy fragment kodu z pliku Consumer.java ustawia właściwości odbiorcy. W przypadku klastrów z obsługą zabezpieczeń przedsiębiorstwa należy dodać dodatkową właściwość "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
W tym kodzie odbiorca jest skonfigurowany do odczytywania od początku tematu (właściwość auto.offset.reset
jest ustawiona na wartość earliest
).
Run.java
Plik Run.java udostępnia interfejs wiersza polecenia, który uruchamia kod producenta lub odbiorcy. Jako parametr należy podać informacje o hoście brokera platformy Kafka. Opcjonalnie możesz uwzględnić wartość identyfikatora grupy, która jest używana przez proces odbiorcy. Jeśli utworzysz wiele wystąpień konsumentów przy użyciu tego samego identyfikatora grupy, będą one równoważyć obciążenie odczytu z tematu.
Kompilowanie i wdrażanie przykładu
Używanie wstępnie utworzonych plików JAR
Pobierz pliki jar z przykładu platformy Azure Wprowadzenie do platformy Kafka. Jeśli klaster jest włączony pakiet Enterprise Security (ESP), użyj pliku kafka-producer-consumer-esp.jar. Użyj poniższego polecenia, aby skopiować pliki jar do klastra.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Kompilowanie plików JAR na podstawie kodu
Jeśli chcesz pominąć ten krok, wstępnie utworzone pliki jar można pobrać z podkatalogu Prebuilt-Jars
. Pobierz plik kafka-producer-consumer.jar. Jeśli klaster jest włączony pakiet Enterprise Security (ESP), użyj pliku kafka-producer-consumer-esp.jar. Wykonaj krok 3, aby skopiować plik jar do klastra usługi HDInsight.
Pobierz i wyodrębnij przykłady z pliku https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Ustaw bieżący katalog na lokalizację
hdinsight-kafka-java-get-started\Producer-Consumer
katalogu. Jeśli używasz klastra kafka z włączonym pakietem Enterprise Security (ESP ), ustaw lokalizację naDomainJoined-Producer-Consumer
podkatalog. Użyj następującego polecenia, aby skompilować aplikację:mvn clean package
To polecenie tworzy katalog o nazwie
target
, który zawiera plik o nazwiekafka-producer-consumer-1.0-SNAPSHOT.jar
. W przypadku klastrów ESP plik będziekafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Zamień ciąg
sshuser
na nazwę użytkownika SSH klastra i zamień ciągCLUSTERNAME
na nazwę klastra. Wprowadź następujące polecenie, aby skopiować plik do klastrakafka-producer-consumer-1.0-SNAPSHOT.jar
usługi HDInsight. Po wyświetleniu monitu wprowadź hasło użytkownika SSH.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Uruchamianie przykładu
Zamień ciąg
sshuser
na nazwę użytkownika SSH klastra i zamień ciągCLUSTERNAME
na nazwę klastra. Otwórz połączenie SSH z klastrem, wprowadzając następujące polecenie. Jeśli zostanie wyświetlony monit, wprowadź hasło konta użytkownika SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Aby uzyskać hosty brokera platformy Kafka, zastąp wartości i
<clustername>
<password>
w poniższym poleceniu i wykonaj je. Użyj tej samej wielkości liter,<clustername>
jak pokazano w Azure Portal. Zastąp<password>
element hasłem logowania klastra, a następnie wykonaj następujące polecenie:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Uwaga
To polecenie wymaga dostępu ambari. Jeśli klaster znajduje się za sieciową grupą zabezpieczeń, uruchom to polecenie z maszyny, która może uzyskać dostęp do systemu Ambari.
Utwórz temat platformy Kafka,
myTest
, wprowadzając następujące polecenie:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Aby uruchomić producenta i zapisać dane w temacie, użyj następującego polecenia:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Po zakończeniu procesu producenta odczytaj rekordy z tematu za pomocą następującego polecenia:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Zostanie wyświetlona liczba odczytanych rekordów wraz z liczbą rekordów.
Użyj klawiszy Ctrl+C, aby zakończyć działanie odbiorcy.
Wielu odbiorców
Odbiorcy platformy Kafka używają grupy odbiorców podczas odczytywania rekordów. Korzystanie z tej samej grupy przez wielu odbiorców umożliwia równoważenie obciążenia podczas przeprowadzania odczytu z tematu. Każdy odbiorca w grupie odbiera część rekordów.
Aplikacja odbiorcy akceptuje parametr, który jest używany jako identyfikator grupy. Na przykład następujące polecenie uruchamia odbiorcę przy użyciu identyfikatora grupy myGroup
:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Użyj klawiszy Ctrl+C, aby zakończyć działanie odbiorcy.
Aby zobaczyć, jak działa ten proces, użyj następującego polecenia:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
To polecenie używa polecenia tmux
, aby podzielić terminal na dwie kolumny. W każdej kolumnie jest uruchamiany odbiorca z tą samą wartością identyfikatora grupy. Kiedy odbiorcy zakończą odczytywanie, można zauważyć, że każdy z nich odczytał tylko część rekordów. Użyj dwukrotnie klawiszy Ctrl + C , aby zakończyć działanie tmux
polecenia .
Użycie przez klientów w tej samej grupie jest obsługiwane przez partycje tematu. W tym przykładowym kodzie utworzony wcześniej temat test
ma osiem partycji. Jeśli zostanie uruchomionych ośmiu odbiorców, każdy z nich będzie odczytywał rekordy z jednej partycji tematu.
Ważne
Grupa odbiorców nie może zawierać więcej wystąpień odbiorców niż partycji. W tym przykładzie jedna grupa odbiorców może zawierać maksymalnie ośmiu odbiorców, ponieważ tyle partycji znajduje się w temacie. Może też istnieć wiele grup odbiorców — każda z nich może zawierać maksymalnie ośmiu odbiorców.
Rekordy przechowywane na platformie Kafka są przechowywane w kolejności, w której są odbierane w ramach partycji. Aby dostarczać rekordy na partycji w określonej kolejności, utwórz grupę odbiorców, w której liczba wystąpień odbiorców jest zgodna z liczbą partycji. Aby dostarczać rekordy w temacie w określonej kolejności, utwórz grupę odbiorców z jednym wystąpieniem odbiorcy.
Typowe problemy, z którymi się borykają
Tworzenie tematu kończy się niepowodzeniem Jeśli klaster jest włączony pakiet zabezpieczeń przedsiębiorstwa, użyj wstępnie utworzonych plików JAR dla producenta i odbiorcy. Plik JAR esp można skompilować na podstawie kodu w podkatalogu
DomainJoined-Producer-Consumer
. Właściwości producenta i odbiorcy mają dodatkową właściwośćCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
dla klastrów obsługujących protokół ESP.Niepowodzenie w klastrach z włączoną obsługą ze stanem rejestracji: jeśli operacje tworzenia i korzystania z nich kończą się niepowodzeniem i używasz klastra z włączoną obsługą ze stanem rejestracji, sprawdź, czy użytkownik
kafka
jest obecny we wszystkich zasadach platformy Ranger. Jeśli nie jest obecny, dodaj go do wszystkich zasad platformy Ranger.
Czyszczenie zasobów
Aby wyczyścić zasoby utworzone w tym samouczku, możesz usunąć grupę zasobów. Usunięcie grupy zasobów powoduje również usunięcie skojarzonego klastra usługi HDInsight i wszystkich innych zasobów skojarzonych z tą grupą zasobów.
Aby usunąć grupę zasobów za pomocą witryny Azure Portal:
- W witrynie Azure Portal rozwiń menu po lewej stronie, aby otworzyć menu usług, a następnie wybierz pozycję Grupy zasobów, aby wyświetlić listę grup zasobów.
- Znajdź grupę zasobów do usunięcia, a następnie kliknij prawym przyciskiem myszy przycisk Więcej (...) po prawej stronie listy.
- Wybierz pozycję Usuń grupę zasobów i potwierdź.
Następne kroki
W tym dokumencie zawarto informacje o sposobie korzystania z interfejsu API producenta i odbiorcy platformy Apache Kafka w usłudze HDInsight. Dowiedz się więcej o pracy z platformą Kafka, korzystając z następujących zasobów: