EventHubConsumerClient الفصل

تحدد فئة EventHubConsumerClient واجهة عالية المستوى لتلقي الأحداث من خدمة Azure Event Hubs.

الهدف الرئيسي من EventHubConsumerClient هو تلقي الأحداث من جميع أقسام EventHub مع موازنة التحميل ونقاط التفتيش.

عند تشغيل مثيلات EventHubConsumerClient متعددة مقابل نفس مركز الحدث ومجموعة المستهلكين وموقع نقاط التحقق، سيتم توزيع الأقسام بالتساوي بينها.

لتمكين موازنة التحميل ونقاط التحقق المستمرة، يجب تعيين checkpoint_store عند إنشاء EventHubConsumerClient. إذا لم يتم توفير مخزن نقطة تحقق، فسيتم الاحتفاظ بنقطة التحقق داخليا في الذاكرة.

يمكن أن يتلقى EventHubConsumerClient أيضا من قسم معين عند استدعاء أسلوبه receive() أو receive_batch() وتحديد partition_id. لن تعمل موازنة التحميل في وضع القسم الفردي. ولكن لا يزال بإمكان المستخدمين حفظ نقاط التحقق إذا تم تعيين checkpoint_store.

توريث
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

الدالمنشئ

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

المعلمات

fully_qualified_namespace
str
مطلوب

اسم المضيف المؤهل بالكامل لمساحة اسم مراكز الأحداث. تنسيق مساحة الاسم هو: .servicebus.windows.net.

eventhub_name
str
مطلوب

مسار Event Hub المحدد لتوصيل العميل به.

consumer_group
str
مطلوب

تلقي الأحداث من مركز الأحداث لمجموعة المستهلكين هذه.

credential
AsyncTokenCredential أو AzureSasCredential أو AzureNamedKeyCredential
مطلوب

عنصر بيانات الاعتماد المستخدم للمصادقة الذي ينفذ واجهة معينة للحصول على الرموز المميزة. يقبل EventHubSharedKeyCredentialكائنات بيانات الاعتماد أو التي تم إنشاؤها بواسطة مكتبة هوية azure والعناصر التي تنفذ أسلوب *get_token(self, scopes).

logging_enable
bool

ما إذا كان يجب إخراج سجلات تتبع الشبكة إلى المسجل. الافتراضي هو False.

auth_timeout
float

الوقت بالثوان للانتظار حتى يتم تخويل رمز مميز من قبل الخدمة. القيمة الافتراضية هي 60 ثانية. إذا تم تعيينه إلى 0، فلن يتم فرض أي مهلة من العميل.

user_agent
str

إذا تم تحديده، فستتم إضافة هذا أمام سلسلة عامل المستخدم.

retry_total
int

العدد الإجمالي لمحاولات إعادة عملية فاشلة عند حدوث خطأ. القيمة الافتراضية هي: 3. سياق retry_total في الاستلام خاص: يتم تنفيذ أسلوب الاستلام بواسطة أسلوب تلقي داخلي للاتصال أثناء التكرار الحلقي في كل تكرار. في حالة الاستلام ، يحدد retry_total أرقام إعادة المحاولة بعد الخطأ الذي تم رفعه بواسطة أسلوب الاستلام الداخلي في التكرار الحلقي أثناء. إذا تم استنفاد محاولات إعادة المحاولة، فسيتم استدعاء رد الاتصال on_error (إذا تم توفيره) بمعلومات الخطأ. سيتم إغلاق مستهلك القسم الداخلي الفاشل (سيتم استدعاء on_partition_close إذا تم توفيره) وسيتم إنشاء مستهلك قسم داخلي جديد (on_partition_initialize سيتم استدعاؤه إذا تم توفيره) لاستئناف الاستلام.

retry_backoff_factor
float

