Share via


Kafka 用 Azure Event Hubs に対してパスワードレス接続を使用するようにアプリケーションを移行する

この記事では、従来の認証方法から、Kafka のAzure Event Hubsを使用して、より安全なパスワードレス接続に移行する方法について説明します。

Kafka のAzure Event Hubsに対するアプリケーション要求を認証する必要があります。 Kafka のAzure Event Hubsには、アプリが安全に接続するためのさまざまな方法が用意されています。 方法の 1 つは、接続文字列を使用する方法です。 ただしアプリケーションには、可能であればパスワードレス接続を優先的に使用することをお勧めします。

パスワードレス接続は、Spring Cloud Azure 4.3.0 以降でサポートされています。 この記事は、Spring Cloud Stream Kafka アプリケーションから資格情報を削除するための移行ガイドです。

認証オプションを比較する

アプリケーションが Kafka のAzure Event Hubsで認証されると、Event Hubs 名前空間に接続するための承認されたエンティティが提供されます。 Apache Kafka プロトコルは、認証に複数の簡易認証およびセキュリティ層 (SASL) メカニズムを提供します。 SASL メカニズムによると、セキュリティで保護されたリソースへのアクセスを承認するために使用できる認証オプションは、Microsoft Entra認証と Shared Access Signature (SAS) 認証の 2 つです。

Microsoft Entra 認証

Microsoft Entra認証は、Microsoft Entra ID で定義された ID を使用して Kafka のAzure Event Hubsに接続するためのメカニズムです。 Microsoft Entra認証を使用すると、サービス プリンシパル ID やその他の Microsoft サービスを中央の場所で管理できるため、アクセス許可の管理が簡略化されます。

認証にMicrosoft Entra ID を使用すると、次の利点があります。

  • 一様な方法で Azure サービス全体のユーザーの認証。
  • 1 か所でのパスワード ポリシーとパスワードローテーションの管理。
  • Microsoft Entra ID でサポートされる複数の形式の認証により、パスワードを格納する必要がなくなります。
  • お客様は、外部 (Microsoft Entra ID) グループを使用して Event Hubs のアクセス許可を管理できます。
  • Kafka のAzure Event Hubsに接続するアプリケーションのトークンベース認証のサポート。

SAS 認証

Event Hubs には、Kafka リソースの Event Hubs への委任されたアクセス用の Shared Access Signature (SAS) も用意されています。

SAS を使用して Kafka のAzure Event Hubsに接続することはできますが、注意して使用する必要があります。 セキュリティで保護されていない場所で接続文字列を公開することは絶対にしないでください。 接続文字列にアクセスできるユーザーは誰でも認証できます。 たとえば、接続文字列が誤ってソース管理にチェックインされたり、セキュリティで保護されていないメールを通じて送信されたり、間違ったチャットに貼り付けたり、アクセス許可を持っていないユーザーによって表示されたりした場合、悪意のあるユーザーがアプリケーションにアクセスするリスクがあります。 代わりに、OAuth 2.0 トークンベースのメカニズムを使用してアクセスを承認すると、SAS よりも優れたセキュリティと使いやすさが提供されます。 パスワードレス接続を使用するようにアプリケーションを更新することを検討してください。

パスワードレス接続の概要

パスワードなしの接続を使用すると、アプリケーション コード、その構成ファイル、または環境変数に資格情報を格納せずに Azure サービスに接続できます。

多くの Azure サービスでは、たとえば Azure マネージド ID を使用したパスワードレス接続がサポートされています。 これらの手法は、Azure Identity クライアント ライブラリから DefaultAzureCredential を 使用して実装できる堅牢なセキュリティ機能を提供します。 このチュートリアルでは、接続文字列などの代替手段の代わりに使用 DefaultAzureCredential するように既存のアプリケーションを更新する方法について説明します。

DefaultAzureCredential は複数の認証方法をサポートしており、どの方法が使用されるかは実行時に決定されます。 このアプローチを採用すると、環境固有のコードを実装することなく、異なる環境 (ローカル開発環境と運用環境) で異なる認証方法をアプリに使用できます。

資格情報を検索する DefaultAzureCredential 順序と場所については、 Azure ID ライブラリの概要に関するページを参照してください。 たとえば、ローカルで作業する場合は、通常、 DefaultAzureCredential 開発者が Visual Studio へのサインインに使用したアカウントを使用して認証を行います。 アプリを Azure にデプロイすると、DefaultAzureCredential が自動的に切り替わってマネージド ID が使用されるようになります。 この移行のためにコードを変更する必要はありません。

接続がパスワードレスになるようにするには、ローカル開発と運用環境の両方を考慮する必要があります。 いずれかの場所で接続文字列が必要な場合、アプリケーションはパスワードレスではありません。

ローカル開発環境では、Visual Studio Code または IntelliJ 用の Azure CLI、Azure PowerShell、Visual Studio、または Azure プラグインを使用して認証できます。 この場合は、プロパティを構成する代わりに、アプリケーションでその資格情報を使用できます。

仮想マシンなどの Azure ホスティング環境にアプリケーションをデプロイする場合は、その環境でマネージド ID を割り当てることができます。 その後、Azure サービスに接続するために資格情報を指定する必要はありません。

注意

マネージド ID は、アプリまたはサービスを表すセキュリティ ID を提供します。 ID は Azure プラットフォームによって管理され、ユーザーがシークレットをプロビジョニングまたはローテーションする必要はありません。 マネージド ID の詳細については、概要ドキュメントを参照してください。

パスワードレス接続を使用するように既存のアプリケーションを移行する

次の手順では、SAS ソリューションではなくパスワードレス接続を使用するように既存のアプリケーションを移行する方法について説明します。

0) ローカル開発認証のために作業環境を準備する

まず、次のコマンドを使用して、いくつかの環境変数を設定します。

export AZ_RESOURCE_GROUP=<YOUR_RESOURCE_GROUP>
export AZ_EVENTHUBS_NAMESPACE_NAME=<YOUR_EVENTHUBS_NAMESPACE_NAME>
export AZ_EVENTHUB_NAME=<YOUR_EVENTHUB_NAME>

プレースホルダーは、この記事全体で使用される次の値に置き換えてください。

  • <YOUR_RESOURCE_GROUP>: 使用するリソース グループの名前。
  • <YOUR_EVENTHUBS_NAMESPACE_NAME>: 使用するAzure Event Hubs名前空間の名前。
  • <YOUR_EVENTHUB_NAME>: 使用するイベント ハブの名前。

1) Azure Event Hubsのアクセス許可を付与する

