次の方法で共有


Azure Cosmos DB を使用したトランザクション送信トレイ パターン

Azure Cosmos DB
Azure Service Bus
Azure Functions

分散システムでの信頼性の高いメッセージングの実装は困難な場合があります。 この記事では、トランザクション 送信トレイ パターンを使用して、信頼性の高いメッセージングとイベントの確実な配信を実現する方法について説明します。これは、 べき等メッセージ処理のサポートの重要な部分です。 これを実現するには、Azure Cosmos DB トランザクション バッチと変更フィードを Azure Service Bus と組み合わせて使用します。

概要

マイクロサービス アーキテクチャは人気を集め、スケーラビリティ、保守容易性、機敏性などの問題 (特に大規模なアプリケーション) を解決する際の約束を示しています。 ただし、このアーキテクチャ パターンでは、データ処理に関する課題も発生します。 分散アプリケーションでは、各サービスは、専用のサービス所有データストアで動作するために必要なデータを個別に保持します。 このようなシナリオをサポートするには、通常、RabbitMQ、Kafka、Azure Service Bus などのメッセージング ソリューションを使用します。このソリューションは、メッセージング バスを介して 1 つのサービスからアプリケーションの他のサービスにデータ (イベント) を分散します。 内部または外部のコンシューマーは、これらのメッセージをサブスクライブし、データが操作されるとすぐに変更の通知を受け取ることができます。

その領域のよく知られた例は、注文システムです。ユーザーが注文を作成する場合、 Ordering サービスは REST エンドポイントを介してクライアント アプリケーションからデータを受信します。 ペイロードを Order オブジェクトの内部表現にマップして、データを検証します。 データベースへのコミットが成功すると、 OrderCreated イベントがメッセージ バスに発行されます。 新しい注文 ( InventoryInvoicing サービスなど) に関心がある他のサービスは、 OrderCreated メッセージをサブスクライブし、処理し、独自のデータベースに格納します。

次の擬似コードは、このプロセスが通常、 Ordering サービスの観点からどのように見えるかを示しています。

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

この方法は、order オブジェクトの保存と対応するイベントの発行の間にエラーが発生するまで適切に機能します。 この時点でイベントの送信は、次のような理由で失敗する可能性があります。

  • ネットワーク エラー
  • メッセージ サービスの停止
  • ホストエラー

エラーが何であれ、結果として、 OrderCreated イベントをメッセージ バスに発行できません。 注文が作成されたことは、他のサービスには通知されません。 Ordering サービスは、実際のビジネス プロセスとは関係のないさまざまな処理を行う必要があります。 オンラインに戻ったらすぐにメッセージ バスに配置する必要があるイベントを追跡する必要があります。 最悪の場合でも、イベントが失われたためにアプリケーションのデータの不整合が発生する可能性があります。

トランザクション 送信トレイ パターンのないイベント処理を示す図。

解決策

このような状況を回避するのに役立つ、 トランザクション 送信トレイ と呼ばれるよく知られたパターンがあります。 これにより、最終的にメッセージ ブローカーにプッシュされる前に、イベントがデータストア (通常はデータベースの送信トレイ テーブル内) に保存されます。 ビジネス オブジェクトと対応するイベントが同じデータベース トランザクション内に保存されている場合、データが失われないことが保証されます。 すべてがコミットされるか、エラーが発生した場合はすべてがロールバックされます。 最終的にイベントを発行するために、別のサービスまたはワーカー プロセスが、未処理のエントリについて Outbox テーブルにクエリを実行し、イベントを発行して、処理済みとしてマークします。 このパターンにより、ビジネス オブジェクトの作成または変更後にイベントが失われなくなります。

メッセージ ブローカーにイベントを発行するためのトランザクション 送信トレイ パターンとリレー サービスを使用したイベント処理を示す図。

このアーキテクチャの Visio ファイルをダウンロードします。

リレーショナル データベースでは、パターンの実装は簡単です。 たとえば、サービスが Entity Framework Core を使用する場合、Entity Framework コンテキストを使用してデータベース トランザクションを作成し、ビジネス オブジェクトとイベントを保存し、トランザクションをコミットするか、ロールバックを実行します。 また、イベントを処理するワーカー サービスは、定期的に送信トレイ テーブルに対して新しいエントリのクエリを実行し、新しく挿入されたイベントをメッセージ バスに発行し、最後にこれらのエントリを処理済みとしてマークします。

実際には、物事は最初に見るほど簡単ではありません。 最も重要なのは、 OrderUpdated イベントが OrderCreated イベントの前に発行されないように、イベントの順序が保持されるようにする必要があります。

Azure Cosmos DB での実装

このセクションでは、Azure Cosmos DB の変更フィードと Service Bus を使用して、さまざまなサービス間で信頼性の高い順番のメッセージングを実現するために、Azure Cosmos DB にトランザクション 送信トレイ パターンを実装する方法について説明します。 Contact オブジェクト (FirstNameLastNameEmailCompany情報など) を管理するサンプル サービスを示します。 コマンドとクエリの責任分離 (CQRS) パターンを使用し、基本的なドメイン駆動設計 (DDD) の概念に従います。 実装のサンプル コードは GitHub にあります。

サンプル サービスの Contact オブジェクトの構造は次のとおりです。

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Contactが作成または更新されるとすぐに、現在の変更に関する情報を含むイベントが生成されます。 特に、ドメイン イベントは次のようになります。

  • ContactCreated。 連絡先が追加されたときに発生します。
  • ContactNameUpdatedFirstNameまたはLastNameが変更されたときに発生します。
  • ContactEmailUpdated。 電子メール アドレスが更新されたときに発生します。
  • ContactCompanyUpdated。 会社のプロパティのいずれかが変更されたときに発生します。

トランザクション バッチ

このパターンを実装するには、 Contact ビジネス オブジェクトと対応するイベントが同じデータベース トランザクションに保存されるようにする必要があります。 Azure Cosmos DB では、トランザクションの動作はリレーショナル データベース システムの場合とは異なります。 トランザクション バッチと呼ばれる Azure Cosmos DB トランザクションは 1 つの論理パーティションで動作するため、アトミック性、整合性、分離、持続性 (ACID) のプロパティが保証されます。 トランザクション バッチ操作で異なるコンテナーまたは論理パーティションに 2 つのドキュメントを保存することはできません。 サンプル サービスの場合、ビジネス オブジェクトとイベントまたはイベントの両方が同じコンテナーと論理パーティションに配置されることを意味します。

コンテキスト、リポジトリ、および UnitOfWork

サンプル実装の中核となるのは、同じトランザクション バッチに保存されているオブジェクトを追跡する コンテナー コンテキスト です。 作成および変更されたオブジェクトの一覧が保持され、1 つの Azure Cosmos DB コンテナーで動作します。 そのインターフェイスは次のようになります。

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

コンテナー コンテキスト コンポーネント内のリストは、 Contact オブジェクトと DomainEvent オブジェクトを追跡します。 どちらも同じコンテナーに格納されます。 つまり、複数の種類のオブジェクトが同じ Azure Cosmos DB コンテナーに格納され、 Type プロパティを使用してビジネス オブジェクトとイベントを区別します。

型ごとに、データ アクセスを定義して実装する専用リポジトリがあります。 Contact リポジトリ インターフェイスには、次のメソッドが用意されています。

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Event リポジトリは似ていますが、ストアに新しいイベントを作成するメソッドが 1 つしかない点が異なります。

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

両方のリポジトリ インターフェイスの実装は、1 つの IContainerContext インスタンスへの依存関係の挿入を介して参照を取得し、両方が同じ Azure Cosmos DB コンテキストで動作することを保証します。

最後のコンポーネントは UnitOfWorkであり、 IContainerContext インスタンスに保持されている変更が Azure Cosmos DB にコミットされます。

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

イベント処理: 作成と公開

Contact オブジェクトが作成、変更、または (ソフト) 削除されるたびに、サービスは対応するイベントを発生させます。 提供されるソリューションの中核となるのは、ドメイン駆動設計 (DDD) と Jimmy Bogard によって提案されたメディエーター パターンの組み合わせです。 ドメイン オブジェクトが変更されたために発生したイベントの一覧を保持し、実際のオブジェクトをデータベースに保存する前にこれらのイベントを発行することを提案します。

変更の一覧はドメイン オブジェクト自体に保持されるため、他のコンポーネントはイベントのチェーンを変更できません。 ドメイン オブジェクト内のイベント (IEvent インスタンス) を維持する動作は、インターフェイス IEventEmitter<IEvent> を介して定義され、抽象 DomainEntity クラスに実装されます。

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Contact オブジェクトはドメイン イベントを発生させます。 Contact エンティティは、DDD の基本的な概念に従い、ドメイン プロパティのセッターをプライベートとして構成します。 クラスにパブリック セッターが存在しません。 代わりに、内部状態を操作するメソッドを提供します。 これらのメソッドでは、特定の変更に対する適切なイベント ( ContactNameUpdatedContactEmailUpdatedなど) を発生させることができます。

連絡先の名前を更新する例を次に示します。 (イベントはメソッドの最後に発生します)。

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return; // if an object is newly created, all modifications will be handled by ContactCreatedEvent

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

変更を追跡する対応する ContactNameUpdatedEventは次のようになります。

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

ここまでは、イベントはドメイン オブジェクトに記録されるだけで、データベースに何も保存されないか、メッセージ ブローカーに発行されます。 推奨事項に従って、ビジネス オブジェクトがデータ ストアに保存される直前にイベントの一覧が処理されます。 この場合、プライベート SaveChangesAsync メソッドで実装される、IContainerContext インスタンスのRaiseDomainEvents メソッドで発生します。 (dObjs は、コンテナー コンテキストの追跡対象エンティティの一覧です)。

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

最後の行では、C# のメディエーター パターンの実装である MediatR パッケージを使用して、アプリケーション内でイベントを発行します。 これは、 ContactNameUpdatedEvent のようなすべてのイベントが MediatR パッケージの INotification インターフェイスを実装するためです。

これらのイベントは、対応するハンドラーによって処理される必要があります。 ここでは、 IEventsRepository 実装が行われます。 NameUpdated イベント ハンドラーのサンプルを次に示します。

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

IEventRepository インスタンスは、コンストラクターを介してハンドラー クラスに挿入されます。 サービスで ContactNameUpdatedEvent が発行されるとすぐに、 Handle メソッドが呼び出され、イベント リポジトリ インスタンスを使用して通知オブジェクトが作成されます。 その通知オブジェクトは、 IContainerContext オブジェクト内の追跡対象オブジェクトの一覧に挿入され、同じトランザクション バッチに保存されているオブジェクトを Azure Cosmos DB に結合します。

これまで、コンテナー コンテキストは処理するオブジェクトを認識しています。 追跡対象のオブジェクトを最終的に Azure Cosmos DB に保持するために、 IContainerContext 実装はトランザクション バッチを作成し、関連するすべてのオブジェクトを追加して、データベースに対して操作を実行します。 記述されているプロセスは、SaveInTransactionalBatchAsync メソッドによって呼び出されるSaveChangesAsync メソッドで処理されます。

トランザクション バッチを作成して実行するために必要な実装の重要な部分を次に示します。

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects);

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

これまでのプロセスのしくみの概要を次に示します (連絡先オブジェクトの名前を更新するため)。

  1. クライアントは、連絡先の名前を更新する必要があります。 SetName メソッドが連絡先オブジェクトで呼び出され、プロパティが更新されます。
  2. ContactNameUpdated イベントは、ドメイン オブジェクト内のイベントの一覧に追加されます。
  3. 連絡先リポジトリの Update メソッドが呼び出され、コンテナー コンテキストにドメイン オブジェクトが追加されます。 オブジェクトが追跡されるようになりました。
  4. CommitAsyncUnitOfWork インスタンスで呼び出され、コンテナー コンテキストで SaveChangesAsync が呼び出されます。
  5. SaveChangesAsync内では、ドメイン オブジェクトの一覧内のすべてのイベントは、MediatR インスタンスによって発行され、イベント リポジトリを介して同じコンテナー コンテキストに追加されます。
  6. SaveChangesAsyncでは、TransactionalBatchが作成されます。 連絡先オブジェクトとイベントの両方が保持されます。
  7. TransactionalBatchが実行され、データが Azure Cosmos DB にコミットされます。
  8. SaveChangesAsync 正常に戻 CommitAsync

固執

前のコード スニペットでわかるように、Azure Cosmos DB に保存されているすべてのオブジェクトは、 DataObject インスタンスにラップされます。 このオブジェクトには、次の共通プロパティがあります。

  • ID
  • PartitionKey
  • Type
  • StateCreatedと同様に、Updatedは Azure Cosmos DB に保持されません。
  • Etagオプティミスティック ロックの場合。
  • TTLTime To Live プロパティを使用して、古いドキュメントを自動的にクリーンアップします。
  • Data。 汎用データ オブジェクト。

これらのプロパティは、 IDataObject と呼ばれる汎用インターフェイスで定義され、リポジトリとコンテナー コンテキストによって使用されます。


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

DataObject インスタンスにラップされ、データベースに保存されたオブジェクトは、次のサンプルのようになります (ContactContactNameUpdatedEvent)。

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

ContactドキュメントとContactNameUpdatedEvent (型domainEvent) ドキュメントが同じパーティション キーを持ち、両方のドキュメントが同じ論理パーティションに保持されることがわかります。

変更フィードの処理

イベントのストリームを読み取ってメッセージ ブローカーに送信するために、サービスは Azure Cosmos DB 変更フィードを使用します。

変更フィードは、コンテナー内の変更の永続的なログです。 バックグラウンドで動作し、変更を追跡します。 1 つの論理パーティション内では、変更の順序が保証されます。 変更フィードを読み取る最も便利な方法は、 Azure Cosmos DB トリガーで Azure 関数を使用することです。 もう 1 つのオプションは、 変更フィード プロセッサ ライブラリを使用することです。 これにより、( IHostedService インターフェイスを介して) 変更フィード処理をバックグラウンド サービスとして Web API に統合できます。 このサンプルでは、抽象クラス BackgroundService を実装する単純なコンソール アプリケーションを使用して、.NET Core アプリケーションで実行時間の長いバックグラウンド タスクをホストします。

Azure Cosmos DB の変更フィードから変更を受信するには、 ChangeFeedProcessor オブジェクトをインスタンス化し、メッセージ処理用のハンドラー メソッドを登録して、変更のリッスンを開始する必要があります。

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

ハンドラー メソッド (ここでHandleChangesAsync ) は、メッセージを処理します。 このサンプルでは、スケーラビリティのためにパーティション分割され、 重複除去機能が有効になっている Service Bus トピックにイベントが発行されます。 その後、 Contact オブジェクトへの変更に関心があるサービスは、その Service Bus トピックをサブスクライブし、独自のコンテキストの変更を受信して処理できます。

生成される Service Bus メッセージには、 SessionId プロパティがあります。 Service Bus でセッションを使用する場合は、メッセージの順序が保持されることを保証します (先入れ先出し (FIFO))。 このユース ケースでは、順序を保持する必要があります。

変更フィードからのメッセージを処理するスニペットを次に示します。

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

エラー処理

変更の処理中にエラーが発生した場合、変更フィード ライブラリは、最後のバッチが正常に処理された位置でメッセージの読み取りを再開します。 たとえば、アプリケーションが 10,000 件のメッセージを正常に処理し、バッチ 10,001 から 10,025 で作業していて、エラーが発生した場合は、再開して 10,001 の位置で作業を再開できます。 ライブラリは、Azure Cosmos DB の Leases コンテナーに保存された情報によって処理された内容を自動的に追跡します。

サービスが、Service Bus に再処理されるメッセージの一部を既に送信している可能性があります。 通常、このシナリオではメッセージ処理が重複することになります。 前述のように、Service Bus には、このシナリオで有効にする必要がある重複メッセージ検出の機能があります。 サービスは、メッセージのアプリケーション制御 MessageId プロパティに基づいて、Service Bus トピック (またはキュー) にメッセージが既に追加されているかどうかを確認します。 このプロパティは、イベント ドキュメントの ID に設定されます。 同じメッセージが Service Bus に再度送信された場合、サービスはそれを無視して削除します。

ハウスキープ処理

一般的なトランザクション 送信トレイの実装では、サービスは処理されたイベントを更新し、メッセージが正常に発行されたことを示す Processed プロパティを trueに設定します。 この動作は、ハンドラー メソッドで手動で実装できます。 現在のシナリオでは、このようなプロセスは必要ありません。 Azure Cosmos DB では、変更フィード ( Leases コンテナーと組み合わせて) を使用して処理されたイベントを追跡します。

最後の手順として、コンテナーからイベントを削除して、最新のレコード/ドキュメントのみを保持する必要がある場合があります。 定期的にクリーンアップを行うために、実装は Azure Cosmos DB の別の機能である Time To Live (TTL) をドキュメントに適用します。 Azure Cosmos DB では、ドキュメントに追加できる TTL プロパティ (秒単位) に基づいて、ドキュメントを自動的に削除できます。 サービスは、 TTL プロパティを持つドキュメントのコンテナーを常にチェックします。 ドキュメントの有効期限が切れるとすぐに、Azure Cosmos DB によってデータベースから削除されます。

すべてのコンポーネントが期待どおりに動作すると、数秒でイベントが処理され、迅速に発行されます。 Azure Cosmos DB でエラーが発生した場合、ビジネス オブジェクトと対応するイベントの両方をデータベースに保存できないため、イベントはメッセージ バスに送信されません。 唯一考慮すべき点は、バックグラウンド ワーカー (変更フィード プロセッサ) またはサービス バスが使用できない場合に、TTL ドキュメントに適切なDomainEvent値を設定することです。 運用環境では、複数日の期間を選択することをお勧めします。 たとえば、10 日です。 その後、関係するすべてのコンポーネントは、アプリケーション内で変更を処理/発行するのに十分な時間を持ちます。

概要

トランザクション 送信トレイ パターンは、分散システムでドメイン イベントを確実に発行する問題を解決します。 ビジネス オブジェクトの状態とそのイベントを同じトランザクション バッチでコミットし、バックグラウンド プロセッサをメッセージ リレーとして使用することで、内部または外部の他のサービスが最終的に依存する情報を受け取るようにします。 このサンプルは、トランザクション 送信トレイ パターンの従来の実装ではありません。 Azure Cosmos DB の変更フィードや Time To Live などの機能を使用して、シンプルでクリーンな状態を維持します。

このシナリオで使用される Azure コンポーネントの概要を次に示します。

Azure Cosmos DB と Azure Service Bus を使用してトランザクション 送信トレイを実装する Azure コンポーネントを示す図。

このアーキテクチャの Visio ファイルをダウンロードします。

このソリューションの利点は次のとおりです。

  • 信頼性の高いメッセージングとイベントの確実な配信。
  • Service Bus を介したイベントとメッセージの重複除去の順序が保持されます。
  • イベント ドキュメントの正常な処理を示す追加の Processed プロパティを維持する必要はありません。
  • Time to Live (TTL) による Azure Cosmos DB からのイベントの削除。 このプロセスでは、ユーザー/アプリケーション要求の処理に必要な要求ユニットは使用されません。 代わりに、バックグラウンド タスクで "残った" 要求ユニットを使用します。
  • ChangeFeedProcessor (または Azure 関数) を使用したメッセージのエラー防止処理。
  • 省略可能: 複数の変更フィード プロセッサ。それぞれが変更フィード内で独自のポインターを維持します。

考慮事項

この記事で説明するサンプル アプリケーションでは、Azure Cosmos DB と Service Bus を使用して Azure にトランザクション 送信トレイ パターンを実装する方法を示します。 NoSQL データベースを使用する他の方法もあります。 ビジネス オブジェクトとイベントが確実にデータベースに保存されるように、イベントの一覧をビジネス オブジェクト ドキュメントに埋め込むことができます。 この方法の欠点は、クリーンアップ プロセスでイベントを含む各ドキュメントを更新する必要があるということです。 これは、TTL を使用する場合と比較して、特に要求ユニットのコストに関しては理想的ではありません。

運用環境に対応したコードで提供されているサンプル コードは考慮しないでください。 マルチスレッド、特に DomainEntity クラスでのイベントの処理方法、および CosmosContainerContext 実装でのオブジェクトの追跡方法に関して、いくつかの制限があります。 独自の実装の開始点として使用します。 または、 NServiceBusMassTransit などの機能が既に組み込まれている既存のライブラリを使用することを検討してください。

このシナリオをデプロイする

ソース コード、デプロイ ファイル、およびこのシナリオをテストする手順については、GitHub: https://github.com/mspnp/transactional-outbox-patternを参照してください。

貢献者

この記事は Microsoft によって管理されています。 当初の寄稿者は以下のとおりです。

主要な著者

非公開の LinkedIn プロファイルを表示するには、LinkedIn にサインインします。

次のステップ

詳細については、次の記事を参照してください。