عامل تراجع لتطبيقه بين المحاولات بعد المحاولة الثانية (يتم حل معظم الأخطاء على الفور عن طريق محاولة ثانية دون تأخير). في الوضع الثابت، سينام نهج إعادة المحاولة دائما ل {عامل التراجع}. في الوضع "الأسي"، سينام نهج إعادة المحاولة ل: {عامل التراجع} * (2 ** ({عدد إجمالي عمليات إعادة المحاولة} - 1)) ثانية. إذا كان backoff_factor هو 0.1، فستنام إعادة المحاولة ل [0.0s، 0.2s، 0.4s، ...] بين عمليات إعادة المحاولة. القيمة الافتراضية هي 0.8.

retry_backoff_max
float

الحد الأقصى لوقت التراجع. القيمة الافتراضية هي 120 ثانية (دقيقتان).

retry_mode
str

سلوك التأخير بين محاولات إعادة المحاولة. القيم المدعومة "ثابتة" أو "أسية"، حيث يكون الافتراضي "أسيا".

idle_timeout
float

المهلة، بالثوان، وبعد ذلك سيقوم هذا العميل بإغلاق الاتصال الأساسي إذا لم يكن هناك أي نشاط آخر. بشكل افتراضي، القيمة هي None، ما يعني أن العميل لن يتم إيقاف تشغيله بسبب عدم النشاط ما لم تبدأه الخدمة.

transport_type
TransportType

نوع بروتوكول النقل الذي سيتم استخدامه للاتصال بخدمة مراكز الأحداث. الافتراضي هو TransportType.Amqp في هذه الحالة يتم استخدام المنفذ 5671. إذا كان المنفذ 5671 غير متوفر/محظور في بيئة الشبكة، يمكن استخدام TransportType.AmqpOverWebsocket بدلا من ذلك الذي يستخدم المنفذ 443 للاتصال.

http_proxy

إعدادات وكيل HTTP. يجب أن يكون هذا قاموسا بالمفاتيح التالية: "proxy_hostname" (قيمة str) و "proxy_port" (قيمة int).

checkpoint_store
Optional[CheckpointStore]

مدير يخزن بيانات موازنة تحميل القسم ونقطة التحقق عند تلقي الأحداث. سيتم استخدام مخزن نقاط التحقق في كلتا الحالتين من الاستلام من جميع الأقسام أو قسم واحد. في الحالة الأخيرة، لا يتم تطبيق موازنة التحميل. إذا لم يتم توفير مخزن نقاط التحقق، فسيتم الاحتفاظ بنقطة التحقق داخليا في الذاكرة، وسيتلقى مثيل EventHubConsumerClient الأحداث دون موازنة التحميل.

load_balancing_interval
float

عند بدء موازنة التحميل. هذا هو الفاصل الزمني، بالثوان، بين تقييمين لموازنة التحميل. الافتراضي هو 30 ثانية.

partition_ownership_expiration_interval
float

ستنتهي صلاحية ملكية القسم بعد هذا العدد من الثوان. سيؤدي كل تقييم لموازنة التحميل تلقائيا إلى تمديد وقت انتهاء صلاحية الملكية. الافتراضي هو 6 * load_balancing_interval، أي 180 ثانية عند استخدام load_balancing_interval الافتراضية البالغة 30 ثانية.

load_balancing_strategy
str أو LoadBalancingStrategy

عند بدء موازنة التحميل، سيستخدم هذه الاستراتيجية للمطالبة بملكية القسم وموازنة ذلك. استخدم "الجشع" أو LoadBalancingStrategy.GREEDY للاستراتيجية الجشعة، والتي، لكل تقييم لموازنة التحميل، ستستحوذ على العديد من الأقسام غير المطالب بها المطلوبة لموازنة الحمل. استخدم "متوازن" أو LoadBalancingStrategy.BALANCED للاستراتيجية المتوازنة، والتي، لكل تقييم لموازنة التحميل، تطالب بقسم واحد فقط لا يطالب به EventHubConsumerClient آخر. إذا تم المطالبة بجميع أقسام EventHub بواسطة EventHubConsumerClient آخر وكان هذا العميل قد طالب بعدد قليل جدا من الأقسام، فسيسرق هذا العميل قسما واحدا من عملاء آخرين لكل تقييم موازنة تحميل بغض النظر عن استراتيجية موازنة التحميل. يتم استخدام استراتيجية الجشع بشكل افتراضي.

