EventHubConsumerClient الفصل
تحدد فئة EventHubConsumerClient واجهة عالية المستوى لتلقي الأحداث من خدمة Azure Event Hubs.
الهدف الرئيسي من EventHubConsumerClient هو تلقي الأحداث من جميع أقسام EventHub مع موازنة التحميل ونقاط التفتيش.
عند تشغيل مثيلات EventHubConsumerClient متعددة مقابل نفس مركز الحدث ومجموعة المستهلكين وموقع نقاط التحقق، سيتم توزيع الأقسام بالتساوي بينها.
لتمكين موازنة التحميل ونقاط التحقق المستمرة، يجب تعيين checkpoint_store عند إنشاء EventHubConsumerClient. إذا لم يتم توفير مخزن نقطة تحقق، فسيتم الاحتفاظ بنقطة التحقق داخليا في الذاكرة.
يمكن أن يتلقى EventHubConsumerClient أيضا من قسم معين عند استدعاء أسلوبه receive() أو receive_batch() وتحديد partition_id. لن تعمل موازنة التحميل في وضع القسم الفردي. ولكن لا يزال بإمكان المستخدمين حفظ نقاط التحقق إذا تم تعيين checkpoint_store.
- توريث
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
الدالمنشئ
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
المعلمات
- fully_qualified_namespace
- str
اسم المضيف المؤهل بالكامل لمساحة اسم مراكز الأحداث. تنسيق مساحة الاسم هو: .servicebus.windows.net.
- credential
- AsyncTokenCredential أو AzureSasCredential أو AzureNamedKeyCredential
عنصر بيانات الاعتماد المستخدم للمصادقة الذي ينفذ واجهة معينة للحصول على الرموز المميزة. يقبل EventHubSharedKeyCredentialكائنات بيانات الاعتماد أو التي تم إنشاؤها بواسطة مكتبة هوية azure والعناصر التي تنفذ أسلوب *get_token(self, scopes).
- logging_enable
- bool
ما إذا كان يجب إخراج سجلات تتبع الشبكة إلى المسجل. الافتراضي هو False.
- auth_timeout
- float
الوقت بالثوان للانتظار حتى يتم تخويل رمز مميز من قبل الخدمة. القيمة الافتراضية هي 60 ثانية. إذا تم تعيينه إلى 0، فلن يتم فرض أي مهلة من العميل.
- user_agent
- str
إذا تم تحديده، فستتم إضافة هذا أمام سلسلة عامل المستخدم.
- retry_total
- int
العدد الإجمالي لمحاولات إعادة عملية فاشلة عند حدوث خطأ. القيمة الافتراضية هي: 3. سياق retry_total في الاستلام خاص: يتم تنفيذ أسلوب الاستلام بواسطة أسلوب تلقي داخلي للاتصال أثناء التكرار الحلقي في كل تكرار. في حالة الاستلام ، يحدد retry_total أرقام إعادة المحاولة بعد الخطأ الذي تم رفعه بواسطة أسلوب الاستلام الداخلي في التكرار الحلقي أثناء. إذا تم استنفاد محاولات إعادة المحاولة، فسيتم استدعاء رد الاتصال on_error (إذا تم توفيره) بمعلومات الخطأ. سيتم إغلاق مستهلك القسم الداخلي الفاشل (سيتم استدعاء on_partition_close إذا تم توفيره) وسيتم إنشاء مستهلك قسم داخلي جديد (on_partition_initialize سيتم استدعاؤه إذا تم توفيره) لاستئناف الاستلام.
- retry_backoff_factor
- float
عامل تراجع لتطبيقه بين المحاولات بعد المحاولة الثانية (يتم حل معظم الأخطاء على الفور عن طريق محاولة ثانية دون تأخير). في الوضع الثابت، سينام نهج إعادة المحاولة دائما ل {عامل التراجع}. في الوضع "الأسي"، سينام نهج إعادة المحاولة ل: {عامل التراجع} * (2 ** ({عدد إجمالي عمليات إعادة المحاولة} - 1)) ثانية. إذا كان backoff_factor هو 0.1، فستنام إعادة المحاولة ل [0.0s، 0.2s، 0.4s، ...] بين عمليات إعادة المحاولة. القيمة الافتراضية هي 0.8.
- retry_backoff_max
- float
الحد الأقصى لوقت التراجع. القيمة الافتراضية هي 120 ثانية (دقيقتان).
- retry_mode
- str
سلوك التأخير بين محاولات إعادة المحاولة. القيم المدعومة "ثابتة" أو "أسية"، حيث يكون الافتراضي "أسيا".
- idle_timeout
- float
المهلة، بالثوان، وبعد ذلك سيقوم هذا العميل بإغلاق الاتصال الأساسي إذا لم يكن هناك أي نشاط آخر. بشكل افتراضي، القيمة هي None، ما يعني أن العميل لن يتم إيقاف تشغيله بسبب عدم النشاط ما لم تبدأه الخدمة.
- transport_type
- TransportType
نوع بروتوكول النقل الذي سيتم استخدامه للاتصال بخدمة مراكز الأحداث. الافتراضي هو TransportType.Amqp في هذه الحالة يتم استخدام المنفذ 5671. إذا كان المنفذ 5671 غير متوفر/محظور في بيئة الشبكة، يمكن استخدام TransportType.AmqpOverWebsocket بدلا من ذلك الذي يستخدم المنفذ 443 للاتصال.
- http_proxy
إعدادات وكيل HTTP. يجب أن يكون هذا قاموسا بالمفاتيح التالية: "proxy_hostname" (قيمة str) و "proxy_port" (قيمة int).
- checkpoint_store
- Optional[CheckpointStore]
مدير يخزن بيانات موازنة تحميل القسم ونقطة التحقق عند تلقي الأحداث. سيتم استخدام مخزن نقاط التحقق في كلتا الحالتين من الاستلام من جميع الأقسام أو قسم واحد. في الحالة الأخيرة، لا يتم تطبيق موازنة التحميل. إذا لم يتم توفير مخزن نقاط التحقق، فسيتم الاحتفاظ بنقطة التحقق داخليا في الذاكرة، وسيتلقى مثيل EventHubConsumerClient الأحداث دون موازنة التحميل.
- load_balancing_interval
- float
عند بدء موازنة التحميل. هذا هو الفاصل الزمني، بالثوان، بين تقييمين لموازنة التحميل. الافتراضي هو 30 ثانية.
- partition_ownership_expiration_interval
- float
ستنتهي صلاحية ملكية القسم بعد هذا العدد من الثوان. سيؤدي كل تقييم لموازنة التحميل تلقائيا إلى تمديد وقت انتهاء صلاحية الملكية. الافتراضي هو 6 * load_balancing_interval، أي 180 ثانية عند استخدام load_balancing_interval الافتراضية البالغة 30 ثانية.
- load_balancing_strategy
- str أو LoadBalancingStrategy
عند بدء موازنة التحميل، سيستخدم هذه الاستراتيجية للمطالبة بملكية القسم وموازنة ذلك. استخدم "الجشع" أو LoadBalancingStrategy.GREEDY للاستراتيجية الجشعة، والتي، لكل تقييم لموازنة التحميل، ستستحوذ على العديد من الأقسام غير المطالب بها المطلوبة لموازنة الحمل. استخدم "متوازن" أو LoadBalancingStrategy.BALANCED للاستراتيجية المتوازنة، والتي، لكل تقييم لموازنة التحميل، تطالب بقسم واحد فقط لا يطالب به EventHubConsumerClient آخر. إذا تم المطالبة بجميع أقسام EventHub بواسطة EventHubConsumerClient آخر وكان هذا العميل قد طالب بعدد قليل جدا من الأقسام، فسيسرق هذا العميل قسما واحدا من عملاء آخرين لكل تقييم موازنة تحميل بغض النظر عن استراتيجية موازنة التحميل. يتم استخدام استراتيجية الجشع بشكل افتراضي.
عنوان نقطة النهاية المخصص لاستخدامه لإنشاء اتصال بخدمة مراكز الأحداث، ما يسمح بتوجيه طلبات الشبكة من خلال أي بوابات تطبيق أو مسارات أخرى مطلوبة لبيئة المضيف. الافتراضي هو بلا. سيكون التنسيق مثل "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". إذا لم يتم تحديد المنفذ في custom_endpoint_address، فسيتم استخدام المنفذ 443 بشكل افتراضي.
المسار إلى ملف CA_BUNDLE المخصص لشهادة SSL المستخدمة لمصادقة هوية نقطة نهاية الاتصال. الافتراضي هو None وفي هذه الحالة سيتم استخدام certifi.where() .
- uamqp_transport
- bool
ما إذا كنت تريد استخدام مكتبة uamqp كنقل أساسي. القيمة الافتراضية هي False وسيتم استخدام مكتبة Pure Python AMQP كنقل أساسي.
- socket_timeout
- float
الوقت بالثوان الذي يجب أن ينتظره مأخذ التوصيل الأساسي على الاتصال عند إرسال البيانات وتلقيها قبل انتهاء المهلة. القيمة الافتراضية هي 0.2 ل TransportType.Amqp و1 ل TransportType.AmqpOverWebsocket. إذا حدثت أخطاء EventHubsConnectionError بسبب انتهاء مهلة الكتابة، فقد تحتاج قيمة أكبر من القيمة الافتراضية إلى تمريرها. هذا لسيناريوهات الاستخدام المتقدمة وعادة ما يجب أن تكون القيمة الافتراضية كافية.
أمثلة
إنشاء مثيل جديد من EventHubConsumerClient.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
الأساليب
close |
توقف عن استرداد الأحداث من Event Hub وأغلق اتصال AMQP الأساسي والارتباطات. |
from_connection_string |
إنشاء EventHubConsumerClient من سلسلة الاتصال. |
get_eventhub_properties |
احصل على خصائص Event Hub. تتضمن المفاتيح الموجودة في القاموس الذي تم إرجاعه ما يلي:
|
get_partition_ids |
احصل على معرفات القسم لمركز الأحداث. |
get_partition_properties |
احصل على خصائص القسم المحدد. تتضمن المفاتيح الموجودة في قاموس الخصائص ما يلي:
|
receive |
تلقي الأحداث من القسم (الأقسام)، مع موازنة التحميل الاختيارية ونقاط التفتيش. |
receive_batch |
تلقي الأحداث من الأقسام (الأقسام) على دفعات، مع موازنة التحميل الاختيارية ونقاط التحقق. |
close
توقف عن استرداد الأحداث من Event Hub وأغلق اتصال AMQP الأساسي والارتباطات.
async close() -> None
نوع الإرجاع
أمثلة
أغلق العميل.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
إنشاء EventHubConsumerClient من سلسلة الاتصال.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
المعلمات
- eventhub_name
- str
مسار Event Hub المحدد لتوصيل العميل به.
- logging_enable
- bool
ما إذا كان يجب إخراج سجلات تتبع الشبكة إلى المسجل. الافتراضي هو False.
- http_proxy
- dict
إعدادات وكيل HTTP. يجب أن يكون هذا قاموسا بالمفاتيح التالية: "proxy_hostname" (قيمة str) و "proxy_port" (قيمة int). بالإضافة إلى ذلك، قد تكون المفاتيح التالية موجودة أيضا: "اسم المستخدم"، "كلمة المرور".
- auth_timeout
- float
الوقت بالثوان للانتظار حتى يتم تخويل رمز مميز من قبل الخدمة. القيمة الافتراضية هي 60 ثانية. إذا تم تعيينه إلى 0، فلن يتم فرض أي مهلة من العميل.
- user_agent
- str
إذا تم تحديده، فستتم إضافة هذا أمام سلسلة عامل المستخدم.
- retry_total
- int
العدد الإجمالي لمحاولات إعادة عملية فاشلة عند حدوث خطأ. القيمة الافتراضية هي: 3. سياق retry_total في الاستلام خاص: يتم تنفيذ أسلوب الاستلام بواسطة أسلوب تلقي داخلي للاتصال أثناء التكرار الحلقي في كل تكرار. في حالة الاستلام ، يحدد retry_total أرقام إعادة المحاولة بعد الخطأ الذي تم رفعه بواسطة أسلوب الاستلام الداخلي في التكرار الحلقي أثناء. إذا تم استنفاد محاولات إعادة المحاولة، فسيتم استدعاء رد الاتصال on_error (إذا تم توفيره) بمعلومات الخطأ. سيتم إغلاق مستهلك القسم الداخلي الفاشل (سيتم استدعاء on_partition_close إذا تم توفيره) وسيتم إنشاء مستهلك قسم داخلي جديد (on_partition_initialize سيتم استدعاؤه إذا تم توفيره) لاستئناف الاستلام.
- retry_backoff_factor
- float
عامل تراجع لتطبيقه بين المحاولات بعد المحاولة الثانية (يتم حل معظم الأخطاء على الفور عن طريق محاولة ثانية دون تأخير). في الوضع الثابت، سينام نهج إعادة المحاولة دائما ل {عامل التراجع}. في الوضع "الأسي"، سينام نهج إعادة المحاولة ل: {عامل التراجع} * (2 ** ({عدد إجمالي عمليات إعادة المحاولة} - 1)) ثانية. إذا كان backoff_factor هو 0.1، فستنام إعادة المحاولة ل [0.0s، 0.2s، 0.4s، ...] بين عمليات إعادة المحاولة. القيمة الافتراضية هي 0.8.
- retry_backoff_max
- float
الحد الأقصى لوقت التراجع. القيمة الافتراضية هي 120 ثانية (دقيقتان).
- retry_mode
- str
سلوك التأخير بين محاولات إعادة المحاولة. القيم المدعومة "ثابتة" أو "أسية"، حيث يكون الافتراضي "أسيا".
- idle_timeout
- float
المهلة، بالثوان، وبعد ذلك سيقوم هذا العميل بإغلاق الاتصال الأساسي إذا لم يكن هناك أي نشاط آخر. بشكل افتراضي، القيمة هي None، ما يعني أن العميل لن يتم إيقاف تشغيله بسبب عدم النشاط ما لم تبدأه الخدمة.
- transport_type
- TransportType
نوع بروتوكول النقل الذي سيتم استخدامه للاتصال بخدمة مراكز الأحداث. الافتراضي هو TransportType.Amqp في هذه الحالة يتم استخدام المنفذ 5671. إذا كان المنفذ 5671 غير متوفر/محظور في بيئة الشبكة، يمكن استخدام TransportType.AmqpOverWebsocket بدلا من ذلك الذي يستخدم المنفذ 443 للاتصال.
- checkpoint_store
- Optional[CheckpointStore]
مدير يخزن بيانات موازنة تحميل القسم ونقطة التحقق عند تلقي الأحداث. سيتم استخدام مخزن نقاط التحقق في كلتا الحالتين من الاستلام من جميع الأقسام أو قسم واحد. في الحالة الأخيرة، لا يتم تطبيق موازنة التحميل. إذا لم يتم توفير مخزن نقاط التحقق، فسيتم الاحتفاظ بنقطة التحقق داخليا في الذاكرة، وسيتلقى مثيل EventHubConsumerClient الأحداث دون موازنة التحميل.
- load_balancing_interval
- float
عند بدء موازنة التحميل. هذا هو الفاصل الزمني، بالثوان، بين تقييمين لموازنة التحميل. الافتراضي هو 30 ثانية.
- partition_ownership_expiration_interval
- float
ستنتهي صلاحية ملكية القسم بعد هذا العدد من الثوان. سيؤدي كل تقييم لموازنة التحميل تلقائيا إلى تمديد وقت انتهاء صلاحية الملكية. الافتراضي هو 6 * load_balancing_interval، أي 180 ثانية عند استخدام load_balancing_interval الافتراضية البالغة 30 ثانية.
- load_balancing_strategy
- str أو LoadBalancingStrategy
عند بدء موازنة التحميل، سيستخدم هذه الاستراتيجية للمطالبة بملكية القسم وموازنة ذلك. استخدم "الجشع" أو LoadBalancingStrategy.GREEDY للاستراتيجية الجشعة، والتي، لكل تقييم لموازنة التحميل، ستستحوذ على العديد من الأقسام غير المطالب بها المطلوبة لموازنة الحمل. استخدم "متوازن" أو LoadBalancingStrategy.BALANCED للاستراتيجية المتوازنة، والتي، لكل تقييم لموازنة التحميل، تطالب بقسم واحد فقط لا يطالب به EventHubConsumerClient آخر. إذا تم المطالبة بجميع أقسام EventHub بواسطة EventHubConsumerClient آخر وكان هذا العميل قد طالب بعدد قليل جدا من الأقسام، فسيسرق هذا العميل قسما واحدا من عملاء آخرين لكل تقييم موازنة تحميل بغض النظر عن استراتيجية موازنة التحميل. يتم استخدام استراتيجية الجشع بشكل افتراضي.
عنوان نقطة النهاية المخصص لاستخدامه لإنشاء اتصال بخدمة مراكز الأحداث، ما يسمح بتوجيه طلبات الشبكة من خلال أي بوابات تطبيق أو مسارات أخرى مطلوبة لبيئة المضيف. الافتراضي هو بلا. سيكون التنسيق مثل "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". إذا لم يتم تحديد المنفذ في custom_endpoint_address، فسيتم استخدام المنفذ 443 بشكل افتراضي.
المسار إلى ملف CA_BUNDLE المخصص لشهادة SSL المستخدمة لمصادقة هوية نقطة نهاية الاتصال. الافتراضي هو None وفي هذه الحالة سيتم استخدام certifi.where() .
- uamqp_transport
- bool
ما إذا كنت تريد استخدام مكتبة uamqp كنقل أساسي. القيمة الافتراضية هي False وسيتم استخدام مكتبة Pure Python AMQP كنقل أساسي.
نوع الإرجاع
أمثلة
إنشاء مثيل جديد من EventHubConsumerClient من سلسلة الاتصال.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
احصل على خصائص Event Hub.
تتضمن المفاتيح الموجودة في القاموس الذي تم إرجاعه ما يلي:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
المرتجعات
قاموس يحتوي على معلومات حول Event Hub.
نوع الإرجاع
استثناءات
get_partition_ids
احصل على معرفات القسم لمركز الأحداث.
async get_partition_ids() -> List[str]
المرتجعات
قائمة بمعرفات الأقسام.
نوع الإرجاع
استثناءات
get_partition_properties
احصل على خصائص القسم المحدد.
تتضمن المفاتيح الموجودة في قاموس الخصائص ما يلي:
eventhub_name (str)
المعرف (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
المعلمات
المرتجعات
قاموس يحتوي على خصائص القسم.
نوع الإرجاع
استثناءات
receive
تلقي الأحداث من القسم (الأقسام)، مع موازنة التحميل الاختيارية ونقاط التفتيش.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
المعلمات
- on_event
- Callable[PartitionContext, Optional[EventData]]
دالة رد الاتصال لمعالجة حدث تم تلقيه. يأخذ رد الاتصال معلمتين: partition_context الذي يحتوي على سياق القسم والحدث الذي هو الحدث المستلم. يجب تعريف دالة رد الاتصال مثل: on_event (partition_context، الحدث). للحصول على معلومات مفصلة عن سياق القسم، يرجى الرجوع إلى PartitionContext.
- max_wait_time
- float
الحد الأقصى للفاصل الزمني بالثوان الذي سينتظره معالج الحدث قبل استدعاء رد الاتصال. إذا لم يتم تلقي أي أحداث خلال هذا الفاصل الزمني، فسيتم استدعاء رد اتصال on_event مع بلا. إذا تم تعيين هذه القيمة إلى بلا أو 0 (الافتراضي)، فلن يتم استدعاء رد الاتصال حتى يتم تلقي حدث.
- partition_id
- str
إذا تم تحديده، فسيتلقى العميل من هذا القسم فقط. وإلا سيتلقى العميل من جميع الأقسام.
- owner_level
- int
الأولوية للمستهلك الحصري. سيتم إنشاء مستهلك حصري إذا تم تعيين owner_level. المستهلك ذو owner_level أعلى له أولوية حصرية أعلى. يعرف مستوى المالك أيضا باسم "قيمة الفترة" للمستهلك.
- prefetch
- int
عدد الأحداث التي يجب إحضارها مسبقا من الخدمة للمعالجة. الافتراضي هو 300.
- track_last_enqueued_event_properties
- bool
يشير إلى ما إذا كان يجب على المستهلك طلب معلومات حول الحدث الأخير في قائمة الانتظار على القسم المرتبط به، وتتبع هذه المعلومات أثناء تلقي الأحداث. عند تعقب معلومات حول آخر حدث مدرج في قائمة الانتظار للأقسام، سيحمل كل حدث تم تلقيه من خدمة مراكز الأحداث بيانات تعريف حول القسم. يؤدي هذا إلى كمية صغيرة من استهلاك النطاق الترددي الإضافي للشبكة الذي يكون عموما مفاضلة مواتية عند النظر في مقابل تقديم طلبات لخصائص القسم بشكل دوري باستخدام عميل Event Hub. يتم تعيينه إلى False بشكل افتراضي.
ابدأ في الاستلام من موضع الحدث هذا إذا لم تكن هناك بيانات نقطة تحقق لقسم. سيتم استخدام بيانات نقطة التحقق إذا كانت متوفرة. يمكن أن يكون هذا إملاء مع معرف القسم كمفتاح وموضع كقيمة للأقسام الفردية، أو قيمة واحدة لجميع الأقسام. يمكن أن يكون نوع القيمة str أو int أو datetime.datetime. يتم دعم القيم "-1" للاستلام من بداية الدفق، و"@latest" لتلقي الأحداث الجديدة فقط.
تحديد ما إذا كان starting_position المحدد شاملا (>=) أم لا (>). صواب للشمولية والخطأ للحصري. يمكن أن يكون هذا إملاء بمعرف القسم كمفتاح وقيمة كقيمة تشير إلى ما إذا كان starting_position لقسم معين شاملا أم لا. يمكن أن تكون هذه أيضا قيمة منطقية واحدة لجميع starting_position. القيمة الافتراضية هي False.
- on_error
- Callable[[PartitionContext, Exception]]
دالة رد الاتصال التي سيتم استدعاؤها عند ظهور خطأ أثناء الاستلام بعد استنفاد محاولات إعادة المحاولة، أو أثناء عملية موازنة التحميل. يأخذ رد الاتصال معلمتين: partition_context التي تحتوي على معلومات القسم والخطأ هو الاستثناء. partition_context يمكن أن يكون None إذا تم رفع الخطأ أثناء عملية موازنة التحميل. يجب تعريف رد الاتصال مثل: on_error (partition_context، خطأ). سيتم استدعاء رد اتصال on_error أيضا إذا تم رفع استثناء غير معالج أثناء رد اتصال on_event .
- on_partition_initialize
- Callable[[PartitionContext]]
تنتهي دالة رد الاتصال التي سيتم استدعاؤها بعد انتهاء المستهلك لقسم معين من التهيئة. سيتم استدعاؤه أيضا عند إنشاء مستهلك قسم داخلي جديد لتولي عملية الاستلام لمستهلك قسم داخلي فاشل ومغلق. يأخذ رد الاتصال معلمة واحدة: partition_context التي تحتوي على معلومات القسم. يجب تعريف رد الاتصال مثل: on_partition_initialize (partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
يتم إغلاق دالة رد الاتصال التي سيتم استدعاؤها بعد إغلاق المستهلك لقسم معين. سيتم استدعاؤه أيضا عند ظهور خطأ أثناء الاستلام بعد استنفاد محاولات إعادة المحاولة. يأخذ رد الاتصال معلمتين: partition_context التي تحتوي على معلومات القسم وسبب الإغلاق. يجب تعريف رد الاتصال مثل: on_partition_close (partition_context، السبب). يرجى الرجوع إلى CloseReason لأسباب الإغلاق المختلفة.
نوع الإرجاع
أمثلة
تلقي الأحداث من EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
تلقي الأحداث من الأقسام (الأقسام) على دفعات، مع موازنة التحميل الاختيارية ونقاط التحقق.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
المعلمات
- on_event_batch
- Callable[PartitionContext, List[EventData]]
دالة رد الاتصال لمعالجة دفعة من الأحداث المستلمة. يأخذ رد الاتصال معلمتين: partition_context الذي يحتوي على سياق القسم event_batch، وهو الأحداث المستلمة. يجب تعريف دالة رد الاتصال مثل: on_event_batch (partition_context، event_batch). قد تكون event_batch قائمة فارغة إذا لم يكن max_wait_time بلا ولا 0 ولم يتم تلقي أي حدث بعد max_wait_time. للحصول على معلومات مفصلة عن سياق القسم، يرجى الرجوع إلى PartitionContext.
- max_batch_size
- int
الحد الأقصى لعدد الأحداث في دفعة تم تمريرها إلى on_event_batch رد الاتصال. إذا كان العدد الفعلي للأحداث المستلمة أكبر من max_batch_size، يتم تقسيم الأحداث المستلمة إلى دفعات واستدعاء رد الاتصال لكل دفعة مع ما يصل إلى max_batch_size الأحداث.
- max_wait_time
- float
الحد الأقصى للفاصل الزمني بالثوان الذي سينتظره معالج الحدث قبل استدعاء رد الاتصال. إذا لم يتم تلقي أي أحداث خلال هذا الفاصل الزمني، فسيتم استدعاء رد اتصال on_event_batch بقائمة فارغة. إذا تم تعيين هذه القيمة إلى بلا أو 0 (الافتراضي)، فلن يتم استدعاء رد الاتصال حتى يتم تلقي الأحداث.
- partition_id
- str
إذا تم تحديده، فسيتلقى العميل من هذا القسم فقط. وإلا سيتلقى العميل من جميع الأقسام.
- owner_level
- int
الأولوية للمستهلك الحصري. سيتم إنشاء مستهلك حصري إذا تم تعيين owner_level. المستهلك ذو owner_level أعلى له أولوية حصرية أعلى. يعرف مستوى المالك أيضا باسم "قيمة الفترة" للمستهلك.
- prefetch
- int
عدد الأحداث التي يجب إحضارها مسبقا من الخدمة للمعالجة. الافتراضي هو 300.
- track_last_enqueued_event_properties
- bool
يشير إلى ما إذا كان يجب على المستهلك طلب معلومات حول الحدث الأخير في قائمة الانتظار على القسم المرتبط به، وتتبع هذه المعلومات أثناء تلقي الأحداث. عند تعقب معلومات حول آخر حدث مدرج في قائمة الانتظار للأقسام، سيحمل كل حدث تم تلقيه من خدمة مراكز الأحداث بيانات تعريف حول القسم. يؤدي هذا إلى كمية صغيرة من استهلاك النطاق الترددي الإضافي للشبكة الذي يكون عموما مفاضلة مواتية عند النظر في مقابل تقديم طلبات لخصائص القسم بشكل دوري باستخدام عميل Event Hub. يتم تعيينه إلى False بشكل افتراضي.
ابدأ في الاستلام من موضع الحدث هذا إذا لم تكن هناك بيانات نقطة تحقق لقسم. سيتم استخدام بيانات نقطة التحقق إذا كانت متوفرة. يمكن أن يكون هذا إملاء مع معرف القسم كمفتاح وموضع كقيمة للأقسام الفردية، أو قيمة واحدة لجميع الأقسام. يمكن أن يكون نوع القيمة str أو int أو datetime.datetime. يتم دعم القيم "-1" للاستلام من بداية الدفق، و"@latest" لتلقي الأحداث الجديدة فقط.
تحديد ما إذا كان starting_position المحدد شاملا (>=) أم لا (>). صواب للشمولية والخطأ للحصري. يمكن أن يكون هذا إملاء بمعرف القسم كمفتاح وقيمة كقيمة تشير إلى ما إذا كان starting_position لقسم معين شاملا أم لا. يمكن أن تكون هذه أيضا قيمة منطقية واحدة لجميع starting_position. القيمة الافتراضية هي False.
- on_error
- Callable[[PartitionContext, Exception]]
دالة رد الاتصال التي سيتم استدعاؤها عند ظهور خطأ أثناء الاستلام بعد استنفاد محاولات إعادة المحاولة، أو أثناء عملية موازنة التحميل. يأخذ رد الاتصال معلمتين: partition_context التي تحتوي على معلومات القسم والخطأ هو الاستثناء. partition_context يمكن أن يكون None إذا تم رفع الخطأ أثناء عملية موازنة التحميل. يجب تعريف رد الاتصال مثل: on_error (partition_context، خطأ). سيتم استدعاء رد اتصال on_error أيضا إذا تم رفع استثناء غير معالج أثناء رد اتصال on_event .
- on_partition_initialize
- Callable[[PartitionContext]]
تنتهي دالة رد الاتصال التي سيتم استدعاؤها بعد انتهاء المستهلك لقسم معين من التهيئة. سيتم استدعاؤه أيضا عند إنشاء مستهلك قسم داخلي جديد لتولي عملية الاستلام لمستهلك قسم داخلي فاشل ومغلق. يأخذ رد الاتصال معلمة واحدة: partition_context التي تحتوي على معلومات القسم. يجب تعريف رد الاتصال مثل: on_partition_initialize (partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
يتم إغلاق دالة رد الاتصال التي سيتم استدعاؤها بعد إغلاق المستهلك لقسم معين. سيتم استدعاؤه أيضا عند ظهور خطأ أثناء الاستلام بعد استنفاد محاولات إعادة المحاولة. يأخذ رد الاتصال معلمتين: partition_context التي تحتوي على معلومات القسم وسبب الإغلاق. يجب تعريف رد الاتصال مثل: on_partition_close (partition_context، السبب). يرجى الرجوع إلى CloseReason لأسباب الإغلاق المختلفة.
نوع الإرجاع
أمثلة
تلقي الأحداث على دفعات من EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python
الملاحظات
https://aka.ms/ContentUserFeedback.
قريبًا: خلال عام 2024، سنتخلص تدريجيًا من GitHub Issues بوصفها آلية إرسال ملاحظات للمحتوى ونستبدلها بنظام ملاحظات جديد. لمزيد من المعلومات، راجعإرسال الملاحظات وعرضها المتعلقة بـ