この記事では、Apache Kafka クライアントと Apache Kafka ブローカーの間に、トランスポート層セキュリティ (TLS) 暗号化 (旧称 Secure Sockets Layer (SSL) 暗号化) を設定する方法について説明します。 クライアントの認証を設定する方法についても説明します (双方向 TLS とも呼ばれます)。
重要
Kafka アプリケーションでは、Java クライアントとコンソール クライアントの 2 種類のクライアントを使用できます。 生成と使用の両方に TLS を使用できるのは、Java クライアント ProducerConsumer.java
のみです。 TLS では、コンソール プロデューサー クライアント console-producer.sh
は機能しません。
Apache Kafka ブローカーのセットアップ
Kafka TLS ブローカーのセットアップでは、4 つの HDInsight クラスター VM が次のように使用されます。
- ヘッドノード 0 - 証明機関 (CA)
- ワーカー ノード 0、1、2 - ブローカー
注意
このガイドでは自己署名証明書を使用しますが、最も安全なソリューションは、信頼された CA によって発行された証明書を使うことです。
ブローカーのセットアップ プロセスの概要は次のとおりです。
次の手順を 3 つのワーカー ノードのそれぞれで繰り返します。
- 証明書を作成します。
- 証明書署名要求を作成します。
- 証明書署名要求を証明機関 (CA) に送信します。
- CA にサインインし、要求に署名します。
- 署名済みの証明書をワーカー ノードに SCP で送り返します。
- CA の公開証明書をワーカー ノードに SCP で送信します。
すべての証明書が揃ったら、証明書を証明書ストアに格納します。
Ambari に移動し、構成を変更します。
次の詳細な手順を使って、ブローカーのセットアップを完了します。
重要
次のコード スニペットで、wnX は 3 つのワーカー ノードの 1 つの省略形であり、
wn0
、wn1
、またはwn2
のいずれか適切なものに置き換える必要があります。WorkerNode0_Name
とHeadNode0_Name
はそれぞれのコンピューターの名前に置き換える必要があります。ヘッド ノード 0 で初期セットアップを実行します。HDInsight の場合は、証明機関 (CA) のロールを設定します。
# Create a new directory 'ssl' and change into it mkdir ssl cd ssl
各ブローカー (ワーカー ノード 0、1、2) で同じ初期セットアップを実行します。
# Create a new directory 'ssl' and change into it mkdir ssl cd ssl
各ワーカー ノードで、コード スニペットを使って以下の手順を行います。
- キーストアを作成し、新しいプライベート証明書を設定します。
- 証明書署名要求を作成します。
- 証明書署名要求を CA (headnode0) に SCP で送信します
keytool -genkey -keystore kafka.server.keystore.jks -keyalg RSA -validity 365 -storepass "MyServerPassword123" -keypass "MyServerPassword123" -dname "CN=FQDN_WORKER_NODE" -ext SAN=DNS:FQDN_WORKER_NODE -storetype pkcs12 keytool -keystore kafka.server.keystore.jks -certreq -file cert-file -storepass "MyServerPassword123" -keypass "MyServerPassword123" scp cert-file sshuser@HeadNode0_Name:~/ssl/wnX-cert-sign-request
Note
FQDN_WORKER_NODE は、ワーカー ノード コンピューターの完全修飾ドメイン名です。その詳細は、ヘッド ノードの /etc/hosts ファイルから取得できます
たとえば、次のように入力します。
wn0-espkaf.securehadooprc.onmicrosoft.com wn0-kafka2.zbxwnwsmpcsuvbjqbmespcm1zg.bx.internal.cloudapp.net
CA コンピューター上で、次のコマンドを実行して ca-cert と ca-key ファイルを作成します。
openssl req -new -newkey rsa:4096 -days 365 -x509 -subj "/CN=Kafka-Security-CA" -keyout ca-key -out ca-cert -nodes
CA のコンピューターに移動し、受け取ったすべての証明書署名要求に署名します。
openssl x509 -req -CA ca-cert -CAkey ca-key -in wn0-cert-sign-request -out wn0-cert-signed -days 365 -CAcreateserial -passin pass:"MyServerPassword123" openssl x509 -req -CA ca-cert -CAkey ca-key -in wn1-cert-sign-request -out wn1-cert-signed -days 365 -CAcreateserial -passin pass:"MyServerPassword123" openssl x509 -req -CA ca-cert -CAkey ca-key -in wn2-cert-sign-request -out wn2-cert-signed -days 365 -CAcreateserial -passin pass:"MyServerPassword123"
CA (headnode0) からワーカー ノードに、署名済みの証明書を返送します。
scp wn0-cert-signed sshuser@WorkerNode0_Name:~/ssl/cert-signed scp wn1-cert-signed sshuser@WorkerNode1_Name:~/ssl/cert-signed scp wn2-cert-signed sshuser@WorkerNode2_Name:~/ssl/cert-signed
CA の公開証明書を各ワーカー ノードに送信します。
scp ca-cert sshuser@WorkerNode0_Name:~/ssl/ca-cert scp ca-cert sshuser@WorkerNode1_Name:~/ssl/ca-cert scp ca-cert sshuser@WorkerNode2_Name:~/ssl/ca-cert
各ワーカー ノードで、CA の公開証明書をトラストストアとキーストアに追加します。 次に、ワーカー ノード自体の署名済み証明書をキーストアに追加します
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass "MyServerPassword123" -keypass "MyServerPassword123" -noprompt keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass "MyServerPassword123" -keypass "MyServerPassword123" -noprompt keytool -keystore kafka.server.keystore.jks -import -file cert-signed -storepass "MyServerPassword123" -keypass "MyServerPassword123" -noprompt
TLS を使用してブローカーを再起動するように Kafka 構成を更新する
キーストアとトラストストアを使用して各 Kafka ブローカーを設定し、適切な証明書をインポートしました。 次に、Ambari を使用して関連の Kafka 構成プロパティを変更して、Kafka ブローカーを再起動します。
構成の変更を完了するために、次の手順を実行します。
Azure portal にサインインし、ご利用の Azure HDInsight Apache Kafka クラスターを選択します。
[クラスター ダッシュボード] の [Ambari ホーム] をクリックして、Ambari UI に移動します。
[Kafka Broker]/(Kafka ブローカー/) で、リスナー プロパティを
PLAINTEXT://localhost:9092,SASL_SSL://localhost:9093
に設定します。[Advanced kafka-broker](Kafka ブローカーの高度な設定) で security.inter.broker.protocol プロパティを
SASL_SSL
に設定します。Custom kafka-broker\(カスタム kafka ブローカー) で ssl.client.auth プロパティを
required
に設定します。Note
この手順は、認証と暗号化を設定する場合にのみ必要です。
これらの変更を含む Ambari 構成 UI を示すスクリーンショットを次に示します。
注意
- ssl.keystore.location と truststore.location は、それぞれキーストアの完全パスと証明機関 (hn0) のトラストストアの場所です
- ssl.keystore.password と ssl.truststore.password は、それぞれキーストアおよびトラストストアに設定されたパスワードです。 この例では、
MyServerPassword123
です - ssl.key.password は、キーストアとトラストストアに設定されたキーです。 この例では、
MyServerPassword123
です
Kafka で TLS 1.3 を使用するには、Ambari の kafka 構成に次の構成を追加します。
ssl.enabled.protocols=TLSv1.3
ssl.protocol=TLSv1.3
重要
- TLS 1.3 は HDI 5.1 kafka バージョンでのみ機能します。
- サーバー側で TLS 1.3 を使用する場合は、クライアントでも TLS 1.3 構成を使用する必要があります。
HDI バージョン 4.0 または 5.0 の場合
認証と暗号化を設定する場合のスクリーンショットは次のようになります
暗号化のみを設定する場合のスクリーンショットは次のようになります
すべての Kafka ブローカーを再起動します。
クライアントのセットアップ (認証なし)
認証が必要ない場合に TLS 暗号化のみを設定する手順の概要は次のとおりです。
- CA (アクティブ ヘッド ノード) にサインインします。
- CA マシン (wn0) からクライアント マシンに CA 証明書をコピーします。
- クライアント マシン (hn1) にサインインし、
~/ssl
フォルダーに移動します。 - トラストストアに CA 証明書をインポートします。
- キーストアに CA 証明書をインポートします。
これらの手順の詳細については、次のコード スニペットを参考にしてください。
CA ノードにサインインします。
ssh sshuser@HeadNode0_Name cd ssl
CA 証明書をクライアント マシンにコピーします。
scp ca-cert sshuser@HeadNode1_Name:~/ssl/ca-cert
クライアント コンピューター (スタンバイ ヘッド ノード) にサインインします。
ssh sshuser@HeadNode1_Name cd ssl
トラストストアに CA 証明書をインポートします。
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass "MyClientPassword123" -keypass "MyClientPassword123" -noprompt
キーストアに CA 証明書をインポートします。
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass "MyClientPassword123" -keypass "MyClientPassword123" -noprompt
クライアント コンピューター (hn1) でファイル
client-ssl-auth.properties
を作成します。 このファイルには、次の行が含まれています。security.protocol=SASL_SSL sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka ssl.truststore.location=/home/sshuser/ssl/kafka.client.truststore.jks ssl.truststore.password=MyClientPassword123
- TLS 1.3 を使用するには、次の構成をファイル
client-ssl-auth.properties
に追加します
ssl.enabled.protocols=TLSv1.3 ssl.protocol=TLSv1.3
- TLS 1.3 を使用するには、次の構成をファイル
プロデューサーとコンシューマーのオプションを使用して管理者クライアントを起動し、プロデューサーとコンシューマーの両方がポート 9093 上で動作していることを確認します。 コンソール プロデューサーまたはコンシューマーを使用してセットアップを確認するために必要な手順については、「検証」セクションをご覧ください。
クライアントのセットアップ (認証を使用)
注意
次の手順は、TLS 暗号化と認証の "両方" を設定する場合にのみ必要です。 暗号化のみを設定する場合は、「クライアントのセットアップ (認証なし)」を参照してください。
次の 4 つの手順は、クライアントのセットアップを完了するために必要なタスクをまとめたものです。
- クライアント コンピューター (スタンバイ ヘッド ノード) にサインインします。
- Java キーストアを作成し、ブローカーの署名証明書を取得します。 証明書を CA が実行されている VM にコピーします。
- CA コンピューター (アクティブ ヘッド ノード) に切り替えて、クライアント証明書に署名します。
- クライアント コンピューター (スタンバイ ヘッド ノード) に移動して、
~/ssl
フォルダーに移動します。 署名証明書をクライアント コンピューターにコピーします。
各手順の詳細は次のとおりです。
クライアント コンピューター (スタンバイ ヘッド ノード) にサインインします。
ssh sshuser@HeadNode1_Name
既存の SSL ディレクトリを削除します。
rm -R ~/ssl mkdir ssl cd ssl
Java キーストアを作成し、証明書署名要求を作成します。
keytool -genkey -keystore kafka.client.keystore.jks -validity 365 -storepass "MyClientPassword123" -keypass "MyClientPassword123" -dname "CN=HEADNODE1_FQDN" -storetype pkcs12 keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -storepass "MyClientPassword123" -keypass "MyClientPassword123"
証明書署名要求を CA にコピーします
scp client-cert-sign-request sshuser@HeadNode0_Name:~/ssl/client-cert-sign-request
CA マシン (アクティブ ヘッド ノード) に切り替えて、クライアント証明書に署名します。
ssh sshuser@HeadNode0_Name cd ssl openssl x509 -req -CA ca-cert -CAkey ca-key -in ~/ssl/client-cert-sign-request -out ~/ssl/client-cert-signed -days 365 -CAcreateserial -passin pass:MyClientPassword123
署名されたクライアント証明書を CA (アクティブ ヘッドノード) からクライアント マシンにコピーします。
scp client-cert-signed sshuser@HeadNode1_Name:~/ssl/client-signed-cert
CA 証明書をクライアント マシンにコピーします。
scp ca-cert sshuser@HeadNode1_Name:~/ssl/ca-cert
- クライアント コンピューター (スタンバイ ヘッド ノード) にサインインして、ssl ディレクトリに移動します。
ssh sshuser@HeadNode1_Name cd ssl
署名された証明書を使用してクライアント ストアを作成し、CA 証明書をクライアント コンピューター (hn1) のキーストアとトラストストアにインポートします。
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass "MyClientPassword123" -keypass "MyClientPassword123" -noprompt keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass "MyClientPassword123" -keypass "MyClientPassword123" -noprompt keytool -keystore kafka.client.keystore.jks -import -file client-signed-cert -storepass "MyClientPassword123" -keypass "MyClientPassword123" -noprompt
クライアント コンピューター (hn1) でファイル
client-ssl-auth.properties
を作成します。 このファイルには、次の行が含まれています。security.protocol=SASL_SSL sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka ssl.truststore.location=/home/sshuser/ssl/kafka.client.truststore.jks ssl.truststore.password=MyClientPassword123 ssl.keystore.location=/home/sshuser/ssl/kafka.client.keystore.jks ssl.keystore.password=MyClientPassword123 ssl.key.password=MyClientPassword123
- TLS 1.3 を使用するには、次の構成をファイル
client-ssl-auth.properties
に追加します
ssl.enabled.protocols=TLSv1.3 ssl.protocol=TLSv1.3
- TLS 1.3 を使用するには、次の構成をファイル
検証
クライアント コンピューターで次の手順を行います。
注意
HDInsight 4.0 と Kafka 2.1 がインストールされている場合は、コンソール プロデューサー/コンシューマーを使用してセットアップを確認できます。 そうでない場合は、ポート 9092 で Kafka プロデューサーを実行し、トピックにメッセージを送信してから、TLS を使用するポート 9093 で Kafka コンシューマーを使用します。
Kafka 2.1 以上
Note
以下のコマンドは、kafka
ユーザーか、または CRUD 操作にアクセスできるカスタム ユーザーを使用している場合に機能します。
コマンド ライン ツールの使用
コマンドの送信に使用するカスタム ユーザーのローカル kerberos チケットを検査していることを確認します。
klist
チケットが存在する場合は、次に進むことができます。 それ以外の場合は、以下のコマンドを使用して Kerberos の原則と keytab を生成します。
ktutil
ktutil: addent -password -p espkafkauser@TEST.COM -k 1 -e RC4-HMAC Password for espkafkauser@TEST.COM: ktutil: wkt user1.keytab ktutil: q kinit –kt espkafkauser.keytab espkafkauser@TEST.COM
klist
は再度 kerberos キャッシュ されたチケットを検査します。トピックがまだ存在しない場合は作成します。
sudo su kafka –c "/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper <ZOOKEEPER_NODE>:2181 --create --topic topic1 --partitions 2 --replication-factor 2"
keytab を使用するには、次の内容を含む Keytabファイルを作成します。 Keytab プロパティが Keytab ファイルを指すようにし、Keytab 内で使用されるプリンシパルを参照します。 以下は、作成され、VM 内の場所に配置されたサンプル JAAS ファイルです: /home/sshuser/kafka_client_jaas_keytab.conf
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/home/sshuser/espkafkauser.keytab" principal="espkafkauser@TEST.COM"; };
コンソール プロデューサーを起動し、プロデューサーの構成ファイルとして
client-ssl-auth.properties
へのパスを指定します。export KAFKA_OPTS="-Djava.security.auth.login.config=/home/hdiuser/kafka_client_jaas_keytab.conf" /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <FQDN_WORKER_NODE>:9093 --topic topic1 --producer.config ~/ssl/client-ssl-auth.properties
クライアント マシンへの別の SSH 接続を開き、コンソール コンシューマーを起動して、コンシューマーの構成ファイルとして
client-ssl-auth.properties
へのパスを指定します。export KAFKA_OPTS="-Djava.security.auth.login.config=/home/sshuser/kafka_client_jaas_keytab.conf" /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <FQDN_WORKER_NODE>:9093 --topic topic1 --consumer.config ~/ssl/client-ssl-auth.properties --from-beginning
Java クライアントを使用して CRUD 操作を実行する場合は、次の GitHub リポジトリを使用します。