custom_endpoint_address
Optional[str]

عنوان نقطة النهاية المخصص لاستخدامه لإنشاء اتصال بخدمة مراكز الأحداث، ما يسمح بتوجيه طلبات الشبكة من خلال أي بوابات تطبيق أو مسارات أخرى مطلوبة لبيئة المضيف. الافتراضي هو بلا. سيكون التنسيق مثل "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". إذا لم يتم تحديد المنفذ في custom_endpoint_address، فسيتم استخدام المنفذ 443 بشكل افتراضي.

connection_verify
Optional[str]

المسار إلى ملف CA_BUNDLE المخصص لشهادة SSL المستخدمة لمصادقة هوية نقطة نهاية الاتصال. الافتراضي هو None وفي هذه الحالة سيتم استخدام certifi.where() .

uamqp_transport
bool

ما إذا كنت تريد استخدام مكتبة uamqp كنقل أساسي. القيمة الافتراضية هي False وسيتم استخدام مكتبة Pure Python AMQP كنقل أساسي.

socket_timeout
float

الوقت بالثوان الذي يجب أن ينتظره مأخذ التوصيل الأساسي على الاتصال عند إرسال البيانات وتلقيها قبل انتهاء المهلة. القيمة الافتراضية هي 0.2 ل TransportType.Amqp و1 ل TransportType.AmqpOverWebsocket. إذا حدثت أخطاء EventHubsConnectionError بسبب انتهاء مهلة الكتابة، فقد تحتاج قيمة أكبر من القيمة الافتراضية إلى تمريرها. هذا لسيناريوهات الاستخدام المتقدمة وعادة ما يجب أن تكون القيمة الافتراضية كافية.

أمثلة

إنشاء مثيل جديد من EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

الأساليب

close

توقف عن استرداد الأحداث من Event Hub وأغلق اتصال AMQP الأساسي والارتباطات.

from_connection_string

إنشاء EventHubConsumerClient من سلسلة الاتصال.

get_eventhub_properties

احصل على خصائص Event Hub.

تتضمن المفاتيح الموجودة في القاموس الذي تم إرجاعه ما يلي:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

احصل على معرفات القسم لمركز الأحداث.

get_partition_properties

احصل على خصائص القسم المحدد.

تتضمن المفاتيح الموجودة في قاموس الخصائص ما يلي:

  • eventhub_name (str)

  • المعرف (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

تلقي الأحداث من القسم (الأقسام)، مع موازنة التحميل الاختيارية ونقاط التفتيش.

receive_batch

تلقي الأحداث من الأقسام (الأقسام) على دفعات، مع موازنة التحميل الاختيارية ونقاط التحقق.

close

توقف عن استرداد الأحداث من Event Hub وأغلق اتصال AMQP الأساسي والارتباطات.

async close() -> None

نوع الإرجاع

أمثلة

أغلق العميل.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

إنشاء EventHubConsumerClient من سلسلة الاتصال.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

المعلمات

conn_str
str
مطلوب

سلسلة الاتصال مركز الأحداث.

consumer_group
str
مطلوب

تلقي الأحداث من Event Hub لمجموعة المستهلكين هذه.

eventhub_name
str

مسار Event Hub المحدد لتوصيل العميل به.

logging_enable
bool

ما إذا كان يجب إخراج سجلات تتبع الشبكة إلى المسجل. الافتراضي هو False.

http_proxy
dict

إعدادات وكيل HTTP. يجب أن يكون هذا قاموسا بالمفاتيح التالية: "proxy_hostname" (قيمة str) و "proxy_port" (قيمة int). بالإضافة إلى ذلك، قد تكون المفاتيح التالية موجودة أيضا: "اسم المستخدم"، "كلمة المرور".

auth_timeout
float

الوقت بالثوان للانتظار حتى يتم تخويل رمز مميز من قبل الخدمة. القيمة الافتراضية هي 60 ثانية. إذا تم تعيينه إلى 0، فلن يتم فرض أي مهلة من العميل.

user_agent
str

إذا تم تحديده، فستتم إضافة هذا أمام سلسلة عامل المستخدم.

retry_total
int

العدد الإجمالي لمحاولات إعادة عملية فاشلة عند حدوث خطأ. القيمة الافتراضية هي: 3. سياق retry_total في الاستلام خاص: يتم تنفيذ أسلوب الاستلام بواسطة أسلوب تلقي داخلي للاتصال أثناء التكرار الحلقي في كل تكرار. في حالة الاستلام ، يحدد retry_total أرقام إعادة المحاولة بعد الخطأ الذي تم رفعه بواسطة أسلوب الاستلام الداخلي في التكرار الحلقي أثناء. إذا تم استنفاد محاولات إعادة المحاولة، فسيتم استدعاء رد الاتصال on_error (إذا تم توفيره) بمعلومات الخطأ. سيتم إغلاق مستهلك القسم الداخلي الفاشل (سيتم استدعاء on_partition_close إذا تم توفيره) وسيتم إنشاء مستهلك قسم داخلي جديد (on_partition_initialize سيتم استدعاؤه إذا تم توفيره) لاستئناف الاستلام.

retry_backoff_factor
float

عامل تراجع لتطبيقه بين المحاولات بعد المحاولة الثانية (يتم حل معظم الأخطاء على الفور عن طريق محاولة ثانية دون تأخير). في الوضع الثابت، سينام نهج إعادة المحاولة دائما ل {عامل التراجع}. في الوضع "الأسي"، سينام نهج إعادة المحاولة ل: {عامل التراجع} * (2 ** ({عدد إجمالي عمليات إعادة المحاولة} - 1)) ثانية. إذا كان backoff_factor هو 0.1، فستنام إعادة المحاولة ل [0.0s، 0.2s، 0.4s، ...] بين عمليات إعادة المحاولة. القيمة الافتراضية هي 0.8.

retry_backoff_max
float

الحد الأقصى لوقت التراجع. القيمة الافتراضية هي 120 ثانية (دقيقتان).

retry_mode
str

سلوك التأخير بين محاولات إعادة المحاولة. القيم المدعومة "ثابتة" أو "أسية"، حيث يكون الافتراضي "أسيا".

idle_timeout
float

المهلة، بالثوان، وبعد ذلك سيقوم هذا العميل بإغلاق الاتصال الأساسي إذا لم يكن هناك أي نشاط آخر. بشكل افتراضي، القيمة هي None، ما يعني أن العميل لن يتم إيقاف تشغيله بسبب عدم النشاط ما لم تبدأه الخدمة.

transport_type
TransportType

نوع بروتوكول النقل الذي سيتم استخدامه للاتصال بخدمة مراكز الأحداث. الافتراضي هو TransportType.Amqp في هذه الحالة يتم استخدام المنفذ 5671. إذا كان المنفذ 5671 غير متوفر/محظور في بيئة الشبكة، يمكن استخدام TransportType.AmqpOverWebsocket بدلا من ذلك الذي يستخدم المنفذ 443 للاتصال.

checkpoint_store
Optional[CheckpointStore]

مدير يخزن بيانات موازنة تحميل القسم ونقطة التحقق عند تلقي الأحداث. سيتم استخدام مخزن نقاط التحقق في كلتا الحالتين من الاستلام من جميع الأقسام أو قسم واحد. في الحالة الأخيرة، لا يتم تطبيق موازنة التحميل. إذا لم يتم توفير مخزن نقاط التحقق، فسيتم الاحتفاظ بنقطة التحقق داخليا في الذاكرة، وسيتلقى مثيل EventHubConsumerClient الأحداث دون موازنة التحميل.

load_balancing_interval
float

عند بدء موازنة التحميل. هذا هو الفاصل الزمني، بالثوان، بين تقييمين لموازنة التحميل. الافتراضي هو 30 ثانية.

partition_ownership_expiration_interval
float

ستنتهي صلاحية ملكية القسم بعد هذا العدد من الثوان. سيؤدي كل تقييم لموازنة التحميل تلقائيا إلى تمديد وقت انتهاء صلاحية الملكية. الافتراضي هو 6 * load_balancing_interval، أي 180 ثانية عند استخدام load_balancing_interval الافتراضية البالغة 30 ثانية.

load_balancing_strategy
str أو LoadBalancingStrategy

عند بدء موازنة التحميل، سيستخدم هذه الاستراتيجية للمطالبة بملكية القسم وموازنة ذلك. استخدم "الجشع" أو LoadBalancingStrategy.GREEDY للاستراتيجية الجشعة، والتي، لكل تقييم لموازنة التحميل، ستستحوذ على العديد من الأقسام غير المطالب بها المطلوبة لموازنة الحمل. استخدم "متوازن" أو LoadBalancingStrategy.BALANCED للاستراتيجية المتوازنة، والتي، لكل تقييم لموازنة التحميل، تطالب بقسم واحد فقط لا يطالب به EventHubConsumerClient آخر. إذا تم المطالبة بجميع أقسام EventHub بواسطة EventHubConsumerClient آخر وكان هذا العميل قد طالب بعدد قليل جدا من الأقسام، فسيسرق هذا العميل قسما واحدا من عملاء آخرين لكل تقييم موازنة تحميل بغض النظر عن استراتيجية موازنة التحميل. يتم استخدام استراتيجية الجشع بشكل افتراضي.

custom_endpoint_address
Optional[str]

عنوان نقطة النهاية المخصص لاستخدامه لإنشاء اتصال بخدمة مراكز الأحداث، ما يسمح بتوجيه طلبات الشبكة من خلال أي بوابات تطبيق أو مسارات أخرى مطلوبة لبيئة المضيف. الافتراضي هو بلا. سيكون التنسيق مثل "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". إذا لم يتم تحديد المنفذ في custom_endpoint_address، فسيتم استخدام المنفذ 443 بشكل افتراضي.

connection_verify
Optional[str]

المسار إلى ملف CA_BUNDLE المخصص لشهادة SSL المستخدمة لمصادقة هوية نقطة نهاية الاتصال. الافتراضي هو None وفي هذه الحالة سيتم استخدام certifi.where() .

uamqp_transport
bool

ما إذا كنت تريد استخدام مكتبة uamqp كنقل أساسي. القيمة الافتراضية هي False وسيتم استخدام مكتبة Pure Python AMQP كنقل أساسي.

نوع الإرجاع

أمثلة

إنشاء مثيل جديد من EventHubConsumerClient من سلسلة الاتصال.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

احصل على خصائص Event Hub.

تتضمن المفاتيح الموجودة في القاموس الذي تم إرجاعه ما يلي:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

المرتجعات

قاموس يحتوي على معلومات حول Event Hub.

نوع الإرجاع

استثناءات

get_partition_ids

احصل على معرفات القسم لمركز الأحداث.

async get_partition_ids() -> List[str]

المرتجعات

قائمة بمعرفات الأقسام.

نوع الإرجاع

استثناءات

get_partition_properties

احصل على خصائص القسم المحدد.

تتضمن المفاتيح الموجودة في قاموس الخصائص ما يلي:

  • eventhub_name (str)

  • المعرف (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

المعلمات

partition_id
str
مطلوب

معرف القسم الهدف.

المرتجعات

قاموس يحتوي على خصائص القسم.

نوع الإرجاع

استثناءات

receive

تلقي الأحداث من القسم (الأقسام)، مع موازنة التحميل الاختيارية ونقاط التفتيش.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

المعلمات

on_event
Callable[PartitionContext, Optional[EventData]]
مطلوب

دالة رد الاتصال لمعالجة حدث تم تلقيه. يأخذ رد الاتصال معلمتين: partition_context الذي يحتوي على سياق القسم والحدث الذي هو الحدث المستلم. يجب تعريف دالة رد الاتصال مثل: on_event (partition_context، الحدث). للحصول على معلومات مفصلة عن سياق القسم، يرجى الرجوع إلى PartitionContext.

max_wait_time
float

الحد الأقصى للفاصل الزمني بالثوان الذي سينتظره معالج الحدث قبل استدعاء رد الاتصال. إذا لم يتم تلقي أي أحداث خلال هذا الفاصل الزمني، فسيتم استدعاء رد اتصال on_event مع بلا. إذا تم تعيين هذه القيمة إلى بلا أو 0 (الافتراضي)، فلن يتم استدعاء رد الاتصال حتى يتم تلقي حدث.

partition_id
str

إذا تم تحديده، فسيتلقى العميل من هذا القسم فقط. وإلا سيتلقى العميل من جميع الأقسام.

owner_level
int

الأولوية للمستهلك الحصري. سيتم إنشاء مستهلك حصري إذا تم تعيين owner_level. المستهلك ذو owner_level أعلى له أولوية حصرية أعلى. يعرف مستوى المالك أيضا باسم "قيمة الفترة" للمستهلك.

prefetch
int

عدد الأحداث التي يجب إحضارها مسبقا من الخدمة للمعالجة. الافتراضي هو 300.

track_last_enqueued_event_properties
bool

يشير إلى ما إذا كان يجب على المستهلك طلب معلومات حول الحدث الأخير في قائمة الانتظار على القسم المرتبط به، وتتبع هذه المعلومات أثناء تلقي الأحداث. عند تعقب معلومات حول آخر حدث مدرج في قائمة الانتظار للأقسام، سيحمل كل حدث تم تلقيه من خدمة مراكز الأحداث بيانات تعريف حول القسم. يؤدي هذا إلى كمية صغيرة من استهلاك النطاق الترددي الإضافي للشبكة الذي يكون عموما مفاضلة مواتية عند النظر في مقابل تقديم طلبات لخصائص القسم بشكل دوري باستخدام عميل Event Hub. يتم تعيينه إلى False بشكل افتراضي.

starting_position
str, int, datetime أو dict[str,any]

ابدأ في الاستلام من موضع الحدث هذا إذا لم تكن هناك بيانات نقطة تحقق لقسم. سيتم استخدام بيانات نقطة التحقق إذا كانت متوفرة. يمكن أن يكون هذا إملاء مع معرف القسم كمفتاح وموضع كقيمة للأقسام الفردية، أو قيمة واحدة لجميع الأقسام. يمكن أن يكون نوع القيمة str أو int أو datetime.datetime. يتم دعم القيم "-1" للاستلام من بداية الدفق، و"@latest" لتلقي الأحداث الجديدة فقط.

starting_position_inclusive
bool أو dict[str,bool]

تحديد ما إذا كان starting_position المحدد شاملا (>=) أم لا (>). صواب للشمولية والخطأ للحصري. يمكن أن يكون هذا إملاء بمعرف القسم كمفتاح وقيمة كقيمة تشير إلى ما إذا كان starting_position لقسم معين شاملا أم لا. يمكن أن تكون هذه أيضا قيمة منطقية واحدة لجميع starting_position. القيمة الافتراضية هي False.

on_error
Callable[[PartitionContext, Exception]]

دالة رد الاتصال التي سيتم استدعاؤها عند ظهور خطأ أثناء الاستلام بعد استنفاد محاولات إعادة المحاولة، أو أثناء عملية موازنة التحميل. يأخذ رد الاتصال معلمتين: partition_context التي تحتوي على معلومات القسم والخطأ هو الاستثناء. partition_context يمكن أن يكون None إذا تم رفع الخطأ أثناء عملية موازنة التحميل. يجب تعريف رد الاتصال مثل: on_error (partition_context، خطأ). سيتم استدعاء رد اتصال on_error أيضا إذا تم رفع استثناء غير معالج أثناء رد اتصال on_event .

on_partition_initialize
Callable[[PartitionContext]]

تنتهي دالة رد الاتصال التي سيتم استدعاؤها بعد انتهاء المستهلك لقسم معين من التهيئة. سيتم استدعاؤه أيضا عند إنشاء مستهلك قسم داخلي جديد لتولي عملية الاستلام لمستهلك قسم داخلي فاشل ومغلق. يأخذ رد الاتصال معلمة واحدة: partition_context التي تحتوي على معلومات القسم. يجب تعريف رد الاتصال مثل: on_partition_initialize (partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

يتم إغلاق دالة رد الاتصال التي سيتم استدعاؤها بعد إغلاق المستهلك لقسم معين. سيتم استدعاؤه أيضا عند ظهور خطأ أثناء الاستلام بعد استنفاد محاولات إعادة المحاولة. يأخذ رد الاتصال معلمتين: partition_context التي تحتوي على معلومات القسم وسبب الإغلاق. يجب تعريف رد الاتصال مثل: on_partition_close (partition_context، السبب). يرجى الرجوع إلى CloseReason لأسباب الإغلاق المختلفة.

نوع الإرجاع

أمثلة

تلقي الأحداث من EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

تلقي الأحداث من الأقسام (الأقسام) على دفعات، مع موازنة التحميل الاختيارية ونقاط التحقق.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

المعلمات

on_event_batch
Callable[PartitionContext, List[EventData]]
مطلوب

دالة رد الاتصال لمعالجة دفعة من الأحداث المستلمة. يأخذ رد الاتصال معلمتين: partition_context الذي يحتوي على سياق القسم event_batch، وهو الأحداث المستلمة. يجب تعريف دالة رد الاتصال مثل: on_event_batch (partition_context، event_batch). قد تكون event_batch قائمة فارغة إذا لم يكن max_wait_time بلا ولا 0 ولم يتم تلقي أي حدث بعد max_wait_time. للحصول على معلومات مفصلة عن سياق القسم، يرجى الرجوع إلى PartitionContext.

max_batch_size
int

الحد الأقصى لعدد الأحداث في دفعة تم تمريرها إلى on_event_batch رد الاتصال. إذا كان العدد الفعلي للأحداث المستلمة أكبر من max_batch_size، يتم تقسيم الأحداث المستلمة إلى دفعات واستدعاء رد الاتصال لكل دفعة مع ما يصل إلى max_batch_size الأحداث.

max_wait_time
float

الحد الأقصى للفاصل الزمني بالثوان الذي سينتظره معالج الحدث قبل استدعاء رد الاتصال. إذا لم يتم تلقي أي أحداث خلال هذا الفاصل الزمني، فسيتم استدعاء رد اتصال on_event_batch بقائمة فارغة. إذا تم تعيين هذه القيمة إلى بلا أو 0 (الافتراضي)، فلن يتم استدعاء رد الاتصال حتى يتم تلقي الأحداث.

partition_id
str

إذا تم تحديده، فسيتلقى العميل من هذا القسم فقط. وإلا سيتلقى العميل من جميع الأقسام.

owner_level
int

الأولوية للمستهلك الحصري. سيتم إنشاء مستهلك حصري إذا تم تعيين owner_level. المستهلك ذو owner_level أعلى له أولوية حصرية أعلى. يعرف مستوى المالك أيضا باسم "قيمة الفترة" للمستهلك.

prefetch
int

عدد الأحداث التي يجب إحضارها مسبقا من الخدمة للمعالجة. الافتراضي هو 300.

track_last_enqueued_event_properties
bool

يشير إلى ما إذا كان يجب على المستهلك طلب معلومات حول الحدث الأخير في قائمة الانتظار على القسم المرتبط به، وتتبع هذه المعلومات أثناء تلقي الأحداث. عند تعقب معلومات حول آخر حدث مدرج في قائمة الانتظار للأقسام، سيحمل كل حدث تم تلقيه من خدمة مراكز الأحداث بيانات تعريف حول القسم. يؤدي هذا إلى كمية صغيرة من استهلاك النطاق الترددي الإضافي للشبكة الذي يكون عموما مفاضلة مواتية عند النظر في مقابل تقديم طلبات لخصائص القسم بشكل دوري باستخدام عميل Event Hub. يتم تعيينه إلى False بشكل افتراضي.

starting_position
str, int, datetime أو dict[str,any]

ابدأ في الاستلام من موضع الحدث هذا إذا لم تكن هناك بيانات نقطة تحقق لقسم. سيتم استخدام بيانات نقطة التحقق إذا كانت متوفرة. يمكن أن يكون هذا إملاء مع معرف القسم كمفتاح وموضع كقيمة للأقسام الفردية، أو قيمة واحدة لجميع الأقسام. يمكن أن يكون نوع القيمة str أو int أو datetime.datetime. يتم دعم القيم "-1" للاستلام من بداية الدفق، و"@latest" لتلقي الأحداث الجديدة فقط.

starting_position_inclusive
bool أو dict[str,bool]

تحديد ما إذا كان starting_position المحدد شاملا (>=) أم لا (>). صواب للشمولية والخطأ للحصري. يمكن أن يكون هذا إملاء بمعرف القسم كمفتاح وقيمة كقيمة تشير إلى ما إذا كان starting_position لقسم معين شاملا أم لا. يمكن أن تكون هذه أيضا قيمة منطقية واحدة لجميع starting_position. القيمة الافتراضية هي False.

on_error
Callable[[PartitionContext, Exception]]

دالة رد الاتصال التي سيتم استدعاؤها عند ظهور خطأ أثناء الاستلام بعد استنفاد محاولات إعادة المحاولة، أو أثناء عملية موازنة التحميل. يأخذ رد الاتصال معلمتين: partition_context التي تحتوي على معلومات القسم والخطأ هو الاستثناء. partition_context يمكن أن يكون None إذا تم رفع الخطأ أثناء عملية موازنة التحميل. يجب تعريف رد الاتصال مثل: on_error (partition_context، خطأ). سيتم استدعاء رد اتصال on_error أيضا إذا تم رفع استثناء غير معالج أثناء رد اتصال on_event .

on_partition_initialize
Callable[[PartitionContext]]

تنتهي دالة رد الاتصال التي سيتم استدعاؤها بعد انتهاء المستهلك لقسم معين من التهيئة. سيتم استدعاؤه أيضا عند إنشاء مستهلك قسم داخلي جديد لتولي عملية الاستلام لمستهلك قسم داخلي فاشل ومغلق. يأخذ رد الاتصال معلمة واحدة: partition_context التي تحتوي على معلومات القسم. يجب تعريف رد الاتصال مثل: on_partition_initialize (partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

يتم إغلاق دالة رد الاتصال التي سيتم استدعاؤها بعد إغلاق المستهلك لقسم معين. سيتم استدعاؤه أيضا عند ظهور خطأ أثناء الاستلام بعد استنفاد محاولات إعادة المحاولة. يأخذ رد الاتصال معلمتين: partition_context التي تحتوي على معلومات القسم وسبب الإغلاق. يجب تعريف رد الاتصال مثل: on_partition_close (partition_context، السبب). يرجى الرجوع إلى CloseReason لأسباب الإغلاق المختلفة.

نوع الإرجاع

أمثلة

تلقي الأحداث على دفعات من EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )