نمط علبة الصادر للمعاملات باستخدام Azure Cosmos DB

Azure Cosmos DB
Azure Service Bus
Azure Functions

يمكن أن يكون تنفيذ المراسلة الموثوقة في الأنظمة الموزعة أمرًا صعبًا. توضح هذه المقالة كيفية استخدام نمط علبة الصادر للمعاملات للمراسلة الموثوق بها والتسليم المضمون للأحداث، وهو جزء مهم من دعم معالجة الرسائل غير المتكررة. لتحقيق ذلك، ستستخدم دفعات معاملات Azure Cosmos DB وتغيير الموجز مع ناقل خدمة Azure.

نظرة عامة

أصبحت بنيات الخدمات المصغرة تحظى بشعبية متزايدة وتظهر وعدًا في حل مشكلات مثل قابلية التوسع وقابلية الصيانة وخفة الحركة، خاصة في التطبيقات الكبيرة. ولكن هذا النمط المعماري يقدم أيضًا تحديات عندما يتعلق الأمر بمعالجة البيانات. في التطبيقات الموزعة، تحتفظ كل خدمة بشكل مستقل بالبيانات التي تحتاج إليها للعمل في مخزن بيانات مخصص مملوك للخدمة. لدعم مثل هذا السيناريو، يمكنك عادة استخدام حل مراسلة مثل RabbitMQ أو Kafka أو ناقل خدمة Azure الذي يوزع البيانات (الأحداث) من خدمة واحدة عبر ناقل مراسلة إلى خدمات أخرى من التطبيق. يمكن للمستهلكين الداخليين أو الخارجيين بعد ذلك الاشتراك في هذه الرسائل والحصول على إخطار بالتغييرات بمجرد معالجة البيانات.

مثال معروف في هذا المجال هو نظام الطلب: عندما يريد المستخدم إنشاء طلب، تتلقى الخدمة Ordering البيانات من تطبيق عميل عبر نقطة نهاية REST. يقوم بتعيين الحمولة إلى تمثيل داخلي لكائن Order للتحقق من صحة البيانات. بعد تثبيت ناجح لقاعدة البيانات، فإنه ينشر حدث OrderCreated إلى ناقل رسالة. أي خدمة أخرى مهتمة بطلبات جديدة (على سبيل المثال خدمة Inventory أو Invoicing)، ستشترك في الرسائل OrderCreated، وتعالجها، وتخزنها في قاعدة البيانات الخاصة بها.

يوضح الرمز الزائف التالي كيف تبدو هذه العملية عادة من منظور الخدمة Ordering:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

يعمل هذا الأسلوب بشكل جيد حتى يحدث خطأ بين حفظ كائن الطلب ونشر الحدث المقابل. قد يفشل إرسال حدث في هذه المرحلة لأسباب عديدة:

  • أخطاء الشبكة
  • انقطاع خدمة الرسائل
  • فشل المضيف

مهما كان الخطأ، فإن النتيجة هي أنه لا يمكن نشر الحدث OrderCreated إلى ناقل الرسالة. لن يتم إعلام الخدمات الأخرى بأنه تم إنشاء طلب. يجب أن تهتم الخدمة Ordering الآن بأشياء مختلفة لا تتعلق بعملية العمل الفعلية. يجب أن تتبع الأحداث التي لا تزال بحاجة إلى وضعها على ناقل الرسائل بمجرد عودتها عبر الإنترنت. حتى أسوأ حالة يمكن أن تحدث: عدم تناسق البيانات في التطبيق بسبب الأحداث المفقودة.

رسم تخطيطي يوضح معالجة الأحداث دون نمط علبة الصادر للمعاملات.

حل

هناك نمط معروف يسمى علبة الصادر للمعاملات يمكن أن يساعدك على تجنب هذه المواقف. يضمن حفظ الأحداث في مخزن البيانات (عادة في جدول علبة الصادر في قاعدة البيانات الخاصة بك) قبل دفعها في نهاية المطاف إلى وسيط الرسائل. إذا تم حفظ كائن العمل والأحداث المقابلة داخل نفس معاملة قاعدة البيانات، فمن المضمون عدم فقدان أي بيانات. سيتم تثبيت كل شيء، أو سيتراجع كل شيء إذا كان هناك خطأ. لنشر الحدث في النهاية، تقوم خدمة أو عملية عامل مختلفة بالاستعلامات عن جدول علبة الصادر للإدخالات غير المعالجة، ونشر الأحداث، ووضع علامة عليها كمعالجة. يضمن هذا النمط عدم فقدان الأحداث بعد إنشاء كائن عمل أو تعديله.

رسم تخطيطي يوضح معالجة الأحداث مع نمط

قم بتنزيل ملف Visio لهذه البنية.

في قاعدة بيانات علائقية، يكون تنفيذ النمط واضحًا. إذا كانت الخدمة تستخدم Entity Framework Core، على سبيل المثال، فستستخدم سياق Entity Framework لإنشاء معاملة قاعدة بيانات، وحفظ كائن العمل والحدث، وتنفيذ المعاملة - أو إجراء التراجع. أيضًا، من السهل تنفيذ خدمة العامل التي تعالج الأحداث: فهي تقوم بشكل دوري بالاستعلامات عن جدول علبة الصادر للإدخالات الجديدة، وتنشر الأحداث المدرجة حديثا إلى ناقل الرسائل، وأخيرًا تقوم بوضع علامة على هذه الإدخالات كمعالجة.

في الممارسة العملية، الأمور ليست بالسهولة التي قد تنظر إليها أولًا. والأهم من ذلك، تحتاج إلى التأكد من الاحتفاظ بترتيب الأحداث بحيث لا يتم نشر الحدث OrderUpdated قبل الحدث OrderCreated.

التنفيذ في Azure Cosmos DB

يوضح هذا القسم كيفية تنفيذ نمط علبة الصادر للمعاملات في Azure Cosmos DB لتحقيق مراسلة موثوقة بترتيب بين الخدمات المختلفة بمساعدة موجز تغيير Azure Cosmos DB ونقل الخدمة. يوضح نموذج الخدمة التي تدير الكائنات Contact (المعلومات FirstName، وLastName، وEmail، وCompany، وما إلى ذلك). ويستخدم نمط الفصل بين مسؤولية الأوامر والاستعلام (CQRS) ويتبع مفاهيم التصميم الأساسية المستندة إلى المجال (DDD). يمكنك العثور على نموذج التعليمات البرمجية للتنفيذ على GitHub.

يحتوي كائن Contact في خدمة العينة على البنية التالية:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

بمجرد إنشاء أو تحديث Contact، فإنه يصدر الأحداث التي تحتوي على معلومات حول التغيير الحالي. من بين أمور أخرى، يمكن أن تكون أحداث المجال:

  • ContactCreated. يتم رفعه عند إضافة جهة اتصال.
  • ContactNameUpdated. يرفع عند تغيير FirstName أو LastName.
  • ContactEmailUpdated. يتم رفعه عند تحديث عنوان البريد الإلكتروني.
  • ContactCompanyUpdated. يتم رفعه عند تغيير أي من خصائص الشركة.

دفعات المعاملات

لتنفيذ هذا النمط، تحتاج إلى التأكد من حفظ كائن العمل Contact والأحداث المقابلة في نفس معاملة قاعدة البيانات. في Azure Cosmos DB، تعمل المعاملات بشكل مختلف عما تعمل عليه في أنظمة قواعد البيانات الارتباطية. تعمل معاملات Azure Cosmos DB، التي تسمى دفعات المعاملات، على قسم منطقي واحد، بحيث تضمن خصائص الذرية والاتساق والعزل والمتانة (ACID). لا يمكنك حفظ مستندين في عملية دفعة معاملات في حاويات أو أقسام منطقية مختلفة. بالنسبة إلى نموذج الخدمة، هذا يعني أنه سيتم وضع كل من كائن العمل والحدث أو الأحداث في نفس الحاوية والقسم المنطقي.

السياق والمستودعات وUniitOfWork

جوهر تطبيق العينة هو سياق حاوية يتعقب الكائنات التي يتم حفظها في نفس دفعة المعاملات. يحتفظ بقائمة الكائنات التي تم إنشاؤها وتعديلها ويعمل على حاوية Azure Cosmos DB واحدة. تبدو الواجهة الخاصة به كما يلي:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

تتعقب القائمة الموجودة في مكون سياق الحاوية الكائنات Contact وDomainEvent. سيتم وضع كليهما في نفس الحاوية. وهذا يعني أنه يتم تخزين أنواع متعددة من الكائنات في نفس حاوية Azure Cosmos DB واستخدام خاصية Type للتمييز بين كائن عمل وحدث.

لكل نوع، هناك مستودع مخصص يحدد الوصول إلى البيانات وينفذه. توفر واجهة المستودع Contact هذه الطرق:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

يبدو المستودع Event مشابها، باستثناء أن هناك طريقة واحدة فقط، والتي تنشئ أحداثًا جديدة في المتجر:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

تحصل تطبيقات واجهات المستودع على مرجع عبر إدخال التبعية إلى مثيل IContainerContext واحد للتأكد من أن كلًا منهما يعمل على نفس سياق Azure Cosmos DB.

المكون الأخير هو UnitOfWork، الذي يقوم بتثبيت التغييرات الموجودة في المثيل IContainerContext إلى Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

معالجة الأحداث: الإنشاء والنشر

في كل مرة يتم فيها إنشاء كائن Contact أو تعديله أو حذفه (مبدئيًا)، تقوم الخدمة برفع حدث مطابق. جوهر الحل المقدم هو مزيج من التصميم المستند إلى المجال (DDD) ونمط الوسيط الذي اقترحه جيمي بوغارد. يقترح الاحتفاظ بقائمة الأحداث التي حدثت بسبب تعديلات عنصر المجال ونشر هذه الأحداث قبل حفظ الكائن الفعلي في قاعدة البيانات.

يتم الاحتفاظ بقائمة التغييرات في كائن المجال نفسه بحيث لا يمكن لأي مكون آخر تعديل سلسلة الأحداث. يتم تعريف سلوك الحفاظ على الأحداث (المثيلات IEvent) في كائن المجال عبر واجهة IEventEmitter<IEvent> ويتم تنفيذه في فئة مجردة DomainEntity:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

يقوم الكائن Contact برفع أحداث المجال. يتبع الكيان Contact مفاهيم DDD الأساسية، وتكوين محددات خصائص المجال كخاصة. لا توجد عوامل تعيين عامة في الفئة. بدلًا من ذلك، فإنه يوفر أساليب لمعالجة الحالة الداخلية. في هذه الأساليب، يمكن رفع الأحداث المناسبة لتعديل معين (على سبيل المثال ContactNameUpdated أو ContactEmailUpdated).

فيما يلي مثال على التحديثات التي تم إدخالها على اسم جهة اتصال. (يتم رفع الحدث في نهاية الأسلوب.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

يبدو المطابق ContactNameUpdatedEvent، الذي يتعقب التغييرات، كما يلي:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

حتى الآن، يتم تسجيل الأحداث فقط في كائن المجال ولا يتم حفظ أي شيء في قاعدة البيانات أو حتى نشرها إلى وسيط الرسائل. بعد التوصية، ستتم معالجة قائمة الأحداث مباشرة قبل حفظ كائن العمل في مخزن البيانات. في هذه الحالة، يحدث في أسلوب SaveChangesAsync للمثيل IContainerContext، الذي يتم تنفيذه في أسلوب خاص RaiseDomainEvents. (dObjs هي قائمة الكيانات المتعقبة لسياق الحاوية.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

في السطر الأخير، يتم استخدام حزمة MediatR، وهي تنفيذ لنمط الوسيط في C#، لنشر حدث داخل التطبيق. القيام بذلك ممكن لأن جميع الأحداث مثل ContactNameUpdatedEvent ينفذ واجهة INotification لحزمة MediatR.

يجب معالجة هذه الأحداث بواسطة معالج مطابق. وهنا، يدخل التنفيذ IEventsRepository حيز التنفيذ. إليك عينة معالج الأحداث NameUpdated:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

يتم إدخال مثيل IEventRepository في فئة المعالج عبر الدالة الإنشائية. بمجرد نشر في الخدمة ContactNameUpdatedEvent، يتم استدعاء الأسلوب Handle ويستخدم مثيل مستودع الأحداث لإنشاء كائن إعلام. يتم إدراج كائن الإعلام هذا بدوره في قائمة الكائنات المتعقبة في العنصر IContainerContext ويربط الكائنات المحفوظة في دفعة المعاملات نفسها إلى Azure Cosmos DB.

حتى الآن، يعرف سياق الحاوية الكائنات التي يجب معالجتها. للاستمرار في النهاية في الكائنات المتعقبة إلى Azure Cosmos DB، ينشئ التنفيذ IContainerContext دفعة المعاملات، ويضيف جميع العناصر ذات الصلة، ويشغل العملية مقابل قاعدة البيانات. تتم معالجة العملية الموضحة في الأسلوب SaveInTransactionalBatchAsync الذي يتم استدعاؤه بواسطة الأسلوب SaveChangesAsync.

فيما يلي الأجزاء المهمة من التنفيذ التي تحتاج إليها لإنشاء وتشغيل دفعة المعاملات:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects);

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

فيما يلي نظرة عامة حول كيفية عمل العملية حتى الآن (لتحديث الاسم على عنصر جهة اتصال):

  1. يريد العميل تحديث اسم جهة اتصال. يتم استدعاء الأسلوب SetName على كائن جهة الاتصال ويتم تحديث الخصائص.
  2. تتم إضافة الحدث ContactNameUpdated إلى قائمة الأحداث في كائن المجال.
  3. يتم استدعاء أسلوب مستودع جهة الاتصال Update، والذي يضيف كائن المجال إلى سياق الحاوية. يتم الآن تعقب الكائن.
  4. يتم استدعاء CommitAsync على المثيل UnitOfWork، والذي بدوره يستدعي SaveChangesAsync على سياق الحاوية.
  5. ضمن SaveChangesAsync، يتم نشر كافة الأحداث في قائمة كائن المجال بواسطة مثيل MediatR وتتم إضافتها عبر مستودع الحدث إلى نفس سياق الحاوية.
  6. في SaveChangesAsync، يتم إنشاء TransactionalBatch. وسيحتفظ بكل من كائن جهة الاتصال والحدث.
  7. يقوم TransactionalBatch بالتشغيل وتلتزم البيانات بـAzure Cosmos DB.
  8. يرجع SaveChangesAsync وCommitAsync بنجاح.

استمرار

كما ترى في قصاصات التعليمات البرمجية السابقة، يتم تضمين جميع الكائنات المحفوظة في Azure Cosmos DB في مثيل DataObject. يوفر هذا الكائن خصائص شائعة:

  • ID.
  • PartitionKey.
  • Type.
  • State. مثل Created، لن تستمر Updated في Azure Cosmos DB.
  • Etag. للقفل المتفائل.
  • TTL. خاصية Time To Live للتنظيف التلقائي للمستندات القديمة.
  • Data. كائن بيانات عام.

يتم تعريف هذه الخصائص في واجهة عامة تسمى IDataObject وتستخدمها المستودعات وسياق الحاوية:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

ستبدو الكائنات المغلفة في مثيل DataObject وحفظها في قاعدة البيانات مثل هذه العينة (Contact وContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

يمكنك أن ترى أن المستندات Contact وContactNameUpdatedEvent (النوع domainEvent) لها نفس مفتاح القسم وأن كلا المستندين سيستمران في نفس القسم المنطقي.

تغيير معالجة الموجز

لقراءة دفق الأحداث وإرسالها إلى وسيط الرسائل، ستستخدم الخدمة موجز تغيير Azure Cosmos DB.

موجز التغيير هو سجل مستمر للتغييرات في الحاوية الخاصة بك. يعمل في الخلفية ويتعقب التعديلات. ضمن قسم منطقي واحد، يتم ضمان ترتيب التغييرات. الطريقة الأكثر ملاءمة لقراءة موجز التغيير هي استخدام دالة Azure مع مشغل Azure Cosmos DB. خيار آخر هو استخدام مكتبة معالج موجز التغيير. يتيح لك دمج معالجة موجز التغيير في واجهة برمجة تطبيقات الويب كخدمة خلفية (عبر الواجهة IHostedService). يستخدم النموذج هنا تطبيق وحدة تحكم بسيط ينفذ الفئة المجردة BackgroundService لاستضافة مهام الخلفية طويلة الأمد في تطبيقات .NET Core.

لتلقي التغييرات من موجز تغيير Azure Cosmos DB، تحتاج إلى إنشاء مثيل لكائن ChangeFeedProcessor، وتسجيل أسلوب معالج لمعالجة الرسائل، وبدء الاستماع للتغييرات:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

ثم يعالج أسلوب المعالج (HandleChangesAsync هنا) الرسائل. في هذا النموذج، يتم نشر الأحداث إلى موضوع ناقل خدمة Microsoft Azure الذي تم تقسيمه لقابلية التوسع وتم تمكين ميزة إلغاء التكرار. يمكن لأي خدمة مهتمة بالتغييرات على الكائنات Contact بعد ذلك الاشتراك في موضوع ناقل خدمة Microsoft Azure هذا وتلقي التغييرات ومعالجتها للسياق الخاص بها.

تحتوي رسائل ناقل خدمة Microsoft Azure التي تم إنتاجها على خاصية SessionId. عند استخدام جلسات العمل في ناقل خدمة Microsoft Azure، فإنك تضمن الاحتفاظ بترتيب الرسائل (أولا في، الخروج أولا (FIFO)). الاحتفاظ بالترتيب ضروري لحالة الاستخدام هذه.

إليك القصاصة البرمجية التي تعالج الرسائل من موجز التغيير:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

معالجة الخطأ

إذا كان هناك خطأ أثناء معالجة التغييرات، فستقوم مكتبة موجز التغيير بإعادة تشغيل قراءة الرسائل في الموضع حيث تمت معالجة الدفعة الأخيرة بنجاح. على سبيل المثال، إذا قام التطبيق بمعالجة 10,000 رسالة بنجاح، يعمل الآن على الدفعة 10,001 إلى 10,025، وحدث خطأ، يمكنه إعادة التشغيل واستلام عمله في الموضع 10,001. تتعقب المكتبة تلقائيًا ما تمت معالجته عبر المعلومات المحفوظة في حاوية Leases في Azure Cosmos DB.

من المحتمل أن تكون الخدمة قد أرسلت بالفعل بعض الرسائل التي تتم إعادة معالجتها إلى ناقل خدمة Microsoft Azure. عادة ما يؤدي هذا السيناريو إلى تكرار معالجة الرسائل. كما ذكر سابقًا، يحتوي ناقل خدمة Microsoft Azure على ميزة للكشف عن الرسائل المكررة التي تحتاج إلى تمكينها لهذا السيناريو. تتحقق الخدمة مما إذا كانت الرسالة قد تمت إضافتها بالفعل إلى موضوع ناقل خدمة Microsoft Azure (أو قائمة انتظار) استنادًا إلى الخاصية MessageId التي يتحكم فيها التطبيق للرسالة. يتم تعيين هذه الخاصية إلى ID لمستند الحدث. إذا تم إرسال نفس الرسالة مرة أخرى إلى ناقل خدمة Microsoft Azure، فسيتم تجاهل الخدمة وإسقاطها.

التدبير المنزلي

في تنفيذ علبة الصادر للمعاملات النموذجية، تقوم الخدمة بتحديث الأحداث التي تمت معالجتها وتعيين خاصية Processed إلى true، مما يشير إلى أنه تم نشر رسالة بنجاح. يمكن تنفيذ هذا السلوك يدويًا في أسلوب المعالج. في السيناريو الحالي، ليست هناك حاجة لمثل هذه العملية. يتعقب Azure Cosmos DB الأحداث التي تمت معالجتها باستخدام موجز التغيير (بالاشتراك مع الحاوية Leases).

كخطوة أخيرة، تحتاج أحيانًا إلى حذف الأحداث من الحاوية بحيث تحتفظ بأحدث السجلات/المستندات فقط. لإجراء عملية تنظيف بشكل دوري، يطبق التنفيذ ميزة أخرى من Azure Cosmos DB: Time To Live (TTL) على المستندات. يمكن لـAzure Cosmos DB حذف المستندات تلقائيًا استنادًا إلى خاصية TTL يمكن إضافتها إلى مستند: فترة زمنية بالثوان. ستتحقق الخدمة باستمرار من الحاوية بحثا عن المستندات التي تحتوي على خاصية TTL. بمجرد انتهاء صلاحية المستند، سيقوم Azure Cosmos DB بإزالته من قاعدة البيانات.

عندما تعمل جميع المكونات كما هو متوقع، تتم معالجة الأحداث ونشرها بسرعة: في غضون ثوانٍ. إذا كان هناك خطأ في Azure Cosmos DB، فلن يتم إرسال الأحداث إلى ناقل الرسالة، لأنه لا يمكن حفظ كل من كائن العمل والأحداث المقابلة في قاعدة البيانات. الشيء الوحيد الذي يجب مراعاته هو تعيين قيمة مناسبة لـTTL على المستندات DomainEvent عندما لا يتوفر عامل الخلفية (معالج موجز التغيير) أو ناقل الخدمة. في بيئة الإنتاج، من الأفضل اختيار فترة زمنية متعددة الأيام. على سبيل المثال، 10 أيام. سيكون لدى جميع المكونات المعنية وقت كاف لمعالجة/نشر التغييرات داخل التطبيق.

الملخص

يحل نمط علبة الصادر للمعاملات مشكلة نشر أحداث المجال بشكل موثوق في الأنظمة الموزعة. من خلال الالتزام بحالة كائن العمل وأحداثه في نفس دفعة المعاملات واستخدام معالج خلفية كمرحل رسالة، فإنك تضمن أن الخدمات الأخرى، الداخلية أو الخارجية، ستتلقى في النهاية المعلومات التي تعتمد عليها. هذه العينة ليست تنفيذًا تقليديًا لنمط علبة الصادر للمعاملات. ويستخدم ميزات مثل موجز تغيير Azure Cosmos DB ووقت البقاء التي تحافظ على الأشياء بسيطة ونظيفة.

فيما يلي ملخص لمكونات Azure المستخدمة في هذا السيناريو:

رسم تخطيطي يوضح مكونات Azure لتنفيذ

قم بتنزيل ملف Visio لهذه البنية.

مزايا هذا الحل هي:

  • المراسلة الموثوقة والتسليم المضمون للأحداث.
  • الحفاظ على ترتيب الأحداث وإلغاء تكرار الرسائل عبر ناقل خدمة Microsoft Azure.
  • لا حاجة للاحتفاظ بخاصية Processed إضافية تشير إلى المعالجة الناجحة لمستند حدث.
  • حذف الأحداث من Azure Cosmos DB عبر الوقت للعيش (TTL). لا تستهلك العملية وحدات الطلب المطلوبة لمعالجة طلبات المستخدم/التطبيق. بدلًا من ذلك، يستخدم وحدات طلب "leftover" في مهمة خلفية.
  • معالجة الرسائل بإثبات الخطأ عبر ChangeFeedProcessor (أو دالة Azure).
  • اختياري: معالجات موجز التغيير المتعددة، كل منها يحافظ على مؤشره الخاص في موجز التغيير.

الاعتبارات

يوضح نموذج التطبيق الذي تمت مناقشته في هذه المقالة كيف يمكنك تنفيذ نمط علبة الصادر للمعاملات على Azure باستخدام Azure Cosmos DB وService Bus. هناك أيضًا أساليب أخرى تستخدم قواعد بيانات NoSQL. لضمان حفظ كائن العمل والأحداث بشكل موثوق في قاعدة البيانات، يمكنك تضمين قائمة الأحداث في مستند عنصر العمل. الجانب السلبي لهذا الأسلوب هو أن عملية التنظيف ستحتاج إلى تحديث كل مستند يحتوي على أحداث. هذا ليس مثاليًا، خاصة من حيث تكلفة وحدة الطلب، مقارنة باستخدام TTL.

ضع في اعتبارك أنه لا يجب مراعاة نموذج التعليمات البرمجية المقدمة هنا التعليمات البرمجية الجاهزة للإنتاج. يحتوي على بعض القيود المتعلقة بالقراءة المتعددة، خاصة الطريقة التي يتم بها التعامل مع الأحداث في الفئة DomainEntity وكيفية تعقب الكائنات في عمليات التنفيذ CosmosContainerContext. استخدمه كنقطة بداية للتطبيقات الخاصة بك. بدلا من ذلك، ضع في اعتبارك استخدام المكتبات الموجودة التي تحتوي بالفعل على هذه الوظيفة مضمنة فيها مثل NServiceBus أو MassTransit.

نشر هذا السيناريو

يمكنك العثور على التعليمات البرمجية المصدر وملفات التوزيع والإرشادات لاختبار هذا السيناريو على GitHub: https://github.com/mspnp/transactional-outbox-pattern.

المساهمون

تحتفظ Microsoft بهذه المقالة. وهي مكتوبة في الأصل من قبل المساهمين التاليين.

الكاتب الرئيسي:

لمشاهدة ملفات تعريف LinkedIn غير العامة، سجل الدخول إلى LinkedIn.

الخطوات التالية

راجع هذه المقالات لمعرفة المزيد: