ボットのカスタム ストレージの実装

この記事の対象: SDK v4

ボットの対話は 3 つの領域に分類されます (Azure AI Bot Service とのアクティビティの交換、メモリー ストアによるボットおよびダイアログの状態の読み込みと保存、バックエンド サービスとの統合)。

Interaction diagram outlining relationship between the Azure AI Bot Service, a bot, a memory store, and other services.

この記事では、Azure AI Bot Service とボットのメモリ状態およびストレージの間でセマンティクスを拡張する方法について説明します。

Note

Bot Framework JavaScript SDK、C#、Python SDK は引き続きサポートされますが、Java SDK については、最終的な長期サポートは 2023 年 11 月に終了する予定です。 このリポジトリ内の重要なセキュリティとバグの修正のみが行われます。

Java SDK を使用して構築された既存のボットは引き続き機能します。

新しいボットの構築については、Power Virtual Agents の使用を検討し、適切なチャットボット ソリューションの選択についてお読みください。

詳細については、「The future of bot building」をご覧ください。

前提条件

この記事では、サンプルの C# バージョンを中心に扱います。

背景

Bot Framework SDK には、ボットの状態とメモリ ストレージの既定の実装が含まれています。 この実装は、多くのサンプルで示されているように、この部分を数行の初期化コードと共に使用するアプリケーションのニーズに合っています。

SDK はフレームワークであり、動作が固定されたアプリケーションではありません。 言い換えると、フレームワーク内の多くのメカニズムの実装は既定の実装であり、可能な唯一の実装ではありません。 フレームワークでは、Azure AI Bot Service とのアクティビティの交換と、ボットの状態の読み込みと保存の関係は規定されません。

この記事では、既定の状態とストレージの実装のセマンティクスがアプリケーションで十分に機能しない場合に、セマンティクスを変更する 1 つの方法について説明します。 スケールアウト サンプルは、既定のセマンティクスとは異なるセマンティクスを持つ状態とストレージの代替実装を提供します。 この代替代替方法は、フレームワークでも同様に適切に機能します。 シナリオによっては、この代替ソリューションが開発しているアプリケーションに適している場合があります。

既定のアダプターとストレージ プロバイダーの動作

既定の実装では、ボットはアクティビティを受信すると、会話に対応する状態を読み込みます。 その後、この状態および受信したアクティビティを使用してダイアログ ロジックを実行します。 ダイアログを実行するプロセスでは、1 つ以上の送信アクティビティが作成され、即座に送信されます。 ダイアログの処理が完了すると、ボットは更新された状態を保存し、古い状態を上書きします。

Sequence diagram showing the default behavior of a bot and its memory store.

ただし、この動作にはいくつかの問題が発生する可能性があります。

  • 何らかの理由で保存操作が失敗した場合、状態はユーザーがチャネルで認識しているものとは暗黙的に同期しなくなります。 ユーザーはボットからの応答を確認し、状態が進行したと考えている場合も、実際には進んでいません。 このエラーは、状態の更新が成功したが、ユーザーが応答メッセージを受信しなかった場合よりも不都合である場合があります。

    このような状態エラーは、会話の設計に影響を与える可能性があります。 たとえば、ダイアログには、ユーザーとの追加の、冗長ともなりうる確認交換が必要になる場合があります。

  • 実装が複数のノードにスケールアウトされてデプロイされた場合、状態が誤って上書きされる可能性があります。 ダイアログが確認メッセージを含むチャネルにアクティビティを送信している可能性があるため、このエラーは混乱を招く場合があります。

    ピザ注文ボットについて考えてみましょう。ここでは、ボットがユーザーにトッピングの選択を求め、ユーザーは 2 つメッセージを間隔を空けずに送信します。1 つはマッシュルームの追加、もう 1 つはチーズの追加です。 スケールアウト シナリオでは、ボットの複数のインスタンスがアクティブになり、2 つのユーザー メッセージが別々のマシン上の 2 つの個別のインスタンスによって処理される可能性があります。 このような競合は、競合状態 と呼ばれるものであり、あるマシンが別のマシンによって書き込まれた状態を上書きする場合があります。 しかし、ここでは、応答が既に送信されているため、ユーザーはマッシュルームとチーズの両方が追加されたという確認を受け取っています。 残念ながら、ピザが届いても、追加されたのはマッシュルームまたはチーズだけであり、両方ではありません。

オプティミスティック ロック

