التشغيل السريع: إرسال الأحداث إلى أو تلقيها من مراكز الأحداث باستخدام تطبيق Go

Azure Event Hubs هي منصة تدفق البيانات الكبيرة وخدمة استيعاب الأحداث، القادرة على تلقي ملايين الأحداث ومعالجتها في الثانية. يمكن لـ Event Hubs معالجة الأحداث أو البيانات أو بيانات تتبع الاستخدام التي تنتجها البرامج والأجهزة الموزعة وتخزينها. يمكن تحويل البيانات المرسلة إلى مركز الحدث وتخزينها باستخدام أي موفر تحليلات يعمل في الوقت الفعلي أو محولات الدفعات / التخزين. للاطلاع على نظرة عامة مفصلة حول Event Hubs، انظر نظرة عامة على ميزاتEvent Hubs.

يصف هذا التشغيل السريع كيفية كتابة تطبيقات Go لإرسال الأحداث إلى مركز الأحداث أو تلقيها منه.

إشعار

يستند هذا التشغيل السريع إلى عينات على GitHub في https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. يعتمد الإرسال على نموذج example_producing_events_test.go ويستند الإرسال إلى نموذج example_processor_test.go . يتم تبسيط التعليمات البرمجية لبدء التشغيل السريع وتتم إزالة جميع التعليقات التفصيلية، لذلك انظر إلى العينات لمزيد من التفاصيل والتفسيرات.

المتطلبات الأساسية

تحتاج إلى المتطلبات الأساسية التالية لإكمال هذا التشغيل السريع:

  • تثبيت تطبيق Go محلياً. اتبع هذه الإرشادات إذا لزم الأمر.
  • حساب Azureنشط. في حال لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانيّاً قبل البدء.
  • أنشئ مساحة اسم لـ Event Hubs ومركز الأحداث. استخدم مدخل Microsoft Azure لإنشاء مساحة اسم نوع لمراكز الأحداث والحصول على إدارة بيانات الاعتماد لتطبيقك الذي يحتاج إلى الاتصال بمركز الأحداث. لإنشاء مساحة اسم ومركز أحداث، اتبع الإجراء الوارد في هذه المقالة.

إرسال الأحداث

يوضح هذا القسم كيفية إنشاء تطبيق Go لإرسال الأحداث لمركز أحداث.

تثبيت حزمة التطبيق Go

احصل على حزمة Go لمراكز الأحداث كما هو موضح في المثال التالي.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

التعليمات البرمجية لإرسال الأحداث إلى مركز أحداث

إليك التعليمات البرمجية لإرسال الأحداث إلى مركز أحداث. الخطوات الرئيسية في التعليمات البرمجية هي:

  1. إنشاء عميل منتج لمراكز الأحداث باستخدام سلسلة الاتصال إلى مساحة اسم مراكز الأحداث واسم مركز الأحداث.
  2. إنشاء كائن دفعة وإضافة أحداث عينة إلى الدفعة.
  3. إرسال دفعة الأحداث إلى الأحداث.

هام

استبدل NAMESPACE CONNECTION STRING سلسلة الاتصال بمساحة اسم مراكز الأحداث الخاصة بك و EVENT HUB NAME باسم مركز الأحداث في نموذج التعليمات البرمجية.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

لا تقم بتشغيل التطبيق حتى الآن. تحتاج أولا إلى تشغيل تطبيق المتلقي ثم تطبيق المرسل.

استقبال الأحداث

إنشاء حساب تخزين وحاوية

تتم مشاركة حالة مثل التأجير على أقسام ونقاط التحقق في دفق الحدث بين أجهزة الاستقبال باستخدام حاوية تخزين Azure. يمكنك إنشاء حساب تخزين وحاوية باستخدام Go SDK، ولكن يمكنك أيضًا إنشاء حساب باتباع الإرشادات الموجودة في نبذة حول حسابات تخزين Azure.

اتبع هذه التوصيات عند استخدام Azure Blob Storage كمخزن نقطة تحقق:

  • استخدم حاوية منفصلة لكل مجموعة مستهلكين. يمكنك استخدام نفس حساب التخزين، ولكن استخدام حاوية واحدة لكل مجموعة.
  • لا تستخدم الحاوية لأي شيء آخر، ولا تستخدم حساب التخزين لأي شيء آخر.
  • يجب أن يكون حساب التخزين في نفس المنطقة التي يوجد فيها التطبيق المنشور. إذا كان التطبيق محليا، فحاول اختيار أقرب منطقة ممكنة.

في صفحة Storage account في مدخل Microsoft Azure، في قسم Blob service ، تأكد من تعطيل الإعدادات التالية.

  • مساحة الاسم الهرمية
  • حذف مبدئي لكائن ثنائي كبير الحجم
  • تعيين الإصدار

حزم Go

لتلقي الرسائل، احصل على حزم Go لمراكز الأحداث كما هو موضح في المثال التالي.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

التعليمات البرمجية لتلقي الأحداث من مركز أحداث

إليك التعليمات البرمجية لتلقي الأحداث من مركز أحداث. الخطوات الرئيسية في التعليمات البرمجية هي:

  1. تحقق من كائن مخزن نقاط التحقق الذي يمثل Azure Blob Storage المستخدم من قبل مركز الأحداث لنقاط التفتيش.
  2. إنشاء عميل مستهلك لمراكز الأحداث باستخدام سلسلة الاتصال إلى مساحة اسم مراكز الأحداث واسم مركز الأحداث.
  3. إنشاء معالج حدث باستخدام كائن العميل وعنصر مخزن نقطة التحقق. يتلقى المعالج الأحداث ويعالجها.
  4. لكل قسم في مركز الحدث، قم بإنشاء عميل قسم مع processEvents كدالة لمعالجة الأحداث.
  5. تشغيل جميع عملاء القسم لتلقي الأحداث ومعالجتها.

هام

استبدل قيم العنصر النائب التالية بالقيم الفعلية:

  • AZURE STORAGE CONNECTION STRINGمع سلسلة الاتصال لحساب تخزين Azure الخاص بك
  • BLOB CONTAINER NAME باسم حاوية الكائن الثنائي كبير الحجم التي قمت بإنشائها في حساب التخزين
  • NAMESPACE CONNECTION STRINGمع سلسلة الاتصال لمساحة اسم مراكز الأحداث
  • EVENT HUB NAME مع اسم مركز الحدث في نموذج التعليمات البرمجية.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

تشغيل تطبيقات المستلم والمرسل

  1. قم بتشغيل تطبيق المتلقي أولا.

  2. تشغيل تطبيق المرسل.

  3. انتظر دقيقة لمشاهدة الإخراج التالي في نافذة المتلقي.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

الخطوات التالية

راجع العينات على GitHub في https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.