演習 - Kafka プロデューサーを作成する

完了

Kafka および Spark クラスターがデプロイされたので、kafka プロデューサーを Kafka ヘッド ノードに追加してみましょう。 このプロデューサーは、擬似株価を生成する株価スティミュレーターです。

サンプルのダウンロード

  1. インターネット ブラウザーで https://github.com/Azure/hdinsight-mslearn にアクセスしてサンプルをダウンロードするか、ローカルでそれを複製します (前のモジュールでまだ行っていない場合)。
  2. Spark Structured Streaming\python-producer-simulator-template.py ファイルをローカルで開きます。

Kafka ブローカーの URL を取得する

次に、Kafka ブローカーの URL を取得する必要があります。そのためには、ヘッド ノード上で SSH 接続を使用し、それらの URL を Python ファイルに追加します。

  1. Apache Kafka クラスターのプライマリ ヘッド ノードに接続するには、そのノードに SSH で接続する必要があります。 接続には、Azure portal の Azure Cloud Shell をお勧めします。 Azure portal で、上部のツールバーにある Azure Cloud Shell ボタンをクリックし、[Bash] を選択します。 また、Git Bash などの SSH 対応コマンド プロンプトを使用することもできます。

  2. Azure Cloud Shell を使用したことがない場合は、ストレージがマウントされていないことを示す通知が表示されます。 [サブスクリプション] ボックスからご自分の Azure サブスクリプションを選択し、[ストレージの作成] をクリックします。

  3. クラウド プロンプトで、次のコマンドを貼り付けます。 sshuser を SSH ユーザー名で置き換えます。 kafka-mslearn-stock をお使いの Apache Kafka クラスターの名前に置き換えます。クラスター名の後に -ssh を含める必要があることに注意してください。

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. クラスターに初めて接続すると、ホストの信頼性を確立できないという警告が SSH クライアントに表示されることがあります。 プロンプトが表示されたら、「yes」と入力して、Enter キーを押し、SSH クライアントの信頼済みサーバーの一覧にこのホストを追加します。

  5. メッセージが表示されたら、SSH ユーザーのパスワードを入力します。

    接続されると、次のテキストのような情報が表示されます。

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. コマンド ライン JSON プロセッサの jq をインストールします。 このユーティリティは JSON ドキュメントを解析するためのものであり、ホスト情報の解析に役立ちます。 開いた SSH 接続から次のコマンドを入力して、jq をインストールします。

    sudo apt -y install jq
    
  7. パスワード変数を設定します。 PASSWORD をクラスターのログイン パスワードに置き換えてから、次のコマンドを入力します。

    export password='PASSWORD'
    
  8. 大文字と小文字が正しく区別されたクラスター名を抽出します。 クラスターの作成方法によっては、クラスター名の実際の大文字小文字の区別が予想と異なる場合があります。 このコマンドは、実際の大文字小文字の区別を取得し、変数に格納します。 次のコマンドを入力します。

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    このコマンドには応答がありません。

  9. Zookeeper ホスト情報で環境変数を設定するには、次のコマンドを使用します。 このコマンドはすべての Zookeeper ホストを取得してから、最初の 2 つのエントリのみを返します。 これは、1 つのホストに到達できない場合に、いくらかの冗長性が必要なためです。

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Note

    このコマンドでは、Ambari アクセスが必要です。 クラスターが NSG の背後にある場合は、Ambari にアクセスできるコンピューターからこのコマンドを実行します。

    このコマンドにも応答がありません。

  10. 環境変数が正しく設定されていることを確認するには、次のコマンドを使用します。

    echo $KAFKAZKHOSTS
    

    このコマンドでは、次のテキストのような情報が返されます。

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Apache Kafka ブローカー ホスト情報で環境変数を設定するには、次のコマンドを使用します。

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Note

    このコマンドでは、Ambari アクセスが必要です。 クラスターが NSG の背後にある場合は、Ambari にアクセスできるコンピューターからこのコマンドを実行します。

    このコマンドには出力がありません。

  12. 環境変数が正しく設定されていることを確認するには、次のコマンドを使用します。

    echo $KAFKABROKERS
    

    このコマンドでは、次のテキストのような情報が返されます。

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. 前の手順で返された Kafka ブローカーの値の 1 つを 19 行目の python-producer-simulator-template.py ファイルにコピーし、その値を一重引用符で囲みます。次に例を示します。

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. python-producer-simulator-template-simulator-template.py ファイルを保存します。

  15. SSH 接続ウィンドウに戻り、次のコマンドを使用してトピックを作成します。

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

このコマンドでは、$KAFKAZKHOSTS に格納されているホスト情報を使用して Zookeeper に接続します。 次に、python-producer-simulator-template.py に含まれるトピック名と一致するように、stockVals という名前の Apache Kafka トピックを作成します。

Python ファイルをヘッド ノードにコピーし、ファイルを実行してデータをストリーム配信する

  1. 新しい Git ウィンドウで、python-producer-simulator-template.py ファイルの場所に移動し、次のコマンドを使用して、そのファイルをローカル コンピューターからプライマリ ヘッド ノードにコピーします。 kafka-mslearn-stock をお使いの Apache Kafka クラスターの名前に置き換えます。クラスター名の後に -ssh を含める必要があることに注意してください。

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    接続を続行するかどうかを確認するメッセージが表示されたら、「yes」と入力します。 次に、プロンプトで、クラスターのパスワードを入力します。 ファイルの転送が完了すると、次の出力が表示されます。

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. ここで、ブローカー情報を取得した Azure コマンド プロンプトに戻り、次のコマンドを実行して Kafka をインストールします。

    sudo pip install kafka-python
    

    Kafka が正常にインストールされると、次の出力が表示されます。

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. 同じウィンドウで、次のコマンドを使用して要求をインストールします。

    sudo apt-get install python-requests
    
  4. "After this operation, 4,327 kB of additional disk space will be used. (この操作の後、4,327 kB の追加ディスク領域が使用されます。) Do you want to continue? (続行してもよろしいですか?) [Y/n]" というメッセージが表示されたら、「y」と入力します。

    要求が正常にインストールされると、次のような出力が表示されます。

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. 同じウィンドウで、次のコマンドを使用して Python ファイルを実行します

    python python-producer-simulator-template.py
    

    次のような出力が表示されます。

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

この出力では、python-producer-simulated-template.py ファイルに記載されている株式の擬似株価が示され、その後にトピック、パーティション、トピック内のメッセージのオフセットが示されます。 プロデューサーがトリガーされるたびに (毎秒)、株価の新しいバッチが生成され、それぞれの新しいメッセージが、特定のオフセットでパーティションに追加されることがわかります。