演習 - Kafka プロデューサーを作成する
Kafka および Spark クラスターがデプロイされたので、kafka プロデューサーを Kafka ヘッド ノードに追加してみましょう。 このプロデューサーは、擬似株価を生成する株価スティミュレーターです。
サンプルのダウンロード
- インターネット ブラウザーで https://github.com/Azure/hdinsight-mslearn にアクセスしてサンプルをダウンロードするか、ローカルでそれを複製します (前のモジュールでまだ行っていない場合)。
- Spark Structured Streaming\python-producer-simulator-template.py ファイルをローカルで開きます。
Kafka ブローカーの URL を取得する
次に、Kafka ブローカーの URL を取得する必要があります。そのためには、ヘッド ノード上で SSH 接続を使用し、それらの URL を Python ファイルに追加します。
Apache Kafka クラスターのプライマリ ヘッド ノードに接続するには、そのノードに SSH で接続する必要があります。 接続には、Azure portal の Azure Cloud Shell をお勧めします。 Azure portal で、上部のツールバーにある Azure Cloud Shell ボタンをクリックし、[Bash] を選択します。 また、Git Bash などの SSH 対応コマンド プロンプトを使用することもできます。
Azure Cloud Shell を使用したことがない場合は、ストレージがマウントされていないことを示す通知が表示されます。 [サブスクリプション] ボックスからご自分の Azure サブスクリプションを選択し、[ストレージの作成] をクリックします。
クラウド プロンプトで、次のコマンドを貼り付けます。
sshuser
を SSH ユーザー名で置き換えます。kafka-mslearn-stock
をお使いの Apache Kafka クラスターの名前に置き換えます。クラスター名の後に -ssh を含める必要があることに注意してください。ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
クラスターに初めて接続すると、ホストの信頼性を確立できないという警告が SSH クライアントに表示されることがあります。 プロンプトが表示されたら、「yes」と入力して、Enter キーを押し、SSH クライアントの信頼済みサーバーの一覧にこのホストを追加します。
メッセージが表示されたら、SSH ユーザーのパスワードを入力します。
接続されると、次のテキストのような情報が表示されます。
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
コマンド ライン JSON プロセッサの jq をインストールします。 このユーティリティは JSON ドキュメントを解析するためのものであり、ホスト情報の解析に役立ちます。 開いた SSH 接続から次のコマンドを入力して、
jq
をインストールします。sudo apt -y install jq
パスワード変数を設定します。
PASSWORD
をクラスターのログイン パスワードに置き換えてから、次のコマンドを入力します。export password='PASSWORD'
大文字と小文字が正しく区別されたクラスター名を抽出します。 クラスターの作成方法によっては、クラスター名の実際の大文字小文字の区別が予想と異なる場合があります。 このコマンドは、実際の大文字小文字の区別を取得し、変数に格納します。 次のコマンドを入力します。
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
このコマンドには応答がありません。
Zookeeper ホスト情報で環境変数を設定するには、次のコマンドを使用します。 このコマンドはすべての Zookeeper ホストを取得してから、最初の 2 つのエントリのみを返します。 これは、1 つのホストに到達できない場合に、いくらかの冗長性が必要なためです。
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Note
このコマンドでは、Ambari アクセスが必要です。 クラスターが NSG の背後にある場合は、Ambari にアクセスできるコンピューターからこのコマンドを実行します。
このコマンドにも応答がありません。
環境変数が正しく設定されていることを確認するには、次のコマンドを使用します。
echo $KAFKAZKHOSTS
このコマンドでは、次のテキストのような情報が返されます。
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
Apache Kafka ブローカー ホスト情報で環境変数を設定するには、次のコマンドを使用します。
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Note
このコマンドでは、Ambari アクセスが必要です。 クラスターが NSG の背後にある場合は、Ambari にアクセスできるコンピューターからこのコマンドを実行します。
このコマンドには出力がありません。
環境変数が正しく設定されていることを確認するには、次のコマンドを使用します。
echo $KAFKABROKERS
このコマンドでは、次のテキストのような情報が返されます。
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
前の手順で返された Kafka ブローカーの値の 1 つを 19 行目の python-producer-simulator-template.py ファイルにコピーし、その値を一重引用符で囲みます。次に例を示します。
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
python-producer-simulator-template-simulator-template.py ファイルを保存します。
SSH 接続ウィンドウに戻り、次のコマンドを使用してトピックを作成します。
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
このコマンドでは、$KAFKAZKHOSTS に格納されているホスト情報を使用して Zookeeper に接続します。 次に、python-producer-simulator-template.py に含まれるトピック名と一致するように、stockVals という名前の Apache Kafka トピックを作成します。
Python ファイルをヘッド ノードにコピーし、ファイルを実行してデータをストリーム配信する
新しい Git ウィンドウで、python-producer-simulator-template.py ファイルの場所に移動し、次のコマンドを使用して、そのファイルをローカル コンピューターからプライマリ ヘッド ノードにコピーします。
kafka-mslearn-stock
をお使いの Apache Kafka クラスターの名前に置き換えます。クラスター名の後に -ssh を含める必要があることに注意してください。scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
接続を続行するかどうかを確認するメッセージが表示されたら、「yes」と入力します。 次に、プロンプトで、クラスターのパスワードを入力します。 ファイルの転送が完了すると、次の出力が表示されます。
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
ここで、ブローカー情報を取得した Azure コマンド プロンプトに戻り、次のコマンドを実行して Kafka をインストールします。
sudo pip install kafka-python
Kafka が正常にインストールされると、次の出力が表示されます。
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
同じウィンドウで、次のコマンドを使用して要求をインストールします。
sudo apt-get install python-requests
"After this operation, 4,327 kB of additional disk space will be used. (この操作の後、4,327 kB の追加ディスク領域が使用されます。) Do you want to continue? (続行してもよろしいですか?) [Y/n]" というメッセージが表示されたら、「y」と入力します。
要求が正常にインストールされると、次のような出力が表示されます。
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
同じウィンドウで、次のコマンドを使用して Python ファイルを実行します
python python-producer-simulator-template.py
次のような出力が表示されます。
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
この出力では、python-producer-simulated-template.py ファイルに記載されている株式の擬似株価が示され、その後にトピック、パーティション、トピック内のメッセージのオフセットが示されます。 プロデューサーがトリガーされるたびに (毎秒)、株価の新しいバッチが生成され、それぞれの新しいメッセージが、特定のオフセットでパーティションに追加されることがわかります。