Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
HDInsight üzerinde Kafka ile Apache Kafka Üretici ve Tüketici API'lerini kullanmayı öğrenin.
Kafka Üretici API'si, uygulamaların Kafka kümesine veri akışları göndermesine olanak tanır. Kafka Tüketici API'si, uygulamaların kümeden veri akışlarını okumasına olanak tanır.
Bu öğreticide aşağıdakilerin nasıl yapılacağını öğreneceksiniz:
- Önkoşullar
- Kodu anlama
- Uygulama derleme ve dağıtma
- Uygulamayı kümede çalıştırma
API'ler hakkında daha fazla bilgi için Üretici API'sine ve Tüketici API'sine ilişkin Apache belgelerine bakın.
Önkoşullar
- HDInsight kümesinde Apache Kafka. Kümenin nasıl oluşturulacağını öğrenmek için bkz. HDInsight üzerinde Apache Kafka ile başlama.
- Java Developer Kit (JDK) sürüm 8 veya OpenJDK gibi bir eşdeğer.
- Apache Maven , Apache'ye göre düzgün bir şekilde yüklendi . Maven, Java projeleri için bir proje derleme sistemidir.
- Putty gibi bir SSH istemcisi. Daha fazla bilgi için bkz. SSH kullanarak HDInsight'a (Apache Hadoop) bağlanma.
Kodu anlama
Örnek uygulama, alt dizininde https://github.com/Azure-Samples/hdinsight-kafka-java-get-started konumunda bulunurProducer-Consumer
.
Kurumsal Güvenlik Paketi (ESP) özellikli Kafka kümesini kullanıyorsanız, alt dizinde DomainJoined-Producer-Consumer
bulunan uygulama sürümünü kullanmanız gerekir.
Uygulama öncelikli olarak dört dosyadan oluşur:
-
pom.xml
: Bu dosya proje bağımlılıklarını, Java sürümünü ve paketleme yöntemlerini tanımlar. -
Producer.java
: Bu dosya, üretici API'sini kullanarak Kafka'ya rastgele cümleler gönderir. -
Consumer.java
: Bu dosya, Kafka'dan verileri okumak ve STDOUT'a yaymak için tüketici API'sini kullanır. -
AdminClientWrapper.java
: Bu dosya, Kafka konularını oluşturmak, açıklamak ve silmek için yönetici API'sini kullanır. -
Run.java
: Üretici ve tüketici kodunu çalıştırmak için kullanılan komut satırı arabirimi.
Pom.xml
Dosyada pom.xml
anlaşılması gereken önemli şeyler şunlardır:
Bağımlılıklar: Bu proje, paket tarafından
kafka-clients
sağlanan Kafka üretici ve tüketici API'lerine dayanır. Aşağıdaki XML kodu bu bağımlılığı tanımlar:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
${kafka.version}
girdisi,<properties>..</properties>
bölümündepom.xml
olarak beyan edilir ve HDInsight kümesinin belirli Kafka sürümüne yapılandırılmıştır.Eklentiler: Maven eklentileri çeşitli özellikler sağlar. Bu projede aşağıdaki eklentiler kullanılır:
-
maven-compiler-plugin
: Proje tarafından kullanılan Java sürümünü 8 olarak ayarlamak için kullanılır. Bu, HDInsight 3.6 tarafından kullanılan Java sürümüdür. -
maven-shade-plugin
: Bu uygulamanın yanı sıra tüm bağımlılıkları içeren bir uber jar oluşturmak için kullanılır. Ana sınıfı belirtmek zorunda kalmadan Jar dosyasını doğrudan çalıştırabilmeniz için uygulamanın giriş noktasını ayarlamak için de kullanılır.
-
Producer.java
Üretici, Kafka aracı sunucuları (çalışan düğümleri) ile iletişim kurar ve bir Kafka konusuna veri gönderir. Aşağıdaki kod parçacığı GitHub deposundakiProducer.java dosyasından alınıp üretici özelliklerinin nasıl ayarlanacağı gösterilmektedir. Kurumsal Güvenlik Özellikli kümeler için ek bir özellik eklenmelidir"properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Tüketici Kafka aracı konaklarıyla (çalışan düğümleri) iletişim kurar ve döngüdeki kayıtları okur. Consumer.java dosyasındaki aşağıdaki kod parçacığı tüketici özelliklerini ayarlar. Kurumsal Güvenlik Özellikli kümeler için ek bir özellik eklenmelidir"properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
Bu kodda, tüketici konunun başından itibaren okuyacak şekilde yapılandırılır (auto.offset.reset
olarak ayarlanır earliest
.)
Run.java
Run.java dosyası, üretici veya tüketici kodunu çalıştıran bir komut satırı arabirimi sağlar. Kafka broker ana bilgisayar bilgilerini parametre olarak sağlamanız gerekir. İsteğe bağlı olarak, tüketici işlemi tarafından kullanılan bir grup kimliği değeri ekleyebilirsiniz. Aynı grup kimliğini kullanarak birden çok tüketici örneği oluşturursanız, bunlar konu başlığından okuma yükünü dengeler.
Örneği oluşturma ve dağıtma
Önceden oluşturulmuş JAR dosyalarını kullanma
Kafka Kullanmaya Başlama Azure örneğinden jars dosyalarını indirin. Kümeniz Kurumsal Güvenlik Paketi (ESP) etkinse kafka-producer-consumer-esp.jar kullanın. Jar'ları kümenize kopyalamak için aşağıdaki komutu kullanın.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Koddan JAR dosyalarını oluşturma
Bu adımı atlamak isterseniz, önceden oluşturulmuş jar'lar alt dizinden Prebuilt-Jars
indirilebilir. kafka-producer-consumer.jar indirin. Kümeniz Kurumsal Güvenlik Paketi (ESP) etkinse, kafka-producer-consumer-esp.jar dosyasını kullanın. Jar dosyasını HDInsight kümenize kopyalamak için 3. adımı yürütür.
Örnekleri https://github.com/Azure-Samples/hdinsight-kafka-java-get-started adresinden indirin ve çıkartın.
Geçerli dizininizi
hdinsight-kafka-java-get-started\Producer-Consumer
dizininin bulunduğu konuma ayarlayın. Kurumsal Güvenlik Paketi (ESP) özellikli Kafka kümesi kullanıyorsanız, konumu alt dizin olarakDomainJoined-Producer-Consumer
ayarlamanız gerekir. Uygulamayı derlemek için aşağıdaki komutu kullanın:mvn clean package
Bu komut, içinde
kafka-producer-consumer-1.0-SNAPSHOT.jar
adlı bir dosya bulunduran,target
adlı bir dizin oluşturur. ESP kümeleri için dosya şu şekilde olacaktır:kafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Kümenizin SSH kullanıcısı ile
sshuser
değerini ve kümenizin adıylaCLUSTERNAME
değerini değiştirin. Dosyayı HDInsight kümenize kopyalamakkafka-producer-consumer-1.0-SNAPSHOT.jar
için aşağıdaki komutu girin. İstendiğinde SSH kullanıcısının parolasını girin.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Örneği çalıştırma
Kümenizin SSH kullanıcısı ile
sshuser
değerini ve kümenizin adıylaCLUSTERNAME
değerini değiştirin. Aşağıdaki komutu girerek kümeye bir SSH bağlantısı açın. İstenirse, SSH kullanıcı hesabının parolasını girin.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Kafka broker konaklarını almak için, aşağıdaki komutta
<clustername>
ve<password>
değerlerini değiştirip yürütün. Azure portalında gösterildiği gibi<clustername>
için aynı büyük/küçük harfleri kullanın.<password>
'yi küme oturum açma parolası ile değiştirin, ardından şu komutu yürütün:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Uyarı
Bu komut Ambari erişimi gerektirir. Kümeniz bir NSG'nin arkasındaysa Ambari'ye erişebilen bir makineden bu komutu çalıştırın.
Aşağıdaki komutu girerek Kafka konusu
myTest
oluşturun:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Üreticiyi çalıştırmak ve konuya veri yazmak için aşağıdaki komutu kullanın:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Üretici işlemi tamamladıktan sonra, konudan okumak için aşağıdaki komutu kullanın.
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Okunan kayıtlar ve kayıt sayısı görüntülenir.
Tüketiciden çıkmak için Ctrl + C tuşlarını kullanın.
Birden çok tüketici
Kafka tüketicileri, kayıtları okurken bir tüketici grubu kullanır. Aynı grubun birden çok tüketiciyle kullanılması, bir konudaki yükün dengeli dağıldığı okumalarla sonuçlanmaktadır. Gruptaki her tüketici kayıtların bir bölümünü alır.
Tüketici uygulaması, grup kimliği olarak kullanılan bir parametreyi kabul eder. Örneğin, aşağıdaki komut myGroup
grup kimliğini kullanarak bir tüketiciyi başlatır.
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Tüketiciden çıkmak için Ctrl + C tuşlarını kullanın.
Bu işlemi uygulamada görmek için aşağıdaki komutu kullanın:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Bu komut terminali iki sütuna bölmek için kullanır tmux
. Her sütunda aynı grup kimliği değeriyle bir tüketici başlatılır. Tüketiciler okumayı bitirdikten sonra, her birinin kayıtların yalnızca bir bölümünü okuduğuna dikkat edin. çıkmak tmux
için Ctrl + C tuşlarını iki kez kullanın.
Aynı gruptaki istemciler tarafından tüketim, konunun partisyonları aracılığıyla işlenir. Bu kod örneğinde test
, daha önce oluşturulan konuda sekiz bölüm vardır. Sekiz tüketici başlatırsanız, her tüketici konuya ait tek bir bölümden kayıtları okur.
Önemli
Bir tüketici grubunda bölümlerden daha fazla tüketici örneği olamaz. Bu örnekte, konu başlığındaki bölüm sayısı olduğundan, bir tüketici grubu en fazla sekiz tüketici içerebilir. Ya da her birinin en fazla sekiz tüketicisi olan birden çok tüketici grubunuz olabilir.
Kafka'da depolanan kayıtlar, bir bölüm içinde alınma sırasına göre depolanır. Bir bölüm içindeki kayıtlar için sıralı teslim elde etmek için, tüketici örneği sayısının bölüm sayısıyla eşleştiği bir tüketici grubu oluşturun. Konu başlığındaki kayıtlar için sıralı teslim elde etmek için, yalnızca bir tüketici örneğine sahip bir tüketici grubu oluşturun.
Karşılaşılan Yaygın Sorunlar
Konu oluşturma başarısız oldu Kümeniz Kurumsal Güvenlik Paketi etkinse, üretici ve tüketici için önceden oluşturulmuş JAR dosyalarını kullanın. ESP jar alt dizinindeki
DomainJoined-Producer-Consumer
koddan oluşturulabilir. Üretici ve tüketici özellikleri, ESP özellikli kümeler için ek bir özelliğeCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
sahiptir.ESP özellikli kümelerde hata: Oluşturma ve kullanma işlemleri başarısız olursa ve ESP özellikli bir küme kullanıyorsanız, kullanıcının
kafka
tüm Ranger ilkelerinde mevcut olup olmadığını denetleyin. Varsa ekleyin, değilse tüm Ranger politikalara ekleyin.
Kaynakları temizleme
Bu öğretici tarafından oluşturulan kaynakları temizlemek için kaynak grubunu silebilirsiniz. Kaynak grubunun silinmesi, ilişkili HDInsight kümesini ve kaynak grubuyla ilişkili diğer tüm kaynakları da siler.
Azure portalını kullanarak kaynak grubunu kaldırmak için:
- Azure portalında sol taraftaki menüyü genişleterek hizmet menüsünü açın ve sonra Kaynak Grupları'nı seçerek kaynak gruplarınızın listesini görüntüleyin.
- Silinecek kaynak grubunu bulun ve sonra listenin sağ tarafındaki Daha fazla düğmesine (...) sağ tıklayın.
- Kaynak grubunu sil'i seçip onaylayın.
Sonraki adımlar
Bu belgede, HDInsight üzerinde Kafka ile Apache Kafka Üretici ve Tüketici API'sini kullanmayı öğrendiniz. Kafka ile çalışma hakkında daha fazla bilgi edinmek için aşağıdakileri kullanın: