教學課程:使用 Apache Kafka Producer 和 Consumer API
在此文章
必要條件
了解程式碼
建置並部署範例
執行範例
常見的問題
清除資源
下一步
顯示其他 3 個
了解如何搭配 HDInsight 上的 Kafka 使用 Apache Kafka Producer 和 Consumer API。
Kafka Producer API 可讓應用程式將資料流傳送至 Kafka 叢集。 Kafka Consumer API 可讓應用程式從叢集讀取資料流。
在本教學課程中,您會了解如何:
必要條件
了解程式碼
組建和部署應用程式
在叢集上執行應用程式
如需 API 的詳細資訊,請參閱 Producer API 和 Consumer API 的 Apache 說明文件。
範例應用程式位於 https://github.com/Azure-Samples/hdinsight-kafka-java-get-started 的 Producer-Consumer
子目錄中。 如果您使用 企業安全性套件 (ESP) 啟用的 Kafka 叢集,您應該使用位於 DomainJoined-Producer-Consumer
子目錄中的應用程式版本。
該應用程式主要包含四個檔案:
pom.xml
:此檔案會定義專案相依性、Java 版本和封裝方法。
Producer.java
:此檔案會使用 Producer API 將隨機句子傳送至 Kafka。
Consumer.java
:此檔案會使用 Consumer API 從 Kafka 讀取資料並將其發送至 STDOUT。
AdminClientWrapper.java
:此檔案會使用管理員 API 來建立、描述和刪除 Kafka 主題。
Run.java
:用來執行產生者和取用者程式碼的命令列介面。
在 pom.xml
檔案中務必要了解的事項包括:
相依性:此專案依存於 kafka-clients
套件所提供的 Producer 和 Consumer API。 下列 XML 程式碼會定義此相依性:
<dependency >
<groupId > org.apache.kafka</groupId >
<artifactId > kafka-clients</artifactId >
<version > ${kafka.version}</version >
</dependency >
${kafka.version}
項目會在 pom.xml
的 <properties>..</properties>
區段中進行宣告,並設定為 HDInsight 叢集的 Kafka 版本。
外掛程式:Maven 外掛程式可提供多種功能。 在此專案中,會使用下列外掛程式:
maven-compiler-plugin
:用來將專案所使用的 Java 版本設為 8。 這是HDInsight 3.6 所使用的 Java 版本。
maven-shade-plugin
:用來產生包含此應用程式以及任何相依性的 uber jar。 它也可用來設定應用程式的進入點,如此您即可直接執行 Jar 檔案,而不需要指定主要類別。
產生者會與 Kafka 訊息代理程式主機 (背景工作節點) 通訊,並將資料傳送至 Kafka 主題。 下列程式碼片段取自 GitHub 存放庫 中的 Producer.java 檔案,可說明如何設定產生者屬性。 針對已啟用企業安全性的叢集,必須新增額外的屬性 "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
properties.setProperty("bootstrap.servers" , brokers);
properties.setProperty("key.serializer" ,"org.apache.kafka.common.serialization.StringSerializer" );
properties.setProperty("value.serializer" ,"org.apache.kafka.common.serialization.StringSerializer" );
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
取用者會與 Kafka 代理程式主機 (背景工作節點) 通訊,以迴圈方式讀取記錄。 來自於 Consumer.java 檔案的下列程式碼片段會設定取用者屬性。 針對已啟用企業安全性的叢集,必須新增額外的屬性 "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers" , brokers);
properties.setProperty("group.id" , groupId);
properties.setProperty("key.deserializer" ,"org.apache.kafka.common.serialization.StringDeserializer" );
properties.setProperty("value.deserializer" ,"org.apache.kafka.common.serialization.StringDeserializer" );
properties.setProperty("auto.offset.reset" ,"earliest" );
consumer = new KafkaConsumer<>(properties);
在此程式碼中,取用者會設定為從主題的開頭處讀取 (auto.offset.reset
設為 earliest
)。
Run.java 檔案會提供可執行產生者或取用者程式碼的命令列介面。 您必須提供 Kafka 代理程式主機資訊作為參數。 您可以選擇性地包含取用者程序所使用的群組識別碼值。 如果您使用相同的群組識別碼建立多個取用者執行個體,這些執行個體將會以負載平衡的方式讀取主題。
從 Kafka 開始使用 Azure 範例 下載 JAR。 如果您的叢集已啟用企業安全性套件 (ESP) ,請使用 kafka-producer-consumer-esp.jar。 使用下列命令將 JAR 複製到您的叢集。
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net :kafka-producer-consumer.jar
如果您想要略過此步驟,可從 Prebuilt-Jars
子目錄下載預先建立的 jar。 下載 kafka-producer-consumer.jar。 如果您的叢集已啟用企業安全性套件 (ESP) ,請使用 kafka-producer-consumer-esp.jar。 執行步驟 3 將 jar 複製到您的 HDInsight 叢集。
從 https://github.com/Azure-Samples/hdinsight-kafka-java-get-started 下載並解壓縮範例。
將目前目錄設定為 hdinsight-kafka-java-get-started\Producer-Consumer
目錄的位置。 如果您使用 企業安全性套件 (ESP) 啟用的 Kafka 叢集,您應該將位置設定為 DomainJoined-Producer-Consumer
子目錄。 使用下列命令建置應用程式:
mvn clean package
此命令會建立名為 target
的目錄,其中包含名為 kafka-producer-consumer-1.0-SNAPSHOT.jar
的檔案。 若為 ESP 叢集,檔案將會是 kafka-producer-consumer-esp-1.0-SNAPSHOT.jar
將 sshuser
取代為叢集的 SSH 使用者,並將 CLUSTERNAME
取代為叢集的名稱。 輸入下列命令,將 kafka-producer-consumer-1.0-SNAPSHOT.jar
檔案複製到 HDInsight 叢集。 出現提示時,請輸入 SSH 使用者的密碼。
scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net :kafka-producer-consumer.jar
將 sshuser
取代為叢集的 SSH 使用者,並將 CLUSTERNAME
取代為叢集的名稱。 輸入下列命令,開啟與叢集的 SSH 連線。 出現提示時,請輸入 SSH 使用者帳戶的密碼。
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
若要取得 Kafka 代理程式主機,請取代下列命令中的 <clustername>
和 <password>
值並加以執行。 <clustername>
需使用相同的大小寫,如 Azure 入口網站所示。 請將 <password>
取代為叢集登入密碼,然後執行:
sudo apt -y install jq
export CLUSTER_NAME='<clustername>'
export PASSWORD='<password>'
export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME .azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME /services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
備註
此命令需要 Ambari 存取權。 如果您的叢集位於 NSG 後方,請從可存取 Ambari 的機器執行此命令。
輸入下列命令,建立 Kafka 主題 myTest
:
java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
若要執行產生器,並且將資料寫入至主題,請使用下列命令:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
產生器完成後,請使用下列命令從主題讀取︰
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
已讀取的記錄以及記錄計數隨即顯示。
使用 Ctrl + C 來結束取用者。
Kafka 取用者會在讀取記錄時使用取用者群組。 多個取用者使用相同群組會導致從主題讀取負載平衡。 群組中的每個取用者都會收到一部分的記錄。
取用者應用程式接受作為群組識別碼的參數。 例如,下列命令會使用 myGroup
的群組識別碼啟動取用者:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
使用 Ctrl + C 來結束取用者。
若要查看此程序的運作情況,請使用下列命令︰
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
此命令會使用 tmux
將終端機分割成兩個資料行。 取用者會在每個資料行中啟動 (使用相同的群組識別碼值)。 取用者完成讀取後,請留意到每個取用者僅讀取了記錄的一部分。 請使用 Ctrl + C 兩次來結束 tmux
。
透過主題的資料分割處理相同群組內的用戶端取用。 在此程式碼範例中,稍早建立的 test
主題有八個分割區。 如果您啟動八個取用者,則每個取用者都會從主題的單一分割區讀取記錄。
重要
一個取用者群組中的取用者執行個體不得超過資料分割。 在此範例中,一個取用者群組可以包含最多 8 個取用者,因為這是主題中的資料分割數目。 或者,您可以有多個取用者群組,其各有不超過 8 個取用者。
Kafka 中儲存的記錄會依照其在資料分割內接收的順序儲存。 若要達到依序傳遞「資料分割內」 的記錄,請建立取用者群組,其中的取用者執行個體數目與資料分割數目相符。 若要達到依序傳遞「主題內」 的記錄,請建立只有一個取用者執行個體的取用者群組。
主題建立失敗 如果您的叢集已啟用企業安全性套件,請使用適用於生產者和取用者的預建 JAR 檔案 。 您可以從DomainJoined-Producer-Consumer
子目錄 中的程式碼建置 ESP jar。 生產者和取用者屬性都有已啟用 ESP 的叢集所適用的額外屬性 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
。
啟用 ESP 的叢集失敗 :如果產生和取用作業失敗,而且您使用已啟用 ESP 的叢集,請檢查使用者 kafka
是否存在於所有 Ranger 原則中。 如果不存在,請將其新增至所有 Ranger 原則。
若要清除本教學課程所建立的資源,您可以刪除資源群組。 刪除資源群組也會刪除相關聯的 HDInsight 叢集,以及與資源群組相關聯的任何其他資源。
若要使用 Azure 入口網站移除資源群組:
在 Azure 入口網站中展開左側功能表,以開啟服務的功能表,然後選擇 [資源群組] 以顯示資源群組的清單。
找出要刪除的資源群組,然後以滑鼠右鍵按一下清單右側的 [更多] 按鈕 (...)。
選取 [刪除資源群組] ,並加以確認。
在本文件中,您會了解如何使用 Apache Kafka Producer 和 Consumer API 搭配 HDInsight 上的 Apache Kafka。 使用下列各項來深入了解 Kafka 的使用方式︰