使用 Azure Cosmos DB 的交易外寄箱模式

Azure Cosmos DB
Azure 服務匯流排
Azure Functions

在分散式系統中實作可靠的傳訊可能會很困難。 本文說明如何使用交易式寄件匣模式進行可靠的傳訊和保證事件傳遞,這是支援 等冪訊息處理的重要部分。 若要達成此目的,您將使用 Azure Cosmos DB 交易式批次和變更摘要與 Azure 服務匯流排。

概觀

微服務架構越來越受歡迎,並承諾解決延展性、可維護性和靈活度等問題,特別是在大型應用程式中。 但是,在數據處理方面,這種架構模式也會帶來挑戰。 在分散式應用程式中,每個服務都會獨立維護在專用服務擁有的數據存放區中運作所需的數據。 為了支援這類案例,您通常會使用RabbitMQ、Kafka或 Azure 服務匯流排等傳訊解決方案,透過傳訊總線將數據(事件)從一個服務散發至應用程式的其他服務。 然後,內部或外部取用者可以訂閱這些訊息,並在操作數據后立即收到變更的通知。

該區域中的已知範例是訂購系統:當使用者想要建立訂單時, Ordering 服務會透過 REST 端點從用戶端應用程式接收數據。 它會將承載對應至對象的內部表示 Order 法,以驗證數據。 成功認可資料庫之後,它會將 OrderCreated 事件發佈至訊息總線。 對新訂單感興趣的任何其他服務(例如 InventoryInvoicing 服務),都會訂閱 OrderCreated 訊息、處理訊息,並將其儲存在自己的資料庫中。

下列虛擬程式代碼顯示此程式通常從 Ordering 服務的觀點來看:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

在儲存順序對象和發佈對應事件之間發生錯誤之前,此方法會正常運作。 傳送事件時可能會因為許多原因而失敗:

  • 網路錯誤
  • 訊息服務中斷
  • 主機失敗

無論錯誤為何,結果就是 OrderCreated 事件無法發佈至訊息總線。 其他服務將不會收到已建立訂單的通知。 服務 Ordering 現在必須處理與實際商務程序無關的各種事項。 它必須追蹤在消息總線恢復在線時仍需要放在訊息總線上的事件。 即使是最糟糕的情況也會發生:應用程式中的數據不一致,因為遺失事件。

Diagram that shows event handling without the Transactional Outbox pattern.

解決方案

有一種稱為 交易式寄件箱 的已知模式,可協助您避免這些情況。 在事件最終推送至訊息代理程式之前,可確保事件會儲存在數據存放區中(通常是在資料庫中的 Outbox 數據表中)。 如果商務對象和對應的事件儲存在相同的資料庫交易內,則保證不會遺失任何數據。 所有項目都會認可,或者如果發生錯誤,所有專案都會回復。 為了最終發佈事件,不同的服務或背景工作進程會查詢 Outbox 數據表中未處理的專案、發佈事件,並將其標示為已處理。 此模式可確保建立或修改商務對象之後,不會遺失事件。

Diagram that shows event handling with the Transactional Outbox pattern and a relay service for publishing events to the message broker.

下載此架構的 Visio 檔案

在關係資料庫中,模式的實作很簡單。 例如,如果服務使用 Entity Framework Core,它會使用 Entity Framework 內容來建立資料庫交易、儲存商業物件和事件,以及認可交易或執行復原。 此外,處理事件的背景工作服務很容易實作:它會定期查詢 Outbox 數據表是否有新專案、將新插入的事件發佈至訊息總線,最後將這些專案標示為已處理。

實際上,事情並不像他們第一次看那麼容易。 最重要的是,您必須確定會保留事件的順序,讓 OrderUpdated 事件不會在事件之前 OrderCreated 發佈。

Azure Cosmos DB 中的實作

本節說明如何在 Azure Cosmos DB 中實作交易式寄件箱模式,以在 Azure Cosmos DB 變更摘要和 服務匯流排 的説明下,達成不同服務之間的可靠順序傳訊。 它示範管理Contact物件 (FirstName、 、 LastNameEmail資訊 Company 等等) 的範例服務。 它會使用命令和查詢責任隔離 (CQRS) 模式,並遵循基本的網域驅動設計概念。 您可以在 GitHub找到實作的範例程式代碼。

