مشاركة عبر


التشغيل السريع: إرسال الأحداث إلى مراكز الأحداث أو تلقيها من Azure Event Hubs

في هذا التشغيل السريع، ستتعلم كيفية إرسال الأحداث وتلقيها من مركز أحداث Azure باستخدام حزمة Azure-messaging-eventhubs Java.

تلميح

إذا كنت تعمل مع موارد Azure Event Hubs في تطبيق Spring، نوصيك بمراعاة Spring Cloud Azure كبديل. Spring Cloud Azure هو مشروع مفتوح المصدر يوفر تكامل Spring سلس مع خدمات Azure. لمعرفة المزيد حول Spring Cloud Azure، والاطلاع على مثال باستخدام مراكز الأحداث، راجع Spring Cloud Stream مع Azure Event Hubs.

المتطلبات الأساسية

إذا كنت مستخدماً جديداً لـ Azure Event Hubs، فراجع نظرة عامة على Event Hubs قبل إجراء هذا التشغيل السريع.

تحتاج إلى المتطلبات الأساسية التالية لإكمال هذا التشغيل السريع:

  • الاشتراك في Microsoft Azure. تحتاج إلى اشتراك لاستخدام خدمات Azure، بما في ذلك مراكز الأحداث في Azure. إذا لم يكن حساب Azure متوفرًا لديك، يمكنك التسجيل للحصول على نسخة تجريبية مجانية، أو استخدام مزايا الاشتراك في MSDN عند إنشاء حساب.
  • بيئة تطوير Java. يستخدم هذا التشغيل السريع Eclipse. مطلوب Java Development Kit (JDK) مع الإصدار 8 أو أعلى.
  • أنشئ مساحة اسم لـ Event Hubs ومركز الأحداث. الخطوة الأولى هي استخدام مدخل Microsoft Azure لإنشاء مساحة اسم من نوع Event Hubs، والحصول على بيانات اعتماد الإدارة التي يحتاجها التطبيق الخاص بك للاتصال بمركز الأحداث. لإنشاء مساحة اسم ومركز أحداث، اتبع الإجراء الوارد في هذه المقالة. بعد ذلك، احصل على سلسلة الاتصال لمساحة اسم مراكز الأحداث باتباع الإرشادات الواردة في المقالة: الحصول على سلسلة الاتصال. يمكنك استخدام سلسلة الاتصال لاحقا في هذا التشغيل السريع.

إرسال الأحداث

يوضح لك هذا القسم كيفية إنشاء تطبيق Java لإرسال الأحداث إلى مركز أحداث.

إضافة مرجع إلى مكتبة Azure Event Hubs

أولا، قم بإنشاء مشروع Maven جديد لتطبيق وحدة تحكم/shell في بيئة تطوير Java المفضلة لديك. قم بتحديث pom.xml الملف كما يلي. تتوفر مكتبة عميل Java لمراكز الأحداث في مستودع Maven Central.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.20.2</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.16.1</version>
		    <scope>compile</scope>
		</dependency>

ملاحظة

تحديث الإصدار إلى أحدث إصدار تم نشره إلى مستودع Maven.

مصادقة التطبيق إلى Azure

يوضح لك هذا التشغيل السريع طريقتين للاتصال بمراكز أحداث Azure:

  • بدون كلمة مرور. استخدم أساس الأمان في معرف Microsoft Entra والتحكم في الوصول المستند إلى الدور (RBAC) للاتصال بمساحة اسم مراكز الأحداث. لا داعي للقلق بشأن وجود سلاسل اتصال ذات تعليمات برمجية مضمنة في التعليمات البرمجية الخاصة بك أو في ملف تكوين أو في تخزين آمن مثل Azure Key Vault.
  • سلسلة الاتصال. استخدم سلسلة اتصال للاتصال بمساحة اسم مراكز الأحداث. إذا كنت جديدا على Azure، فقد تجد خيار سلسلة الاتصال أسهل في المتابعة.

نوصي باستخدام الخيار بدون كلمة مرور في التطبيقات وبيئات الإنتاج في العالم الحقيقي. لمزيد من المعلومات، راجع مصادقة ناقل خدمة Microsoft Azure والتخويل والاتصالات بدون كلمة مرور لخدمات Azure.

تعيين أدوار لمستخدم Microsoft Entra

عند التطوير محليا، تأكد من أن حساب المستخدم الذي يتصل بمراكز أحداث Azure لديه الأذونات الصحيحة. تحتاج إلى دور مالك بيانات Azure Event Hubs لإرسال الرسائل وتلقيها. لتعيين هذا الدور لنفسك، تحتاج إلى دور مسؤول وصول المستخدم، أو دور آخر يتضمن Microsoft.Authorization/roleAssignments/write الإجراء. يمكنك تعيين أدوار Azure RBAC لمستخدم باستخدام مدخل Microsoft Azure أو Azure CLI أو Azure PowerShell. لمزيد من المعلومات، راجع فهم نطاق صفحة Azure RBAC .

يعين Azure Event Hubs Data Owner المثال التالي الدور إلى حساب المستخدم الخاص بك، والذي يوفر الوصول الكامل إلى موارد مراكز الأحداث Azure. في سيناريو حقيقي، اتبع مبدأ الامتياز الأقل لمنح المستخدمين الحد الأدنى فقط من الأذونات اللازمة لبيئة إنتاج أكثر أمانا.

أدوار Azure مضمنة لمراكز أحداث Azure

بالنسبة لمراكز أحداث Azure، فإن إدارة مساحات الأسماء وجميع الموارد ذات الصلة من خلال مدخل Microsoft Azure وواجهة برمجة تطبيقات إدارة موارد Azure محمية بالفعل باستخدام نموذج Azure RBAC. يوفر Azure الأدوار المضمنة التالية لتخويل الوصول إلى مساحة اسم مراكز الأحداث:

  • مالك بيانات مراكز الأحداث: يمكن الوصول إلى البيانات إلى مساحة اسم مراكز الأحداث وكياناتها (قوائم الانتظار والموضوعات والاشتراكات وعوامل التصفية).
  • مرسل بيانات Azure Event Hubs: استخدم هذا الدور لمنح المرسل حق الوصول إلى مساحة اسم مراكز الأحداث وكياناتها.
  • Azure Event Hubs Data Receiver: استخدم هذا الدور لمنح المتلقي حق الوصول إلى مساحة اسم مراكز الأحداث وكياناتها.

إذا كنت ترغب في إنشاء دور مخصص، فشاهد الحقوق المطلوبة لعمليات مراكز الأحداث.

مهم

في معظم الحالات، يستغرق نشر تعيين الدور في Azure دقيقة أو دقيقتين. في حالات نادرة، قد يستغرق الأمر ما يصل إلى ثماني دقائق. إذا تلقيت أخطاء المصادقة عند تشغيل التعليمات البرمجية لأول مرة، فانتظر بضع لحظات وحاول مرة أخرى.

  1. في مدخل Microsoft Azure، حدد موقع مساحة اسم Event Hubs باستخدام شريط البحث الرئيسي أو التنقل الأيسر.

  2. في صفحة النظرة العامة، حدد Access control (IAM) من القائمة اليسرى.

  3. حدد صفحة التحكم بالوصول (IAM)، وحدد علامة تبويب تعيينات الدور.

  4. حدد + إضافة من القائمة العلوية. ثم حدد إضافة تعيين دور.

    لقطة شاشة توضح كيفية تعيين دور.

  5. استخدم مربع البحث لتصفية النتائج إلى الدور المطلوب. في هذا المثال، ابحث Azure Event Hubs Data Owner عن النتيجة المطابقة وحددها. ثم اختر التالي.

  6. ضمن تعيين الوصول إلى، حدد المستخدم أو المجموعة أو كيان الخدمة. ثم اختر + Select members.

  7. في مربع الحوار، ابحث عن اسم مستخدم Microsoft Entra (عادة ما يكون عنوان بريدك الإلكتروني user@domain). اختر تحديد في أسفل مربع الحوار.

  8. حدد Review + assign للانتقال إلى الصفحة الأخيرة. حدد Review + assign مرة أخرى لإكمال العملية.

كتابة التعليمات البرمجية لإرسال رسائل إلى مركز الأحداث

أضف فئة باسم Sender، وأضف التعليمات البرمجية التالية إلى الفئة:

مهم

  • قم بالتحديث <NAMESPACE NAME> باسم مساحة اسم Event Hubs.
  • قم بالتحديث <EVENT HUB NAME> باسم مركز الأحداث الخاص بك.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

أنشئ البرنامج، وتأكد من عدم وجود أخطاء. ستقوم بتشغيل هذا البرنامج بعد تشغيل برنامج المتلقي.

استقبال الأحداث

تستند التعليمات البرمجية في هذا البرنامج التعليمي إلى نموذج EventProcessorClient على GitHub، والذي يمكنك فحصه لرؤية تطبيق العمل الكامل.

اتبع هذه التوصيات عند استخدام Azure Blob Storage كمخزن نقطة تحقق:

  • استخدم حاوية منفصلة لكل مجموعة مستهلكين. يمكنك استخدام نفس حساب التخزين، ولكن استخدام حاوية واحدة لكل مجموعة.
  • لا تستخدم حساب التخزين لأي شيء آخر.
  • لا تستخدم الحاوية لأي شيء آخر.
  • إنشاء حساب التخزين في نفس المنطقة مثل التطبيق المنشور. إذا كان التطبيق محليا، فحاول اختيار أقرب منطقة ممكنة.

في صفحة Storage account في مدخل Microsoft Azure، في قسم Blob service ، تأكد من تعطيل الإعدادات التالية.

  • مساحة الاسم الهرمية
  • حذف مبدئي لكائن ثنائي كبير الحجم
  • تعيين الإصدار

إنشاء Azure Storage وحاوية كائن ثنائي كبير الحجم

في هذا التشغيل السريع، يمكنك استخدام Azure Storage (على وجه التحديد، Blob Storage) كمخزن نقاط التحقق. نقاط التفتيش هي عملية يقوم من خلالها معالج الأحداث بوضع علامة على موضع آخر حدث تمت معالجته بنجاح داخل قسم أو تثبيته. عادة ما يتم وضع علامة على نقطة تحقق داخل الدالة التي تعالج الأحداث. لمعرفة المزيد حول نقاط التحقق، راجع معالج الأحداث.

اتبع هذه الخطوات لإنشاء حساب Azure Storage.

  1. إنشاء حساب Azure Storage
  2. إنشاء حاوية كائن ثنائي كبير الحجم
  3. المصادقة على حاوية الكائن الثنائي كبير الحجم

عند التطوير محليا، تأكد من أن حساب المستخدم الذي يصل إلى بيانات الكائن الثنائي كبير الحجم لديه الأذونات الصحيحة. تحتاج إلى Storage Blob Data Contributor لقراءة بيانات الكائن الثنائي كبير الحجم وكتابتها. لتعيين هذا الدور لنفسك، يجب تعيين دور مسؤول وصول المستخدم ، أو دور آخر يتضمن إجراء Microsoft.Authorization/roleAssignments/write . يمكنك تعيين أدوار Azure RBAC لمستخدم باستخدام مدخل Microsoft Azure أو Azure CLI أو Azure PowerShell. لمزيد من المعلومات، راجع فهم نطاق التحكم في الوصول استنادًا إلى الدور من Azure.

في هذا السيناريو، يمكنك تعيين أذونات لحساب المستخدم الخاص بك، محددة النطاق لحساب التخزين، لاتباع مبدأ الامتياز الأقل. تمنح هذه الممارسة المستخدمين الحد الأدنى من الأذونات المطلوبة فقط وتنشئ بيئات تشغيل أكثر أمانًا.

يعين المثال التالي دور Storage Blob Data Contributor لحساب المستخدم الخاص بك، والذي يوفر حق الوصول للقراءة والكتابة إلى بيانات الكائن الثنائي كبير الحجم في حساب التخزين الخاص بك.

مهم

في معظم الحالات، يستغرق نشر تعيين الدور في Azure دقيقة أو دقيقتين. في حالات نادرة، قد يستغرق الأمر ما يصل إلى ثماني دقائق. إذا تلقيت أخطاء المصادقة عند تشغيل التعليمات البرمجية لأول مرة، فانتظر بضع لحظات وحاول مرة أخرى.

  1. في مدخل Microsoft Azure، حدد موقع حساب التخزين الخاص بك باستخدام شريط البحث الرئيسي أو شريط التنقل الأيسر.

  2. في صفحة حساب التخزين، حدد Access control (IAM) من القائمة اليسرى.

  3. حدد صفحة التحكم بالوصول (IAM)، وحدد علامة تبويب تعيينات الدور.

  4. حدد + إضافة من القائمة العلوية. ثم حدد إضافة تعيين دور.

    لقطة شاشة توضح كيفية تعيين دور حساب تخزين.

  5. استخدم مربع البحث لتصفية النتائج إلى الدور المطلوب. على سبيل المثال، ابحث عن Storage Blob Data Contributor. حدد النتيجة المطابقة ثم اختر التالي.

  6. ضمن تعيين الوصول إلى، حدد المستخدم أو المجموعة أو كيان الخدمة، ثم اختر + تحديد الأعضاء.

  7. في مربع الحوار، ابحث عن اسم مستخدم Microsoft Entra (عنوان بريدك الإلكتروني user@domain عادة) ثم اختر تحديد في أسفل مربع الحوار.

  8. حدد Review + assign للانتقال إلى الصفحة الأخيرة. حدد Review + assign مرة أخرى لإكمال العملية.

إضافة مكتبات مراكز الأحداث إلى مشروع Java الخاص بك

أضف التبعيات التالية في ملف pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.20.2</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.20.6</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.16.1</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. أضف العبارات التالية import في أعلى ملف Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. أنشئ فئة باسم Receiver، وأضف متغيرات السلسلة التالية إلى الفئة . استبدل العناصر النائبة بالقيم الصحيحة.

    مهم

    استبدل العناصر النائبة بالقيم الصحيحة.

    • <NAMESPACE NAME> باسم مساحة اسم Event Hubs.
    • <EVENT HUB NAME> مع اسم مركز الحدث الخاص بك في مساحة الاسم.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. أضف الأسلوب التالي main إلى الفئة .

    مهم

    استبدل العناصر النائبة بالقيم الصحيحة.

    • <STORAGE ACCOUNT NAME> باسم حساب Azure Storage الخاص بك.
    • <CONTAINER NAME> باسم حاوية الكائن الثنائي كبير الحجم في حساب التخزين
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. أضف الأسلوبين المساعدين (PARTITION_PROCESSOR و ERROR_HANDLER) اللذين يعالجان الأحداث والأخطاء إلى Receiver الفئة .

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. أنشئ البرنامج، وتأكد من عدم وجود أخطاء.

تشغيل التطبيقات

  1. قم بتشغيل تطبيق المتلقي أولا.

  2. ثم قم بتشغيل تطبيق المرسل .

  3. في نافذة تطبيق المتلقي ، تأكد من رؤية الأحداث التي تم نشرها بواسطة تطبيق المرسل.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. اضغط على ENTER في نافذة تطبيق المتلقي لإيقاف التطبيق.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

راجع العينات التالية على GitHub: