مشاركة عبر


التشغيل السريع: إرسال الأحداث إلى مراكز الأحداث أو تلقيها منها باستخدام Python

في هذا التشغيل السريع، ستتعلم كيفية إرسال الأحداث وتلقيها من مركز أحداث باستخدام حزمة Azure-eventhub Python.

المتطلبات الأساسية

إذا كنت مستخدماً جديداً لـ Azure Event Hubs، فراجع نظرة عامة على Event Hubs قبل إجراء هذا التشغيل السريع.

لإكمال هذا التشغيل السريع، تأكد من أن لديك المتطلبات الأساسية التالية:

  • اشتراك Microsoft Azure: قم بالتسجيل للحصول على نسخة تجريبية مجانية إذا لم يكن لديك واحد.
  • Python 3.8 أو أحدث: تأكد من تثبيت pip وتحديثه.
  • Visual Studio Code (مستحسن): أو استخدم أي IDE آخر من اختيارك.
  • مساحة اسم مراكز الأحداث ومركز الأحداث: اتبع هذا الدليل لإنشائها في مدخل Microsoft Azure.

تثبيت الحزم لإرسال الأحداث

لتثبيت حزم Python لمراكز الأحداث، افتح موجه أوامر يحتوي على Python في مساره. قم بتغيير الدليل إلى المجلد حيث تريد الاحتفاظ بالعينات الخاصة بك.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

مصادقة التطبيق إلى Azure

يوضح لك هذا التشغيل السريع طريقتين للاتصال بمراكز أحداث Azure:

  • بدون كلمة مرور. استخدم أساس الأمان في معرف Microsoft Entra والتحكم في الوصول المستند إلى الدور (RBAC) للاتصال بمساحة اسم مراكز الأحداث. لا داعي للقلق بشأن وجود سلاسل اتصال ذات تعليمات برمجية مضمنة في التعليمات البرمجية الخاصة بك أو في ملف تكوين أو في تخزين آمن مثل Azure Key Vault.
  • سلسلة الاتصال. استخدم سلسلة اتصال للاتصال بمساحة اسم مراكز الأحداث. إذا كنت جديدا على Azure، فقد تجد خيار سلسلة الاتصال أسهل في المتابعة.

نوصي باستخدام الخيار بدون كلمة مرور في التطبيقات وبيئات الإنتاج في العالم الحقيقي. لمزيد من المعلومات، راجع مصادقة ناقل خدمة Microsoft Azure والتخويل والاتصالات بدون كلمة مرور لخدمات Azure.

تعيين أدوار لمستخدم Microsoft Entra

عند التطوير محليا، تأكد من أن حساب المستخدم الذي يتصل بمراكز أحداث Azure لديه الأذونات الصحيحة. تحتاج إلى دور مالك بيانات Azure Event Hubs لإرسال الرسائل وتلقيها. لتعيين هذا الدور لنفسك، تحتاج إلى دور مسؤول وصول المستخدم، أو دور آخر يتضمن Microsoft.Authorization/roleAssignments/write الإجراء. يمكنك تعيين أدوار Azure RBAC لمستخدم باستخدام مدخل Microsoft Azure أو Azure CLI أو Azure PowerShell. لمزيد من المعلومات، راجع فهم نطاق صفحة Azure RBAC .

يعين Azure Event Hubs Data Owner المثال التالي الدور إلى حساب المستخدم الخاص بك، والذي يوفر الوصول الكامل إلى موارد مراكز الأحداث Azure. في سيناريو حقيقي، اتبع مبدأ الامتياز الأقل لمنح المستخدمين الحد الأدنى فقط من الأذونات اللازمة لبيئة إنتاج أكثر أمانا.

أدوار Azure مضمنة لمراكز أحداث Azure

بالنسبة لمراكز أحداث Azure، فإن إدارة مساحات الأسماء وجميع الموارد ذات الصلة من خلال مدخل Microsoft Azure وواجهة برمجة تطبيقات إدارة موارد Azure محمية بالفعل باستخدام نموذج Azure RBAC. يوفر Azure الأدوار المضمنة التالية لتخويل الوصول إلى مساحة اسم مراكز الأحداث:

  • مالك بيانات مراكز الأحداث: يمكن الوصول إلى البيانات إلى مساحة اسم مراكز الأحداث وكياناتها (قوائم الانتظار والموضوعات والاشتراكات وعوامل التصفية).
  • مرسل بيانات Azure Event Hubs: استخدم هذا الدور لمنح المرسل حق الوصول إلى مساحة اسم مراكز الأحداث وكياناتها.
  • Azure Event Hubs Data Receiver: استخدم هذا الدور لمنح المتلقي حق الوصول إلى مساحة اسم مراكز الأحداث وكياناتها.

إذا كنت ترغب في إنشاء دور مخصص، فشاهد الحقوق المطلوبة لعمليات مراكز الأحداث.

هام

في معظم الحالات، يستغرق نشر تعيين الدور في Azure دقيقة أو دقيقتين. في حالات نادرة، قد يستغرق الأمر ما يصل إلى ثماني دقائق. إذا تلقيت أخطاء المصادقة عند تشغيل التعليمات البرمجية لأول مرة، فانتظر بضع لحظات وحاول مرة أخرى.

  1. في مدخل Microsoft Azure، حدد موقع مساحة اسم Event Hubs باستخدام شريط البحث الرئيسي أو التنقل الأيسر.

  2. في صفحة النظرة العامة، حدد Access control (IAM) من القائمة اليسرى.

  3. حدد صفحة التحكم بالوصول (IAM)، وحدد علامة تبويب تعيينات الدور.

  4. حدد + إضافة من القائمة العلوية. ثم حدد إضافة تعيين دور.

    لقطة شاشة توضح كيفية تعيين دور.

  5. استخدم مربع البحث لتصفية النتائج إلى الدور المطلوب. في هذا المثال، ابحث Azure Event Hubs Data Owner عن النتيجة المطابقة وحددها. ثم اختر التالي.

  6. ضمن تعيين الوصول إلى، حدد المستخدم أو المجموعة أو كيان الخدمة. ثم اختر + Select members.

  7. في مربع الحوار، ابحث عن اسم مستخدم Microsoft Entra (عادة ما يكون عنوان بريدك الإلكتروني user@domain). اختر تحديد في أسفل مربع الحوار.

  8. حدد Review + assign للانتقال إلى الصفحة الأخيرة. حدد Review + assign مرة أخرى لإكمال العملية.

إرسال الأحداث

في هذا القسم، قم بإنشاء برنامج نصي Python لإرسال الأحداث إلى مركز الأحداث الذي قمت بإنشائه سابقا.

  1. افتح محرر Python المفضل لديك، مثل Visual Studio Code.

  2. أنشئ برنامجًا نصيًّا باسم send.py. يرسل هذا البرنامج النصي دفعة من الأحداث إلى مركز الحدث الذي قمت بإنشائه سابقًا.

  3. ألصق التعليمات البرمجية التالية في send.py.

    في التعليمات البرمجية، استخدم القيم الحقيقية لاستبدال العناصر النائبة التالية:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - ترى الاسم المؤهل بالكامل في صفحة Overview لمساحة الاسم. يجب أن يكون بالتنسيق: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - اسم مركز الحدث.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    إشعار

    للحصول على أمثلة لخيارات أخرى لإرسال الأحداث إلى مركز أحداث بشكل غير متزامن باستخدام سلسلة اتصال، راجع صفحة send_async.py GitHub. الأنماط المعروضة هناك قابلة للتطبيق أيضا على إرسال الأحداث بدون كلمة مرور.

استقبال الأحداث

يستخدم quickstart هذا مخزن Azure Blob كمخزن نقطة تحقق. يتم استخدام مخزن نقطة التحقق لاستمرار نقاط التحقق (أي مواقف القراءة الأخيرة).

اتبع هذه التوصيات عند استخدام Azure Blob Storage كمخزن نقطة تحقق:

  • استخدم حاوية منفصلة لكل مجموعة مستهلكين. يمكنك استخدام نفس حساب التخزين، ولكن استخدام حاوية واحدة لكل مجموعة.
  • لا تستخدم حساب التخزين لأي شيء آخر.
  • لا تستخدم الحاوية لأي شيء آخر.
  • إنشاء حساب التخزين في نفس المنطقة مثل التطبيق المنشور. إذا كان التطبيق محليا، فحاول اختيار أقرب منطقة ممكنة.

في صفحة Storage account في مدخل Microsoft Azure، في قسم Blob service ، تأكد من تعطيل الإعدادات التالية.

  • مساحة الاسم الهرمية
  • حذف مبدئي لكائن ثنائي كبير الحجم
  • تعيين الإصدار

إنشاء حساب مخزن Azure وحاوية الكائنات الثنائية كبيرة الحجم

قم بإنشاء حساب تخزين Azure و حاوية الكائنات الثنائية كبيرة الحجم فيه عن طريق القيام بالخطوات التالية:

  1. أنشئ حساب Azure Storage
  2. إنشاء حاوية كائن ثنائي كبير الحجم.
  3. المصادقة على حاوية الكائن الثنائي كبير الحجم.

تأكد من تسجيل سلسلة الاتصال واسم الحاوية لاستخدامها لاحقًا في التعليمات البرمجية الخاصة بالاستلام.

عند التطوير محليا، تأكد من أن حساب المستخدم الذي يصل إلى بيانات الكائن الثنائي كبير الحجم لديه الأذونات الصحيحة. تحتاج إلى Storage Blob Data Contributor لقراءة بيانات الكائن الثنائي كبير الحجم وكتابتها. لتعيين هذا الدور لنفسك، يجب تعيين دور مسؤول وصول المستخدم ، أو دور آخر يتضمن إجراء Microsoft.Authorization/roleAssignments/write . يمكنك تعيين أدوار Azure RBAC لمستخدم باستخدام مدخل Microsoft Azure أو Azure CLI أو Azure PowerShell. لمزيد من المعلومات، راجع فهم نطاق التحكم في الوصول استنادًا إلى الدور من Azure.

في هذا السيناريو، يمكنك تعيين أذونات لحساب المستخدم الخاص بك، محددة النطاق لحساب التخزين، لاتباع مبدأ الامتياز الأقل. تمنح هذه الممارسة المستخدمين الحد الأدنى من الأذونات المطلوبة فقط وتنشئ بيئات تشغيل أكثر أمانًا.

يعين المثال التالي دور Storage Blob Data Contributor لحساب المستخدم الخاص بك، والذي يوفر حق الوصول للقراءة والكتابة إلى بيانات الكائن الثنائي كبير الحجم في حساب التخزين الخاص بك.

هام

في معظم الحالات، يستغرق نشر تعيين الدور في Azure دقيقة أو دقيقتين. في حالات نادرة، قد يستغرق الأمر ما يصل إلى ثماني دقائق. إذا تلقيت أخطاء المصادقة عند تشغيل التعليمات البرمجية لأول مرة، فانتظر بضع لحظات وحاول مرة أخرى.

  1. في مدخل Microsoft Azure، حدد موقع حساب التخزين الخاص بك باستخدام شريط البحث الرئيسي أو شريط التنقل الأيسر.

  2. في صفحة حساب التخزين، حدد Access control (IAM) من القائمة اليسرى.

  3. حدد صفحة التحكم بالوصول (IAM)، وحدد علامة تبويب تعيينات الدور.

  4. حدد + إضافة من القائمة العلوية. ثم حدد إضافة تعيين دور.

    لقطة شاشة توضح كيفية تعيين دور حساب تخزين.

  5. استخدم مربع البحث لتصفية النتائج إلى الدور المطلوب. على سبيل المثال، ابحث عن Storage Blob Data Contributor. حدد النتيجة المطابقة ثم اختر التالي.

  6. ضمن تعيين الوصول إلى، حدد المستخدم أو المجموعة أو كيان الخدمة، ثم اختر + تحديد الأعضاء.

  7. في مربع الحوار، ابحث عن اسم مستخدم Microsoft Entra (عنوان بريدك الإلكتروني user@domain عادة) ثم اختر تحديد في أسفل مربع الحوار.

  8. حدد Review + assign للانتقال إلى الصفحة الأخيرة. حدد Review + assign مرة أخرى لإكمال العملية.

تثبيت الحزم لتلقي الأحداث

بالنسبة للجانب المتلقي، تحتاج إلى تثبيت حزمة واحدة أو أكثر. في هذا التشغيل السريع، يمكنك استخدام تخزين Azure Blob لاستمرار نقاط التحقق بحيث لا يقرأ البرنامج الأحداث التي يقرأها بالفعل. ينفذ نقاط فحص بيانات التعريف على الرسائل المستلمة على فترات منتظمة في النقطة. يسهّل هذا الأسلوب متابعة تلقي الرسائل لاحقًا من حيث توقفت.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

قم بإنشاء برنامج Python النصي لتلقي الأحداث

في هذا المقطع، يمكنك إنشاء برنامج Python النصي لتلقي الأحداث من مركز الأحداث الخاص بك:

  1. افتح محرر Python المفضل لديك، مثل Visual Studio Code.

  2. قم بإنشاء برنامج نصي باسم recv.py.

  3. ألصق التعليمات البرمجية التالية في recv.py.

    في التعليمات البرمجية، استخدم القيم الحقيقية لاستبدال العناصر النائبة التالية:

    • BLOB_STORAGE_ACCOUNT_URL - يجب أن تكون هذه القيمة بالتنسيق: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME - اسم حاوية الكائن الثنائي كبير الحجم في حساب تخزين Azure.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - ترى الاسم المؤهل بالكامل في صفحة Overview لمساحة الاسم. يجب أن يكون بالتنسيق: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - اسم مركز الحدث.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    إشعار

    للحصول على أمثلة لخيارات أخرى لتلقي الأحداث من مركز أحداث بشكل غير متزامن باستخدام سلسلة اتصال، راجع صفحة recv_with_checkpoint_store_async.py GitHub. الأنماط المعروضة هناك قابلة للتطبيق أيضا على تلقي الأحداث بدون كلمة مرور.

استخدم تطبيق جهاز الاستقبال.

  1. قم بتشغيل موجه الأوامر.

  2. قم بتشغيل الأمر التالي وسجل الدخول باستخدام الحساب الذي تمت إضافته إلى دور مالك بيانات مراكز الأحداث في مساحة اسم مراكز الأحداث ودور مساهم بيانات التخزين الثنائي كبير الحجم على حساب تخزين Azure.

    az login
    
  3. قم بالتبديل إلى المجلد الذي يحتوي على ملف receive.py، ثم قم بتشغيل الأمر التالي:

    python recv.py
    

قم بتشغيل تطبيق المرسل

  1. قم بتشغيل موجه الأوامر.

  2. قم بتشغيل الأمر التالي وسجل الدخول باستخدام الحساب الذي تمت إضافته إلى دور مالك بيانات مراكز الأحداث في مساحة اسم مراكز الأحداث ودور مساهم بيانات التخزين الثنائي كبير الحجم على حساب تخزين Azure.

    az login
    
  3. قم بالتبديل إلى المجلد الذي يحتوي على send.py، ثم قم بتشغيل هذا الأمر:

    python send.py
    

يجب أن يعرض إطار المتلقي الرسائل التي تم إرسالها إلى مركز الأحداث.

استكشاف الأخطاء وإصلاحها

إذا كنت لا ترى أحداثا في نافذة المتلقي أو كانت التعليمات البرمجية تبلغ عن خطأ، فجرب تلميحات استكشاف الأخطاء وإصلاحها التالية:

  • إذا لم تشاهد نتائج من recy.py، فقم بتشغيل send.py عدة مرات.

  • إذا رأيت أخطاء حول "coroutine" عند استخدام التعليمات البرمجية بدون كلمة مرور (مع بيانات الاعتماد)، فتأكد من أنك تستخدم الاستيراد من azure.identity.aio.

  • إذا رأيت "جلسة عمل العميل غير المقفلة" مع رمز بدون كلمة مرور (مع بيانات الاعتماد)، فتأكد من إغلاق بيانات الاعتماد عند الانتهاء. لمزيد من المعلومات، راجع بيانات الاعتماد غير المتزامنة.

  • إذا رأيت أخطاء التخويل مع recv.py عند الوصول إلى التخزين، فتأكد من اتباع الخطوات الواردة في إنشاء حساب تخزين Azure وحاوية كائن ثنائي كبير الحجم وتعيين دور Storage Blob Data Contributor إلى كيان الخدمة.

  • إذا تلقيت أحداثا بمعرفات أقسام مختلفة، فمن المتوقع الحصول على هذه النتيجة. الأقسام هي آلية تنظيم البيانات التي تتعلق بالتوازي النازل والمطلوبة في التطبيقات المستهلكة. يتعلق عدد الأقسام في مركز الأحداث مباشرةً بعدد القراء المتزامنين الذي تتوقع أن يكونوا موجودين. لمزيد من المعلومات، راجع معرفة المزيد حول الأقسام.

الخطوات التالية

في هذا التشغيل السريع، قمت بإرسال الأحداث وتلقيها بشكل غير متزامن. لمعرفة كيفية إرسال الأحداث وتلقيها بشكل متزامن، انتقل إلى صفحة GitHub sync_samples.

استكشف المزيد من الأمثلة والسيناريوهات المتقدمة في مكتبة عميل Azure Event Hubs لعينات Python.