Contact範例服務中的物件具有下列結構:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

一旦 Contact 建立或更新,就會發出包含目前變更相關信息的事件。 除此之外,網域事件可以是:

  • ContactCreated. 新增聯繫人時引發。
  • ContactNameUpdated. 在或 LastName 變更時FirstName引發。
  • ContactEmailUpdated. 更新電子郵件地址時引發。
  • ContactCompanyUpdated. 變更任何公司屬性時引發。

交易式批次

若要實作此模式,您必須確保 Contact 商務對象和對應的事件會儲存在相同的資料庫交易中。 在 Azure Cosmos DB 中,交易的運作方式與關係資料庫系統中的運作方式不同。 稱為 交易批次的 Azure Cosmos DB 交易會在單一邏輯分割區上運作,因此它們保證不可部分完成性、一致性、隔離和持久性 (ACID) 屬性。 您無法在不同的容器或邏輯分割區的交易批次作業中儲存兩份檔。 針對範例服務,這表示商務物件和事件或事件都會放在相同的容器和邏輯分割區中。

Context、存放庫和 UnitOfWork

範例實作的核心是 容器內容 ,可追蹤儲存在相同交易批次中的物件。 它會維護已建立和修改的物件清單,並在單一 Azure Cosmos DB 容器上運作。 其介面看起來像這樣:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

容器內容元件中的清單會追蹤 ContactDomainEvent 物件。 這兩者都會放在相同的容器中。 這表示多個類型的物件會儲存在相同的 Azure Cosmos DB 容器中,並使用 Type 屬性來區分商業物件與事件。

針對每個類型,都有一個專用的存放庫,可定義及實作數據存取。 存放 Contact 庫介面提供下列方法:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

存放 Event 庫看起來很類似,但只有一個方法會在存放區中建立新的事件:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

這兩個存放庫介面的實作會透過相依性插入取得單 IContainerContext 一實例的參考,以確保兩者在相同的 Azure Cosmos DB 內容上運作。

最後一個元件是 UnitOfWork,它會將 實例中保留的 IContainerContext 變更認可至 Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

事件處理:建立和發行

每當 Contact 物件建立、修改或(虛刪除)時,服務就會引發對應的事件。 提供的解決方案核心是領域驅動設計(DDD)和吉米·柏格德提議的調解器模式的組合。 他建議在將實際物件儲存至資料庫之前,先維護因為修改網域對象而發生的事件清單,併發佈這些事件。

變更清單會保留在定義域物件本身中,讓其他元件無法修改事件的鏈結。 在網域對象中維護事件(IEvent 實例)的行為是透過介面 IEventEmitter<IEvent> 定義,並在抽象 DomainEntity 類中實作:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

對象 Contact 會引發定義域事件。 實體 Contact 遵循基本的 DDD 概念,將網域屬性的 setter 設定為私用。 類別中沒有公用 setter。 相反地,它會提供操作內部狀態的方法。 在這些方法中,可以引發特定修改的適當事件(例如 ContactNameUpdatedContactEmailUpdated)。

以下是聯繫人名稱更新的範例。 (事件會在 方法的結尾引發。

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

追蹤變更的對應 ContactNameUpdatedEvent,如下所示:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

到目前為止,事件只會記錄在網域物件中,而且不會儲存至資料庫,甚至發佈至訊息代理程式。 在建議之後,系統會在商務物件儲存至數據存放區之前,立即處理事件清單。 在此情況下,它會發生在 SaveChangesAsync 實例的方法中 IContainerContext ,這個方法會在私 RaiseDomainEvents 用方法中實作。 (dObjs 是容器內容的追蹤實體清單。

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

在最後一行, MediatR 套件是 C# 中調解器模式的實作,可用來在應用程式內發佈事件。 這樣做是可能的,因為所有事件,例如 ContactNameUpdatedEventINotification 作 MediatR 套件的介面。

這些事件必須由對應的處理程序處理。 在這裡,實作 IEventsRepository 會發揮作用。 以下是事件處理程式的 NameUpdated 範例:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

IEventRepository實例會透過建構函式插入處理程序類別。 一旦 ContactNameUpdatedEvent 在服務中發行 ,就會 Handle 叫用 方法,並使用事件存放庫實例來建立通知物件。 該通知物件會接著插入 物件中的追蹤物件 IContainerContext 清單中,並將儲存在相同交易批次中的物件聯結至 Azure Cosmos DB。

到目前為止,容器內容會知道要處理的物件。 為了最終將追蹤的物件保存到 Azure Cosmos DB,實 IContainerContext 作會建立交易式批次、新增所有相關物件,以及針對資料庫執行作業。 所描述的程式會在 方法中 SaveInTransactionalBatchAsync 處理,此方法會叫用 SaveChangesAsync 這個程式。

以下是您需要建立和執行交易式批次之實作的重要部分:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects); 

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

以下是程式到目前為止運作方式的概觀(用於更新聯繫人物件的名稱):

  1. 用戶端想要更新聯繫人的名稱。 方法 SetName 會在聯繫人物件上叫用,並更新屬性。
  2. 事件 ContactNameUpdated 會新增至網域物件中的事件清單。
  3. 會叫用聯繫人存放庫的 Update 方法,將網域物件新增至容器內容。 對象現在會追蹤。
  4. CommitAsync 會在 實例上 UnitOfWork 叫用,而實例接著會呼叫 SaveChangesAsync 容器內容。
  5. 在內 SaveChangesAsync,定義域物件清單中的所有事件都會由 MediatR 實例發佈,並透過事件存放庫新增至 相同的容器內容
  6. 在 中 SaveChangesAsyncTransactionalBatch 會建立 。 它會同時保存聯繫人物件和事件。
  7. 執行 TransactionalBatch 和數據會認可至 Azure Cosmos DB。
  8. SaveChangesAsyncCommitAsync 成功傳回。

持續性

如上述代碼段所示,儲存至 Azure Cosmos DB 的所有物件都會包裝在 實例中 DataObject 。 此物件提供一般屬性:

這些屬性定義於呼叫 IDataObject 的泛型介面中,並由存放庫和容器內容使用:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

包裝在 實例中 DataObject 並儲存至資料庫的物件,然後看起來會像這個範例 (ContactContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

您可以看到 ContactContactNameUpdatedEvent (類型 domainEvent) 檔具有相同的數據分割索引鍵,而且這兩個檔都會保存在相同的邏輯分割區中。

變更摘要處理

若要讀取事件的數據流,並將其傳送至訊息代理程式,服務會使用 Azure Cosmos DB 變更摘要

變更摘要是容器中變更的持續性記錄。 它會在背景中運作,並追蹤修改。 在一個邏輯分割區內,保證變更的順序。 讀取變更摘要的最方便方式是搭配 Azure Cosmos DB 觸發程式使用 Azure 函式。 另一個選項是使用 變更摘要處理器連結庫。 它可讓您將變更摘要處理整合到 Web API 中作為背景服務(透過 IHostedService 介面)。 這裡的範例會使用簡單的控制台應用程式,實作抽象類 BackgroundService ,在 .NET Core 應用程式中裝載長時間執行的背景工作。

若要從 Azure Cosmos DB 變更摘要接收變更,您需要具現化 ChangeFeedProcessor 物件、註冊訊息處理的處理程式方法,並開始接聽變更:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

處理程式方法(HandleChangesAsync 這裡)接著會處理訊息。 在此範例中,事件會發佈至 服務匯流排 主題,該主題已針對延展性進行分割,並已啟用重複數據刪除功能。 任何對物件變更Contact感興趣的服務,都可以訂閱該 服務匯流排 主題,並接收並處理其本身內容的變更。

產生的 服務匯流排 訊息具有 SessionId 屬性。 當您在 服務匯流排 中使用工作階段時,您保證會保留訊息的順序(FIFO)。 保留此使用案例所需的順序。

以下是處理變更摘要訊息的代碼段:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

錯誤處理

如果在處理變更時發生錯誤,變更摘要庫將會在成功處理最後一個批次的位置重新啟動讀取訊息。 例如,如果應用程式已成功處理 10,000 則訊息,現在正處理批次 10,001 到 10,025,而且發生錯誤,它可以重新啟動並挑選其工作位置為 10,001。 連結庫會自動追蹤透過儲存在 Azure Cosmos DB 容器中的 Leases 資訊所處理的內容。

服務可能已經傳送一些已重新處理至 服務匯流排 的訊息。 一般而言,該案例會導致重複的訊息處理。 如先前所述,服務匯流排 具有重複訊息偵測的功能,您必須針對此案例啟用此功能。 服務會根據訊息的應用程式控制MessageId屬性,檢查訊息是否已新增至 服務匯流排 主題(或佇列)。 這個屬性會設定為 ID 事件檔案的 。 如果相同的訊息再次傳送至 服務匯流排,服務將會忽略並卸除它。

內務處理

在一般交易式寄件箱實作中,服務會更新已處理的事件,並將屬性設定 Processedtrue,指出已成功發行訊息。 這個行為可以在處理程式方法中手動實作。 在目前的案例中,不需要這類程式。 Azure Cosmos DB 會追蹤使用變更摘要處理的事件(結合 Leases 容器)。

最後一個步驟是,您偶爾需要從容器中刪除事件,以便只保留最新的記錄/檔。 若要定期進行清除,實作會套用 Azure Cosmos DB 的另一項功能:檔上的存留時間(TTL)。 Azure Cosmos DB 可以根據可新增至文件的屬性自動刪除檔 TTL :以秒為單位的時間範圍。 服務會持續檢查容器中是否有具有 TTL 屬性的檔。 檔到期后,Azure Cosmos DB 就會將它從資料庫移除。

當所有元件如預期般運作時,事件會快速處理併發佈:在幾秒鐘內。 如果 Azure Cosmos DB 發生錯誤,事件將不會傳送至訊息總線,因為商務對象和對應的事件都無法儲存至資料庫。 唯一要考慮的是,當背景背景工作角色(變更摘要處理器)或服務總線無法使用時,在檔上DomainEvent設定適當的TTL值。 在生產環境中,最好挑選多天的時間範圍。 例如,10 天。 然後,所涉及的所有元件將有足夠的時間來處理/發佈應用程式內的變更。

摘要

交易式寄件匣模式可解決在分散式系統中可靠地發佈網域事件的問題。 藉由認可商業物件的狀態及其事件在同一交易批次中,並使用背景處理器做為訊息轉送,您可確保其他服務內部或外部最終會收到其相依的資訊。 此範例不是交易式寄件箱模式的傳統實作。 它會使用 Azure Cosmos DB 變更摘要和存留時間等功能,讓事情保持簡單且乾淨。

以下是此案例中使用的 Azure 元件摘要:

Diagram that shows the Azure components to implement Transactional Outbox with Azure Cosmos DB and Azure Service Bus.

下載此架構的 Visio 檔案

此解決方案的優點如下:

  • 可靠的傳訊和保證事件傳遞。
  • 透過 服務匯流排 保留事件和訊息重複的順序。
  • 不需要維護額外的 Processed 屬性,指出成功處理事件檔。
  • 透過TTL從 Azure Cosmos DB 刪除事件。 此程式不會取用處理使用者/應用程式要求所需的要求單位。 相反地,它會在背景工作中使用「剩餘」要求單位。
  • 透過 ChangeFeedProcessor (或 Azure 函式) 處理訊息時發生錯誤。
  • 選擇性:多個變更摘要處理器,每個處理器都會在變更摘要中維護自己的指標。

考量

本文中討論的範例應用程式示範如何使用 Azure Cosmos DB 和 服務匯流排 在 Azure 上實作交易式 Outbox 模式。 另外還有其他使用 NoSQL 資料庫的方法。 若要保證商務物件和事件會可靠地儲存在資料庫中,您可以在商務物件檔中內嵌事件清單。 此方法的缺點是清除程式需要更新包含事件的每個檔。 與使用TTL相比,這並不理想,特別是在要求單位成本方面。

請記住,您不應該考慮這裡提供的範例程序代碼。 它對於多線程處理有一些限制,特別是類別中 DomainEntity 處理事件的方式,以及如何在實作中 CosmosContainerContext 追蹤物件。 使用它做為您自己的實作起點。 或者,請考慮使用已內建此功能的現有連結庫,例如 NServiceBusMassTransit

部署此案例

您可以在 GitHub 上找到原始程式碼、部署檔案,以及測試此案例的指示: https://github.com/mspnp/transactional-outbox-pattern

參與者

本文由 Microsoft 維護。 原始投稿人如下。

主體作者:

若要查看非公用LinkedIn配置檔,請登入LinkedIn。

下一步

請檢閱下列文章以深入瞭解: