التشغيل السريع: إرسال الأحداث إلى أو تلقيها من مراكز الأحداث باستخدام تطبيق Go
Azure Event Hubs هي منصة تدفق البيانات الكبيرة وخدمة استيعاب الأحداث، القادرة على تلقي ملايين الأحداث ومعالجتها في الثانية. يمكن لـ Event Hubs معالجة الأحداث أو البيانات أو بيانات تتبع الاستخدام التي تنتجها البرامج والأجهزة الموزعة وتخزينها. يمكن تحويل البيانات المرسلة إلى مركز الحدث وتخزينها باستخدام أي موفر تحليلات يعمل في الوقت الفعلي أو محولات الدفعات / التخزين. للاطلاع على نظرة عامة مفصلة حول Event Hubs، انظر نظرة عامة على ميزاتEvent Hubs.
يصف هذا التشغيل السريع كيفية كتابة تطبيقات Go لإرسال الأحداث إلى مركز الأحداث أو تلقيها منه.
إشعار
يستند هذا التشغيل السريع إلى عينات على GitHub في https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. يستند قسم إرسال الأحداث إلى نموذج example_producing_events_test.go ويستند قسم الاستلام إلى نموذج example_processor_test.go . يتم تبسيط التعليمات البرمجية لبدء التشغيل السريع وتتم إزالة جميع التعليقات التفصيلية، لذلك انظر إلى العينات لمزيد من التفاصيل والتفسيرات.
المتطلبات الأساسية
تحتاج إلى المتطلبات الأساسية التالية لإكمال هذا التشغيل السريع:
- تثبيت تطبيق Go محلياً. اتبع هذه الإرشادات إذا لزم الأمر.
- حساب Azureنشط. في حال لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانيّاً قبل البدء.
- أنشئ مساحة اسم لـ Event Hubs ومركز الأحداث. استخدم مدخل Microsoft Azure لإنشاء مساحة اسم نوع لمراكز الأحداث والحصول على إدارة بيانات الاعتماد لتطبيقك الذي يحتاج إلى الاتصال بمركز الأحداث. لإنشاء مساحة اسم ومركز أحداث، اتبع الإجراء الوارد في هذه المقالة.
إرسال الأحداث
يوضح هذا القسم كيفية إنشاء تطبيق Go لإرسال الأحداث لمركز أحداث.
تثبيت حزمة التطبيق Go
احصل على حزمة Go لمراكز الأحداث كما هو موضح في المثال التالي.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
التعليمات البرمجية لإرسال الأحداث إلى مركز أحداث
إليك التعليمات البرمجية لإرسال الأحداث إلى مركز أحداث. الخطوات الرئيسية في التعليمات البرمجية هي:
- إنشاء عميل منتج لمراكز الأحداث باستخدام سلسلة الاتصال إلى مساحة اسم مراكز الأحداث واسم مركز الأحداث.
- إنشاء كائن دفعة وإضافة أحداث عينة إلى الدفعة.
- إرسال دفعة الأحداث إلى الأحداث.
هام
استبدل NAMESPACE CONNECTION STRING
سلسلة الاتصال بمساحة اسم مراكز الأحداث الخاصة بك و EVENT HUB NAME
باسم مركز الأحداث في نموذج التعليمات البرمجية.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
لا تقم بتشغيل التطبيق حتى الآن. تحتاج أولا إلى تشغيل تطبيق المتلقي ثم تطبيق المرسل.
استقبال الأحداث
إنشاء حساب تخزين وحاوية
تتم مشاركة الحالة مثل عقود الإيجار على الأقسام ونقاط التحقق في الأحداث بين أجهزة الاستقبال باستخدام حاوية تخزين Azure. يمكنك إنشاء حساب تخزين وحاوية باستخدام Go SDK، ولكن يمكنك أيضًا إنشاء حساب باتباع الإرشادات الموجودة في نبذة حول حسابات تخزين Azure.
اتبع هذه التوصيات عند استخدام Azure Blob Storage كمخزن نقطة تحقق:
- استخدم حاوية منفصلة لكل مجموعة مستهلكين. يمكنك استخدام نفس حساب التخزين، ولكن استخدام حاوية واحدة لكل مجموعة.
- لا تستخدم الحاوية لأي شيء آخر، ولا تستخدم حساب التخزين لأي شيء آخر.
- يجب أن يكون حساب التخزين في نفس المنطقة التي يوجد فيها التطبيق المنشور. إذا كان التطبيق محليا، فحاول اختيار أقرب منطقة ممكنة.
في صفحة Storage account في مدخل Microsoft Azure، في قسم Blob service ، تأكد من تعطيل الإعدادات التالية.
- مساحة الاسم الهرمية
- حذف مبدئي لكائن ثنائي كبير الحجم
- تعيين الإصدار
حزم Go
لتلقي الرسائل، احصل على حزم Go لمراكز الأحداث كما هو موضح في المثال التالي.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
التعليمات البرمجية لتلقي الأحداث من مركز أحداث
إليك التعليمات البرمجية لتلقي الأحداث من مركز أحداث. الخطوات الرئيسية في التعليمات البرمجية هي:
- تحقق من كائن مخزن نقاط التحقق الذي يمثل Azure Blob Storage المستخدم من قبل مركز الأحداث لنقاط التفتيش.
- إنشاء عميل مستهلك لمراكز الأحداث باستخدام سلسلة الاتصال إلى مساحة اسم مراكز الأحداث واسم مركز الأحداث.
- إنشاء معالج حدث باستخدام كائن العميل وعنصر مخزن نقطة التحقق. يتلقى المعالج الأحداث ويعالجها.
- لكل قسم في مركز الحدث، قم بإنشاء عميل قسم مع processEvents كدالة لمعالجة الأحداث.
- تشغيل جميع عملاء القسم لتلقي الأحداث ومعالجتها.
هام
استبدل قيم العنصر النائب التالية بالقيم الفعلية:
AZURE STORAGE CONNECTION STRING
مع سلسلة الاتصال لحساب تخزين Azure الخاص بكBLOB CONTAINER NAME
باسم حاوية الكائن الثنائي كبير الحجم التي قمت بإنشائها في حساب التخزينNAMESPACE CONNECTION STRING
مع سلسلة الاتصال لمساحة اسم مراكز الأحداثEVENT HUB NAME
مع اسم مركز الحدث في نموذج التعليمات البرمجية.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
تشغيل تطبيقات المستلم والمرسل
قم بتشغيل تطبيق المتلقي أولا.
تشغيل تطبيق المرسل.
انتظر دقيقة لمشاهدة الإخراج التالي في نافذة المتلقي.
Processing 2 event(s) Event received with body hello Event received with body world
الخطوات التالية
راجع العينات على GitHub في https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.