Microsoft Entra認証を使用してこのサンプルをローカルで実行する場合は、ユーザー アカウントが Azure Toolkit for IntelliJ、Visual Studio Code Azure Account プラグイン、または Azure CLI を使用して認証されていることを確認してください。 また、アカウントに十分なアクセス許可が付与されていることを確認します。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使用して Event Hubs 名前空間を見つけます。

  2. Event Hubs の概要ページで、左側のメニューから [ アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [ 追加] を選択し、結果のドロップダウン メニューから [ ロールの割り当ての追加] を選択します。

    [ロールの割り当ての追加] が強調表示されている Event Hubs 名前空間リソースの [Azure portal Access Control (IAM)] ページのスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、Azure Event Hubsデータ送信者Azure Event Hubsデータ レシーバーを検索し、一致する結果を選択し、[次へ] を選択します。

  6. [ アクセスの割り当て先] で 、[ユーザー、グループ、またはサービス プリンシパル] を選択し、[ メンバーの選択] を選択します。

  7. ダイアログで、Microsoft Entraユーザー名 (通常はuser@domainメール アドレス) を検索し、ダイアログの下部にある [選択] を選択します。

  8. [レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。

アクセス ロールの付与の詳細については、「Microsoft Entra ID を使用して Event Hubs リソースへのアクセスを承認する」を参照してください。

2) サインインし、パスワードレス接続を使用するようにアプリ コードを移行する

ローカル開発の場合は、Event Hubs でロールを割り当てたのと同じMicrosoft Entra アカウントで認証されていることを確認します。 認証には、Azure CLI、Visual Studio、Azure PowerShell のほか、IntelliJ などのツールを使用できます。

次のコマンドを使用して、Azure CLI を使用して Azure にサインインします。

az login

次に、次の手順を使用して、パスワードレス接続を使用するように Spring Kafka アプリケーションを更新します。 概念的には似ていますが、各フレームワークでは異なる実装の詳細が使用されます。

  1. プロジェクト内で 、pom.xml ファイルを開き、次の参照を追加します。

    <dependency>
       <groupId>com.azure</groupId>
       <artifactId>azure-identity</artifactId>
       <version>1.6.0</version>
    </dependency>
    
  2. 移行後、次の例に示すように、OAuth2 認証用に AuthenticateCallbackHandlerOAuthBearerToken をプロジェクトに実装します。

    public class KafkaOAuth2AuthenticateCallbackHandler implements AuthenticateCallbackHandler {
    
       private static final Duration ACCESS_TOKEN_REQUEST_BLOCK_TIME = Duration.ofSeconds(30);
       private static final String TOKEN_AUDIENCE_FORMAT = "%s://%s/.default";
    
       private Function<TokenCredential, Mono<OAuthBearerTokenImp>> resolveToken;
       private final TokenCredential credential = new DefaultAzureCredentialBuilder().build();
    
       @Override
       public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
          TokenRequestContext request = buildTokenRequestContext(configs);
          this.resolveToken = tokenCredential -> tokenCredential.getToken(request).map(OAuthBearerTokenImp::new);
       }
    
       private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) {
          URI uri = buildEventHubsServerUri(configs);
          String tokenAudience = buildTokenAudience(uri);
    
          TokenRequestContext request = new TokenRequestContext();
          request.addScopes(tokenAudience);
          return request;
       }
    
       @SuppressWarnings("unchecked")
       private URI buildEventHubsServerUri(Map<String, ?> configs) {
          String bootstrapServer = Arrays.asList(configs.get(BOOTSTRAP_SERVERS_CONFIG)).get(0).toString();
          bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", "");
          URI uri = URI.create("https://" + bootstrapServer);
          return uri;
       }
    
       private String buildTokenAudience(URI uri) {
          return String.format(TOKEN_AUDIENCE_FORMAT, uri.getScheme(), uri.getHost());
       }
    
       @Override
       public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
          for (Callback callback : callbacks) {
             if (callback instanceof OAuthBearerTokenCallback) {
                OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
                this.resolveToken
                        .apply(credential)
                        .doOnNext(oauthCallback::token)
                        .doOnError(throwable -> oauthCallback.error("invalid_grant", throwable.getMessage(), null))
                        .block(ACCESS_TOKEN_REQUEST_BLOCK_TIME);
             } else {
                throw new UnsupportedCallbackException(callback);
             }
          }
       }
    
       @Override
       public void close() {
          // NOOP
       }
    }
    
    public class OAuthBearerTokenImp implements OAuthBearerToken {
        private final AccessToken accessToken;
        private final JWTClaimsSet claims;
    
        public OAuthBearerTokenImp(AccessToken accessToken) {
            this.accessToken = accessToken;
            try {
                claims = JWTParser.parse(accessToken.getToken()).getJWTClaimsSet();
            } catch (ParseException exception) {
                throw new SaslAuthenticationException("Unable to parse the access token", exception);
            }
        }
    
        @Override
        public String value() {
            return accessToken.getToken();
        }
    
        @Override
        public Long startTimeMs() {
            return claims.getIssueTime().getTime();
        }
    
        @Override
        public long lifetimeMs() {
            return claims.getExpirationTime().getTime();
        }
    
        @Override
        public Set<String> scope() {
            // Referring to https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the scp
            // claim is a String, which is presented as a space separated list.
            return Optional.ofNullable(claims.getClaim("scp"))
                    .map(s -> Arrays.stream(((String) s)
                    .split(" "))
                    .collect(Collectors.toSet()))
                    .orElse(null);
        }
    
        @Override
        public String principalName() {
            return (String) claims.getClaim("upn");
        }
    
        public boolean isExpired() {
            return accessToken.isExpired();
        }
    }
    
  3. Kafka プロデューサーまたはコンシューマーを作成するときに、 SASL/OAUTHBEARER メカニズムをサポートするために必要な構成を追加します。 次の例は、移行前と移行後のコードの外観を示しています。 どちらの例でも、 プレースホルダーを <eventhubs-namespace> Event Hubs 名前空間の名前に置き換えます。

    移行前のコードは次の例のようになります。

    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<eventhubs-namespace>.servicebus.windows.net:9093");
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG,
            String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString));
    return new KafkaProducer<>(properties);
    

    移行後、コードは次の例のようになります。 この例では、 プレースホルダーを <path-to-your-KafkaOAuth2AuthenticateCallbackHandler> 実装した の完全なクラス名に KafkaOAuth2AuthenticateCallbackHandler置き換えます。

    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "<eventhubs-namespace>.servicebus.windows.net:9093");
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required");
    properties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "<path-to-your-KafkaOAuth2AuthenticateCallbackHandler>");
    return new KafkaProducer<>(properties);
    

アプリをローカルで実行する

これらの変更をコードに対して行った後、アプリケーションをローカルで実行します。 互換性のある IDE またはコマンド ライン ツール (Azure CLI、Visual Studio、IntelliJ など) にログインしていることを前提として、新しい構成でローカル資格情報を取得する必要があります。 アプリは、Azure のローカル開発ユーザーに割り当てられたロールを使用して、Azure サービスにローカルから接続できます。

3) Azure ホスティング環境を構成する

アプリケーションがパスワードレス接続を使用するように構成され、ローカルで実行されると、Azure にデプロイされた後、同じコードで Azure サービスに対して認証を行うことができます。 たとえば、マネージド ID が割り当てられている Azure Spring Apps インスタンスにデプロイされたアプリケーションは、Kafka のAzure Event Hubsに接続できます。

このセクションでは、2 つの手順を実行して、アプリケーションをパスワードなしの方法で Azure ホスティング環境で実行できるようにします。

  • Azure ホスティング環境のマネージド ID を割り当てます。
  • マネージド ID にロールを割り当てます。

注意

Azure には Service Connector も用意されています。これは、ホスティング サービスを Event Hubs に接続するのに役立ちます。 Service Connector を使用してホスティング環境を構成すると、Service Connector によって自動的に行われるため、マネージド ID にロールを割り当てる手順を省略できます。 次のセクションでは、2 つの方法で Azure ホスティング環境を構成する方法について説明します。1 つは Service Connector 経由で、もう 1 つは各ホスティング環境を直接構成します。

重要

Service Connector のコマンドには 、Azure CLI 2.41.0 以降が必要です。

Azure ホスティング環境のマネージド ID を割り当てる

次の手順では、さまざまな Web ホスティング サービスにシステム割り当てマネージド ID を割り当てる方法を示します。 このマネージド ID は、先ほど設定したアプリ構成を使用して、他の Azure サービスに安全に接続できます。

  1. Azure App Service インスタンスの [メインの概要] ページで、ナビゲーション ウィンドウから [ID] を選択します。

  2. [ システム割り当て済み ] タブで、 必ず [状態] フィールドを オンに設定します。 システム割り当て ID は Azure によって内部的に管理され、この ID によって管理タスクが自動的に処理されます。 ID の詳細と ID がコードで公開されることはありません。

    [システム割り当て] タブが表示され、[状態] フィールドが強調表示されているApp Service リソースの [ID] ページAzure portalスクリーンショット。

Azure CLI を使用して、Azure ホスティング環境でマネージド ID を割り当てることもできます。

次の例に示すように、az webapp identity assign コマンドを使用して、マネージド ID を Azure App Service インスタンスに割り当てることができます。

export AZURE_MANAGED_IDENTITY_ID=$(az webapp identity assign \
    --resource-group $AZ_RESOURCE_GROUP \
    --name <app-service-name> \
    --query principalId \
    --output tsv)

マネージド ID にロールを割り当てる

次に、Event Hubs 名前空間にアクセスするために作成したマネージド ID にアクセス許可を付与します。 アクセス許可を付与するには、ローカル開発ユーザーと同じように、マネージド ID にロールを割り当てます。

Service Connector を使用してサービスを接続した場合、この手順を完了する必要はありません。 次の必要な構成が処理されました。

  • 接続の作成時にマネージド ID を選択した場合は、アプリ用にシステム割り当てマネージド ID が作成され、Event Hubs にAzure Event Hubsデータ送信者ロールとAzure Event Hubs データ レシーバー ロールが割り当てられます。

  • 接続文字列を使用することを選択した場合、接続文字列はアプリ環境変数として追加されました。

アプリをテストする

これらのコードに変更を加えた後、ホストされているアプリケーションにブラウザーでアクセスします。 アプリは Kafka のAzure Event Hubsに正常に接続できる必要があります。 Azure 環境にロールの割り当てが反映されるまでに数分かかる場合があることに留意してください。 これでローカル環境と運用環境のどちらでも動作するようにアプリケーションが構成されました。開発者がアプリケーション自体でシークレットを管理する必要はありません。

次の手順

このチュートリアルでは、アプリケーションをパスワードレス接続に移行する方法について説明しました。

この記事で説明されている概念の詳細については、次のリソースを参照してください。