スケールアウト サンプルでは、状態に関するいくつかのロックが導入されています。 このサンプルでは楽観ロックを実装しています。これにより、各インスタンスは、1 つだけ実行されているように実行され、その後にコンカレンシー違反のチェックが行われます。 このロックは複雑に聞こえるかもしれませんが、既知のソリューションが存在し、Bot Framework でクラウド ストレージ テクノロジと適切な拡張点を使用できます。

サンプルでは、エンティティ タグ (ETag) ヘッダーに基づく標準の HTTP メカニズムを使用します。 このメカニズムを理解しておくことは、この後のコードを理解する上で不可欠となります。 次のダイアグラムはこのシーケンスを示しています。

Sequence diagram showing a race condition, with the second update failing.

このダイアグラムは、あるリソースの更新を実行する 2 つのクライアントのケースを示しています。

  1. クライアントが GET 要求を発行し、サーバーからリソースが返されるときに、サーバーによって ETag ヘッダーが添付されます。

    ETag ヘッダーは、リソースの状態を表す非透過的な値です。 リソースが変更されると、サーバーはリソースの ETag を更新します。

  2. クライアントは、状態変更を保持する必要がある場合、If-Match 前提条件ヘッダーに ETag 値を含む POST 要求をサーバーに発行します。

  3. 要求の ETag 値がサーバーのものと一致しない場合、前提条件チェックは 412 (前提条件の失敗) 応答で失敗します。

    このエラーは、サーバー上の現在の値が、クライアントが操作していた元の値と一致しなくなったことを示します。

  4. クライアントが前提条件の失敗応答を受け取った場合、通常、クライアントはリソースの新しい値を取得し、必要な更新を適用して、リソースの更新を再び送信しようとします。

    他のクライアントがリソースを更新していない場合、この 2 番目の POST 要求は成功します。 それ以外の場合、クライアントは再試行します。

このプロセスは、楽観と呼ばれます。クライアントがリソースを取得すると、その処理を進めますが、リソースそのものは ロック されておらず、他のクライアントはリソースに制限なしでアクセスできるためです。 リソースの状態に対するクライアント間の競合は、処理が完了するまで確認されません。 一般に、分散システムでは、この戦略が対照的なアプローチである悲観アプローチよりも適切です。

説明されている楽観ロック メカニズムでは、プログラム ロジックを安全に再試行できることを前提としています。 理想的な状況は、これらのサービス要求がべき等であることです。 コンピューター サイエンスでは、べき等操作とは、同じ入力パラメーターを使用して複数回呼び出されても付加的影響がない操作のことです。 GET、PUT、および DELETE 要求を実装する純粋な HTTP REST サービスは、多くの場合べき等です。 サービス要求で追加の効果が生成されない場合は、再試行戦略の一環として、要求を安全に再実行できます。

この記事のスケールアウト サンプルと他の部分は、ボットが使用するバックエンド サービスが全てべき等の HTTP REST サービスであることを前提としています。

送信アクティビティのバッファー処理

アクティビティの送信はべき等操作ではありません。 アクティビティは多くの場合、ユーザーに情報を中継するメッセージであり、同じメッセージを 2 回以上繰り返すことは、混乱や誤解を招く可能性があります。

楽観ロックは、ボット ロジックを複数回再実行する必要がある可能性があることを意味します。 特定のアクティビティを複数回送信しないようにするには、状態更新操作が成功するまで待ってから、アクティビティをユーザーに送信します。 ボット ロジックは、以下のダイアグラムと類似したものになります。

Sequence diagram with messages being sent after dialog state is saved.

ダイアログの実行に再試行ループを組み込むと、保存操作で前提条件エラーが発生した場合に、次の動作が発生します。

Sequence diagram with messages being sent after a retry attempt succeeds.

このメカニズムを適用すると、前の例で示したピザ ボットは、注文に追加されたトッピングについて、誤った肯定的確認応答を送信しなくなります。 ボットが複数のマシンにデプロイされている場合でも、楽観ロック スキームは状態の更新を効果的にシリアル化します。 ピザ ボットは、アイテムの追加による確認応答が完全な状況を正確に反映することもできるようになります。 たとえば、ユーザーが "チーズ" と "マッシュルーム" をすばやく入力し、これらのメッセージがボットの 2 つの異なるインスタンスによって処理される場合、最後に完了するインスタンスには、応答の一部として "チーズとマッシュルーム入りピザ" を含めることができます。

この新しいカスタム ストレージ ソリューションでは、SDK の既定の実装では行われない 3 つのことを行います。

  1. ETag を使用して競合を検出します。
  2. ETag エラーが検出されると、処理が再試行されます。
  3. 状態が正常に保存されるまで送信アクティビティの送信を待機します。

この記事の残りの部分では、この 3 つの部分の実装について説明します。

ETag サポートの実装

最初に、ETag サポートを含む新しいストアのインターフェイスを定義します。 このインターフェイスは、ASP.NET で依存関係の挿入メカニズムを使用するのに役立ちます。 インターフェイスから始めることで、単体テストと運用環境用に個別のバージョンを実装できます。 たとえば、単体テスト バージョンはメモリ内で実行され、ネットワーク接続は必要ありません。

インターフェイスは、Load メソッドと Save メソッドで構成されます。 どちらの方法でも、キー パラメーターを使用して、ストレージからの読み込み、ストレージへの保存の状態を識別します。

  • Load は状態値と関連する ETag を返します。
  • Save には、状態値と関連する ETag のパラメーターがあり、操作が成功したかどうかを示すブール値を返します。 戻り値は、一般的なエラー インジケーターとしてではなく、前提条件エラーの特定のインジケーターとして機能します。 リターン コードの確認が、再試行ループのロジックの一部になります。

ストレージの実装を広く適用できるようにするには、シリアル化の要件を設定しないようにします。 ただし、多くの最新のストレージ サービスでは、コンテンツ タイプとして JSON がサポートされています。 C# では、JObject 型を使用して JSON オブジェクトを表すことができます。 JavaScript または TypeScript では、JSON が通常のネイティブ オブジェクトです。

顧客向けのインターフェイス定義は次のとおりです。

IStore.cs

public interface IStore
{
    Task<(JObject content, string etag)> LoadAsync(string key);

    Task<bool> SaveAsync(string key, JObject content, string etag);
}

Azure Blob Storage の実装を次に示します。

BlobStore.cs

public class BlobStore : IStore
{
    private readonly CloudBlobContainer _container;

    public BlobStore(string accountName, string accountKey, string containerName)
    {
        if (string.IsNullOrWhiteSpace(accountName))
        {
            throw new ArgumentException(nameof(accountName));
        }

        if (string.IsNullOrWhiteSpace(accountKey))
        {
            throw new ArgumentException(nameof(accountKey));
        }

        if (string.IsNullOrWhiteSpace(containerName))
        {
            throw new ArgumentException(nameof(containerName));
        }

        var storageCredentials = new StorageCredentials(accountName, accountKey);
        var cloudStorageAccount = new CloudStorageAccount(storageCredentials, useHttps: true);
        var client = cloudStorageAccount.CreateCloudBlobClient();
        _container = client.GetContainerReference(containerName);
    }

    public async Task<(JObject content, string etag)> LoadAsync(string key)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException(nameof(key));
        }

        var blob = _container.GetBlockBlobReference(key);
        try
        {
            var content = await blob.DownloadTextAsync();
            var obj = JObject.Parse(content);
            var etag = blob.Properties.ETag;
            return (obj, etag);
        }
        catch (StorageException e)
            when (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotFound)
        {
            return (new JObject(), null);
        }
    }

    public async Task<bool> SaveAsync(string key, JObject obj, string etag)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException(nameof(key));
        }

        if (obj == null)
        {
            throw new ArgumentNullException(nameof(obj));
        }

        var blob = _container.GetBlockBlobReference(key);
        blob.Properties.ContentType = "application/json";
        var content = obj.ToString();
        if (etag != null)
        {
            try
            {
                await blob.UploadTextAsync(content, Encoding.UTF8, new AccessCondition { IfMatchETag = etag }, new BlobRequestOptions(), new OperationContext());
            }
            catch (StorageException e)
                when (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
            {
                return false;
            }
        }
        else
        {
            await blob.UploadTextAsync(content);
        }

        return true;
    }
}

Azure Blob Storage は、多くの作業を行います。 各メソッドは、呼び出し元のコードの期待を満たすために、特定の例外をチェックします。

  • LoadAsync メソッドは、状態コードが見つからないストレージ例外に応答して、null 値を返します。
  • SaveAsync メソッドは、前提条件に失敗したコードを含むストレージ例外に応答して、false を返します。

再試行ループを実装する

再試行ループの設計では、シーケンス図に示されている動作が実装されます。

  1. アクティビティの受信時に、その会話状態のキーを作成します。

    アクティビティと会話状態の関係は、既定の実装のカスタム ストレージの場合と同じです。 そのため、既定の状態の実装と同じ方法でキーを構築できます。

  2. 会話状態の読み込みを試みます。

  3. ボットのダイアログを実行し、送信する送信アクティビティをキャプチャします。

  4. 会話状態の保存を試みます。

    • 成功したら、送信アクティビティを送信して終了します。

    • 失敗した場合は、このプロセスを会話状態を読み込むためのステップから繰り返します。

      会話状態の新しい読み込みでは、新しい現在の ETag と会話状態が取得されます。 ダイアログが再実行され、状態の保存ステップが成功する可能性があります。

メッセージ アクティビティ ハンドラーの実装を次に示します。

ScaleoutBot.cs

protected override async Task OnMessageActivityAsync(ITurnContext<IMessageActivity> turnContext, CancellationToken cancellationToken)
{
    // Create the storage key for this conversation.
    var key = $"{turnContext.Activity.ChannelId}/conversations/{turnContext.Activity.Conversation?.Id}";

    // The execution sits in a loop because there might be a retry if the save operation fails.
    while (true)
    {
        // Load any existing state associated with this key
        var (oldState, etag) = await _store.LoadAsync(key);

        // Run the dialog system with the old state and inbound activity, the result is a new state and outbound activities.
        var (activities, newState) = await DialogHost.RunAsync(_dialog, turnContext.Activity, oldState, cancellationToken);

        // Save the updated state associated with this key.
        var success = await _store.SaveAsync(key, newState, etag);

        // Following a successful save, send any outbound Activities, otherwise retry everything.
        if (success)
        {
            if (activities.Any())
            {
                // This is an actual send on the TurnContext we were given and so will actual do a send this time.
                await turnContext.SendActivitiesAsync(activities, cancellationToken);
            }

            break;
        }
    }
}

Note

このサンプルでは、ダイアログの実行を関数呼び出しとして実装します。 より高度なアプローチとして、インターフェイスを定義し、依存関係の挿入を使用する方法があります。 ただし、この例では、静的関数は、この楽観ロック アプローチの機能的な性質を強調しています。 一般に、コードの重要な部分を機能的な方法で実装すると、ネットワークで正常に動作する可能性が向上します。

送信アクティビティ バッファーを実装する

次の要件は、保存操作が成功するまで送信アクティビティをバッファー処理することです。これには、カスタム アダプターの実装が必要です。 カスタム SendActivitiesAsync メソッドは、アクティビティを使用するために送信するのではなく、アクティビティをリストに追加する必要があります。 ダイアログ コードを変更する必要はありません。

  • この特定のシナリオでは、アクティビティ更新アクティビティ削除の操作はサポートされておらず、関連付けられているメソッドは「実装されていない」例外をスローします。
  • アクティビティ送信操作からの戻り値は、一部のチャンネルが以前に送信されたメッセージを変更または削除するために使用します。たとえば、チャネル内に表示されているカードのボタンを無効にする場合です。 これらのメッセージ交換は、特に状態が必要な場合複雑化する場合がありますが、それらについてはこの記事では取り扱いません。
  • ダイアログは、このカスタム アダプターを作成して使用するため、アクティビティをバッファーできます。
  • ボットのターン ハンドラーは、より標準的な AdapterWithErrorHandler を使用してアクティビティをユーザーに送信します。

カスタム アダプターの実装を次に示します。

DialogHostAdapter.cs

public class DialogHostAdapter : BotAdapter
{
    private List<Activity> _response = new List<Activity>();

    public IEnumerable<Activity> Activities => _response;

    public override Task<ResourceResponse[]> SendActivitiesAsync(ITurnContext turnContext, Activity[] activities, CancellationToken cancellationToken)
    {
        foreach (var activity in activities)
        {
            _response.Add(activity);
        }

        return Task.FromResult(new ResourceResponse[0]);
    }

    #region Not Implemented
    public override Task DeleteActivityAsync(ITurnContext turnContext, ConversationReference reference, CancellationToken cancellationToken)
    {
        throw new NotImplementedException();
    }

    public override Task<ResourceResponse> UpdateActivityAsync(ITurnContext turnContext, Activity activity, CancellationToken cancellationToken)
    {
        throw new NotImplementedException();
    }
    #endregion
}

ボットでカスタム ストレージを使用する

最後の手順では、既存のフレームワーク クラスとメソッドでこれらのカスタム クラスとメソッドを使用します。

  • メイン再試行ループはボットの ActivityHandler.OnMessageActivityAsync メソッドの一部になり、依存関係の挿入によるカスタム ストレージが含まれます。
  • ダイアログ ホスティング コードは、静的 DialogHost メソッドを公開する RunAsync クラスに追加されます。 ダイアログ ホスト:
    • 受信アクティビティと過去の状態を取得した後、結果のアクティビティおよび新しい状態を返します。
    • カスタム アダプターを作成し、それ以外の場合は SDK と同じ方法でダイアログを実行します。
    • ダイアログの状態をダイアログ システムに渡す shim であるカスタム状態プロパティ アクセサーを作成します。 アクセサーは、参照セマンティクスを使用して、アクセサー ハンドルをダイアログ システムに渡します。

ヒント

JSON のシリアル化は、さまざまな実装が異なる方法でシリアル化できるように、ホスティング コードにインラインで追加され、プラグ可能なストレージ レイヤーの外部に保持されます。

ダイアログ ホストの実装を次に示します。

DialogHost.cs

public static class DialogHost
{
    // The serializer to use. Moving the serialization to this layer will make the storage layer more pluggable.
    private static readonly JsonSerializer StateJsonSerializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.All };

    /// <summary>
    /// A function to run a dialog while buffering the outbound Activities.
    /// </summary>
    /// <param name="dialog">THe dialog to run.</param>
    /// <param name="activity">The inbound Activity to run it with.</param>
    /// <param name="oldState">Th eexisting or old state.</param>
    /// <returns>An array of Activities 'sent' from the dialog as it executed. And the updated or new state.</returns>
    public static async Task<(Activity[], JObject)> RunAsync(Dialog dialog, IMessageActivity activity, JObject oldState, CancellationToken cancellationToken)
    {
        // A custom adapter and corresponding TurnContext that buffers any messages sent.
        var adapter = new DialogHostAdapter();
        var turnContext = new TurnContext(adapter, (Activity)activity);

        // Run the dialog using this TurnContext with the existing state.
        var newState = await RunTurnAsync(dialog, turnContext, oldState, cancellationToken);

        // The result is a set of activities to send and a replacement state.
        return (adapter.Activities.ToArray(), newState);
    }

    /// <summary>
    /// Execute the turn of the bot. The functionality here closely resembles that which is found in the
    /// IBot.OnTurnAsync method in an implementation that is using the regular BotFrameworkAdapter.
    /// Also here in this example the focus is explicitly on Dialogs but the pattern could be adapted
    /// to other conversation modeling abstractions.
    /// </summary>
    /// <param name="dialog">The dialog to be run.</param>
    /// <param name="turnContext">The ITurnContext instance to use. Note this is not the one passed into the IBot OnTurnAsync.</param>
    /// <param name="state">The existing or old state of the dialog.</param>
    /// <returns>The updated or new state of the dialog.</returns>
    private static async Task<JObject> RunTurnAsync(Dialog dialog, ITurnContext turnContext, JObject state, CancellationToken cancellationToken)
    {
        // If we have some state, deserialize it. (This mimics the shape produced by BotState.cs.)
        var dialogStateProperty = state?[nameof(DialogState)];
        var dialogState = dialogStateProperty?.ToObject<DialogState>(StateJsonSerializer);

        // A custom accessor is used to pass a handle on the state to the dialog system.
        var accessor = new RefAccessor<DialogState>(dialogState);

        // Run the dialog.
        await dialog.RunAsync(turnContext, accessor, cancellationToken);

        // Serialize the result (available as Value on the accessor), and put its value back into a new JObject.
        return new JObject { { nameof(DialogState), JObject.FromObject(accessor.Value, StateJsonSerializer) } };
    }
}

最後に、カスタム状態プロパティ アクセサーの実装を次に示します。

RefAccessor.cs

public class RefAccessor<T> : IStatePropertyAccessor<T>
    where T : class
{
    public RefAccessor(T value)
    {
        Value = value;
    }

    public T Value { get; private set; }

    public string Name => nameof(T);

    public Task<T> GetAsync(ITurnContext turnContext, Func<T> defaultValueFactory = null, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (Value == null)
        {
            if (defaultValueFactory == null)
            {
                throw new KeyNotFoundException();
            }

            Value = defaultValueFactory();
        }

        return Task.FromResult(Value);
    }

    #region Not Implemented
    public Task DeleteAsync(ITurnContext turnContext, CancellationToken cancellationToken = default(CancellationToken))
    {
        throw new NotImplementedException();
    }

    public Task SetAsync(ITurnContext turnContext, T value, CancellationToken cancellationToken = default(CancellationToken))
    {
        throw new NotImplementedException();
    }
    #endregion
}

追加情報

C#PythonJava の スケールアウト サンプルは、GitHub の Bot Framework サンプル リポジトリから入手できます。