Поделиться через


EventHubConsumerClient Класс

Класс EventHubConsumerClient определяет высокоуровневый интерфейс для получения событий из службы Центры событий Azure.

Целью main EventHubConsumerClient является получение событий из всех секций EventHub с помощью балансировки нагрузки и создания контрольных точек.

Если несколько экземпляров EventHubConsumerClient выполняются в одном концентраторе событий, группе потребителей и расположении контрольных точек, секции будут равномерно распределены между ними.

Чтобы включить балансировку нагрузки и сохраняемые контрольные точки, необходимо задать checkpoint_store при создании EventHubConsumerClient. Если хранилище контрольных точек не предоставлено, контрольная точка будет храниться внутри памяти.

EventHubConsumerClient также может получать из определенной секции при вызове метода receive() или receive_batch() и указании partition_id. Балансировка нагрузки не будет работать в односекционном режиме. Но пользователи по-прежнему могут сохранять контрольные точки, если задано checkpoint_store.

Наследование
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

Конструктор

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Параметры

fully_qualified_namespace
str
Обязательно

Полное имя узла для пространства имен Центров событий. Формат пространства имен: .servicebus.windows.net.

eventhub_name
str
Обязательно

Путь к конкретному концентратору событий, к которому подключается клиент.

consumer_group
str
Обязательно

Получение событий из концентратора событий для этой группы потребителей.

credential
TokenCredential или AzureSasCredential или AzureNamedKeyCredential
Обязательно

Объект учетных данных, используемый для проверки подлинности, который реализует определенный интерфейс для получения маркеров. Он принимает объекты учетных EventHubSharedKeyCredentialданных или , созданные библиотекой azure-identity, и объекты, реализующие метод *get_token(self, scopes).

logging_enable
bool

Следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.

auth_timeout
float

Время в секундах для ожидания маркера, который будет авторизован службой. Значение по умолчанию — 60 секунд. Если задано значение 0, время ожидания клиента не будет принудительно применяться.

user_agent
str

Если этот параметр указан, он будет добавлен перед строкой агента пользователя.

retry_total
int

Общее число попыток повторного выполнения операции, завершилось сбоем при возникновении ошибки. Значение по умолчанию — 3. Контекст retry_total в получении является особым: метод получения реализуется циклом while, вызывающим внутренний метод получения в каждой итерации. В случае полученияretry_total указывает количество повторных попыток после ошибки, вызванной внутренним методом получения в цикле while. Если повторные попытки исчерпаны, будет вызван обратный вызов on_error (если он указан) со сведениями об ошибке. Неудавшийся потребитель внутренней секции будет закрыт (on_partition_close будет вызван, если указан), и будет создан новый внутренний потребитель секции (on_partition_initialize будет вызван, если указан), чтобы возобновить получение.

retry_backoff_factor
float

Коэффициент задержки, применяемый между попытками после второй попытки (большинство ошибок устраняются немедленно второй попыткой без задержки). В фиксированном режиме политика повторных попыток всегда будет переходить в спящий режим для {backoff factor}. В экспоненциальном режиме политика повторных попыток будет переходить в спящий режим в течение: {коэффициент задержки} * (2 ** ({общее число повторных попыток} - 1)) секунд. Если backoff_factor равно 0,1, то повторная попытка будет в спящем режиме [0.0s, 0.2s, 0.4s, ...] между повторными попытками. Значение по умолчанию — 0,8.

retry_backoff_max
float

Максимальное время отката. Значение по умолчанию — 120 секунд (2 минуты).

retry_mode
str

Поведение задержки между повторными попытками. Поддерживаемые значения: fixed или exponential, где по умолчанию — exponential.

idle_timeout
float

Время ожидания в секундах, по истечении которого клиент закроет базовое подключение, если дальнейшие действия отсутствуют. По умолчанию значение равно None. Это означает, что клиент не будет завершать работу из-за бездействия, если только он не инициирован службой.

transport_type
TransportType

Тип транспортного протокола, который будет использоваться для взаимодействия со службой Центров событий. Значение по умолчанию — TransportType.Amqp , в котором используется порт 5671. Если порт 5671 недоступен или заблокирован в сетевой среде, можно использовать TransportType.AmqpOverWebsocket , который использует порт 443 для обмена данными.

http_proxy
dict[str, str или int]

Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".

checkpoint_store
CheckpointStore или None

Диспетчер, хранящий данные балансировки нагрузки секции и контрольных точек при получении событий. Хранилище контрольных точек будет использоваться в обоих случаях получения из всех секций или из одной секции. В последнем случае балансировка нагрузки не применяется. Если хранилище контрольных точек не предоставлено, контрольная точка будет поддерживаться внутри памяти, а экземпляр EventHubConsumerClient будет получать события без балансировки нагрузки.

load_balancing_interval
float

При выполнении балансировки нагрузки. Это интервал в секундах между двумя оценками балансировки нагрузки. Значение по умолчанию — 30 секунд.

partition_ownership_expiration_interval
float

Срок владения секцией истекает через это количество секунд. Каждая оценка балансировки нагрузки автоматически продлевает срок действия владения. Значение по умолчанию — 6 * load_balancing_interval, т. е. 180 секунд при использовании load_balancing_interval по умолчанию 30 секунд.

load_balancing_strategy
str или LoadBalancingStrategy

При выполнении балансировки нагрузки она будет использовать эту стратегию, чтобы запросить и сбалансировать владение секцией. Используйте "жадный" или LoadBalancingStrategy.GREEDY для жадной стратегии, которая при каждой оценке балансировки нагрузки будет захватывать столько невостребованных секций, необходимых для балансировки нагрузки. Используйте "balanced" или LoadBalancingStrategy.BALANCED для сбалансированной стратегии, которая для каждой оценки балансировки нагрузки утверждает только одну секцию, которая не является заявленной другим EventHubConsumerClient. Если все секции концентратора событий запрашиваются другим EventHubConsumerClient и этот клиент утверждал слишком мало секций, этот клиент будет похищать одну секцию у других клиентов для каждой оценки балансировки нагрузки, независимо от стратегии балансировки нагрузки. По умолчанию используется жадная стратегия.

custom_endpoint_address
str или None

Адрес пользовательской конечной точки, используемый для установления подключения к службе Центров событий, что позволяет маршрутизировать сетевые запросы через шлюзы приложений или другие пути, необходимые для среды узла. Значение по умолчанию — Нет. Формат будет выглядеть следующим образом: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Если порт не указан в custom_endpoint_address, по умолчанию будет использоваться порт 443.

connection_verify
str или None

Путь к пользовательскому CA_BUNDLE файлу SSL-сертификата, который используется для проверки подлинности удостоверения конечной точки подключения. Значение по умолчанию — None, в этом случае будет использоваться certifi.where().

uamqp_transport
bool

Следует ли использовать библиотеку uamqp в качестве базового транспорта. Значение по умолчанию — False, и в качестве базового транспорта будет использоваться библиотека AMQP Pure Python.

socket_timeout
float

Время в секундах, в течение времени ожидания базового сокета в подключении при отправке и получении данных до истечения времени ожидания. Значение по умолчанию — 0,2 для TransportType.Amqp и 1 для TransportType.AmqpOverWebsocket. Если возникают ошибки EventHubsConnectionError из-за истечения времени ожидания записи, может потребоваться передать значение, превышающее значение по умолчанию. Это предназначено для расширенных сценариев использования, и обычно значения по умолчанию должно быть достаточно.

Примеры

Создайте новый экземпляр EventHubConsumerClient.


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

Методы

close

Остановите получение событий из концентратора событий и закройте базовое подключение AMQP и ссылки.

from_connection_string

Создайте EventHubConsumerClient из строка подключения.

get_eventhub_properties

Получение свойств концентратора событий.

Ключи в возвращенном словаре:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Получение идентификаторов секций концентратора событий.

get_partition_properties

Получение свойств указанной секции.

В словаре свойств содержатся следующие ключи:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Получение событий из секций с необязательной балансировкой нагрузки и контрольными точками.

receive_batch

Получение событий из секций с необязательной балансировкой нагрузки и контрольными точками.

close

Остановите получение событий из концентратора событий и закройте базовое подключение AMQP и ссылки.

close() -> None

Возвращаемый тип

Примеры

Закройте клиент.


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

Создайте EventHubConsumerClient из строка подключения.

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

Параметры

conn_str
str
Обязательно

Строка подключения концентратора событий.

consumer_group
str
Обязательно

Получение событий из концентратора событий для этой группы потребителей.

eventhub_name
str

Путь к конкретному концентратору событий, к которому подключается клиент.

logging_enable
bool

Следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.

auth_timeout
float

Время в секундах для ожидания маркера, который будет авторизован службой. Значение по умолчанию — 60 секунд. Если задано значение 0, время ожидания клиента не будет принудительно применяться.

user_agent
str

Если этот параметр указан, он будет добавлен перед строкой агента пользователя.

retry_total
int

Общее число попыток повторного выполнения операции, завершилось сбоем при возникновении ошибки. Значение по умолчанию — 3. Контекст retry_total в получении является особым: метод получения реализуется циклом while, вызывающим внутренний метод получения в каждой итерации. В случае полученияretry_total указывает количество повторных попыток после ошибки, вызванной внутренним методом получения в цикле while. Если повторные попытки исчерпаны, будет вызван обратный вызов on_error (если он указан) со сведениями об ошибке. Неудавшийся потребитель внутренней секции будет закрыт (on_partition_close будет вызван, если указан), и будет создан новый внутренний потребитель секции (on_partition_initialize будет вызван, если указан), чтобы возобновить получение.

retry_backoff_factor
float

Коэффициент задержки, применяемый между попытками после второй попытки (большинство ошибок устраняются немедленно второй попыткой без задержки). В фиксированном режиме политика повторных попыток всегда будет переходить в спящий режим для {backoff factor}. В экспоненциальном режиме политика повторных попыток будет переходить в спящий режим в течение: {коэффициент задержки} * (2 ** ({общее число повторных попыток} - 1)) секунд. Если backoff_factor равно 0,1, то повторная попытка будет в спящем режиме [0,0s, 0,2s, 0,4s, ...] между повторными попытками. Значение по умолчанию — 0,8.

retry_backoff_max
float

Максимальное время отката. Значение по умолчанию — 120 секунд (2 минуты).

retry_mode
str

Поведение задержки между повторными попытками. Поддерживаемые значения: fixed или exponential, где значение по умолчанию — "экспоненциальная".

idle_timeout
float

Время ожидания в секундах, по истечении которого клиент закроет базовое подключение, если действие furthur отсутствует. По умолчанию значение равно None. Это означает, что клиент не будет завершать работу из-за бездействия, если не будет инициирована службой.

transport_type
TransportType

Тип транспортного протокола, который будет использоваться для взаимодействия со службой Центров событий. Значение по умолчанию — TransportType.Amqp , в котором используется порт 5671. Если порт 5671 недоступен или заблокирован в сетевой среде, можно использовать TransportType.AmqpOverWebsocket , который использует порт 443 для связи.

http_proxy
dict

Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".

checkpoint_store
CheckpointStore или None

Диспетчер, который хранит данные о балансировке нагрузки секции и контрольных точках при получении событий. Хранилище контрольных точек будет использоваться в обоих случаях получения из всех секций или из одной секции. В последнем случае балансировка нагрузки не применяется. Если хранилище контрольных точек не предоставлено, контрольная точка будет храниться внутри памяти, а экземпляр EventHubConsumerClient будет получать события без балансировки нагрузки.

load_balancing_interval
float

Когда балансировка нагрузки начинается. Это интервал (в секундах) между двумя оценками балансировки нагрузки. Значение по умолчанию — 10 секунд.

partition_ownership_expiration_interval
float

Срок действия владения секцией истекает через это количество секунд. Каждая оценка балансировки нагрузки автоматически продлевает срок действия владения. Значение по умолчанию — 6 * load_balancing_interval, т. е. 60 секунд при использовании load_balancing_interval по умолчанию 30 секунд.

load_balancing_strategy
str или LoadBalancingStrategy

Когда выполняется балансировка нагрузки, она будет использовать эту стратегию для утверждения и балансировки владения секцией. Используйте "жадный" или LoadBalancingStrategy.GREEDY для жадной стратегии, которая для каждой оценки балансировки нагрузки будет захватывать столько невостребованных секций, необходимых для балансировки нагрузки. Используйте "balanced" или LoadBalancingStrategy.BALANCED для сбалансированной стратегии, которая для каждой оценки балансировки нагрузки утверждает только одну секцию, которая не является заявленной другим EventHubConsumerClient. Если все секции EventHub запрашиваются другим EventHubConsumerClient и этот клиент утверждал слишком мало секций, этот клиент будет украсть одну секцию у других клиентов для каждой оценки балансировки нагрузки независимо от стратегии балансировки нагрузки. По умолчанию используется жадная стратегия.

custom_endpoint_address
str или None

Адрес пользовательской конечной точки, используемый для установления подключения к службе Центров событий, что позволяет маршрутизировать сетевые запросы через любые шлюзы приложений или другие пути, необходимые для среды узла. Значение по умолчанию — Нет. Формат будет следующим: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Если порт не указан в custom_endpoint_address, по умолчанию будет использоваться порт 443.

connection_verify
str или None

Путь к пользовательскому CA_BUNDLE файлу SSL-сертификата, который используется для проверки подлинности удостоверения конечной точки подключения. Значение по умолчанию — Нет, в этом случае будет использоваться certifi.where( ).

uamqp_transport
bool

Следует ли использовать библиотеку uamqp в качестве базового транспорта. Значение по умолчанию — False, а библиотека AMQP Pure Python будет использоваться в качестве базового транспорта.

Возвращаемый тип

Примеры

Создайте экземпляр EventHubConsumerClient из строка подключения.


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Получение свойств концентратора событий.

Ключи в возвращенном словаре:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Возвращаемое значение

Словарь, содержащий сведения о концентраторе событий.

Возвращаемый тип

Исключения

get_partition_ids

Получение идентификаторов секций концентратора событий.

get_partition_ids() -> List[str]

Возвращаемое значение

Список идентификаторов секций.

Возвращаемый тип

Исключения

get_partition_properties

Получение свойств указанной секции.

В словаре свойств содержатся следующие ключи:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Параметры

partition_id
str
Обязательно

Идентификатор целевой секции.

Возвращаемое значение

Словарь, содержащий свойства секции.

Возвращаемый тип

Исключения

receive

Получение событий из секций с необязательной балансировкой нагрузки и контрольными точками.

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

Параметры

on_event
callable[PartitionContext, EventData или None]
Обязательно

Функция обратного вызова для обработки полученного события. Обратный вызов принимает два параметра: partition_context который содержит контекст секции, и событие , являющееся полученным событием. Функция обратного вызова должна быть определена следующим образом: on_event(partition_context, событие). Подробные сведения о контексте секции см. в PartitionContextразделе .

max_wait_time
float

Максимальный интервал в секундах, который обработчик событий будет ожидать перед вызовом обратного вызова. Если события не получены в течение этого интервала, обратный вызов on_event будет вызываться с параметром None. Если для этого значения задано значение None или 0 (значение по умолчанию), обратный вызов не будет вызываться до получения события.

partition_id
str

Если этот параметр указан, клиент будет получать только из этой секции. В противном случае клиент будет получать данные из всех секций.

owner_level
int

Приоритет исключительного потребителя. Если задано owner_level, будет создан эксклюзивный потребитель. Потребитель с более высоким owner_level имеет более высокий эксклюзивный приоритет. Уровень владельца также известен как "значение эпохи" потребителя.

prefetch
int

Количество событий для предварительной выборки из службы для обработки. Значение по умолчанию — 300.

track_last_enqueued_event_properties
bool

Указывает, должен ли потребитель запрашивать сведения о последнем событии, помещенном в очередь, в связанной секции, и отслеживать эти сведения по мере получения событий. При отслеживании сведений о последнем событии секций, помещенных в очередь, каждое событие, полученное от службы Центров событий, будет содержать метаданные о секции. Это приводит к небольшому объему дополнительного потребления пропускной способности сети, что, как правило, является благоприятным компромиссом при рассмотрении проблемы с периодическим выполнением запросов свойств секции с помощью клиента концентратора событий. По умолчанию задано значение False .

starting_position
str, int, datetime или dict[str,any]

Начните получать данные из этой позиции события, если для секции нет данных контрольных точек. Данные контрольных точек будут использоваться, если они доступны. Это может быть диктовка с идентификатором секции в качестве ключа и положением в качестве значения для отдельных секций или одно значение для всех секций. Тип значения может быть str, int или datetime.datetime. Также поддерживаются значения "-1" для получения с начала потока и "@latest" для получения только новых событий. Значение по умолчанию — "@latest".

starting_position_inclusive
bool или dict[str,bool]

Определите, является ли данный starting_position инклюзивным(>=) или нет (>). True для инклюзивного и False для монопольного. Это может быть диктовка с идентификатором секции в качестве ключа и bool в качестве значения, указывающего, является ли starting_position для определенной секции инклюзивным или нет. Это также может быть одно логическое значение для всех starting_position. Значение по умолчанию равно False.

on_error
callable[[PartitionContext, Exception]]

Функция обратного вызова, которая будет вызываться при возникновении ошибки во время получения после исчерпания повторных попыток или в процессе балансировки нагрузки. Обратный вызов принимает два параметра: partition_context который содержит сведения о секции, и ошибку , являющуюся исключением. partition_context может иметь значение Нет, если ошибка возникает во время балансировки нагрузки. Обратный вызов должен быть определен следующим образом: on_error(partition_context, ошибка). Обратный вызов on_error также будет вызываться, если во время обратного вызова on_event возникает необработанное исключение.

on_partition_initialize
callable[[PartitionContext]]

Функция обратного вызова, которая будет вызвана после того, как потребитель для определенной секции завершит инициализацию. Он также вызывается при создании нового внутреннего потребителя секции, чтобы взять на себя процесс получения для неудавшегося и закрытого внутреннего потребителя секции. Обратный вызов принимает один параметр: partition_context который содержит сведения о секции. Обратный вызов должен быть определен следующим образом: on_partition_initialize(partition_context)).

on_partition_close
callable[[PartitionContext, CloseReason]]

Функция обратного вызова, которая будет вызываться после закрытия потребителя для определенной секции. Он также вызывается при возникновении ошибки во время получения после исчерпания повторных попыток. Обратный вызов принимает два параметра: partition_context который содержит сведения о секции и причину закрытия. Обратный вызов должен быть определен следующим образом: on_partition_close(partition_context, причина). Ознакомьтесь с CloseReason различными причинами закрытия.

Возвращаемый тип

Примеры

Получение событий из EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

Получение событий из секций с необязательной балансировкой нагрузки и контрольными точками.

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

Параметры

on_event_batch
callable[PartitionContext, list[EventData]]
Обязательно

Функция обратного вызова для обработки пакета полученных событий. Обратный вызов принимает два параметра: partition_context который содержит контекст секции, и event_batch, который является полученными событиями. Функция обратного вызова должна быть определена следующим образом: on_event_batch(partition_context, event_batch). event_batch может быть пустым списком, если max_wait_time не имеет значения None или 0, а событие не получено после max_wait_time. Подробные сведения о контексте секции см. в PartitionContextразделе .

max_batch_size
int

Максимальное количество событий в пакете, передаваемых в обратный вызов on_event_batch. Если фактическое количество полученных событий превышает max_batch_size, полученные события делятся на пакеты и вызывают обратный вызов для каждого пакета с событиями до max_batch_size .

max_wait_time
float

Максимальный интервал в секундах, который обработчик событий будет ожидать перед вызовом обратного вызова. Если события не получены в течение этого интервала, обратный вызов on_event_batch будет вызываться с пустым списком.

partition_id
str

Если этот параметр указан, клиент будет получать только из этой секции. В противном случае клиент будет получать данные из всех секций.

owner_level
int

Приоритет исключительного потребителя. Если задано owner_level, будет создан эксклюзивный потребитель. Потребитель с более высоким owner_level имеет более высокий эксклюзивный приоритет. Уровень владельца также известен как "значение эпохи" потребителя.

prefetch
int

Количество событий для предварительной выборки из службы для обработки. Значение по умолчанию — 300.

track_last_enqueued_event_properties
bool

Указывает, должен ли потребитель запрашивать сведения о последнем событии, помещенном в очередь, в связанной секции, и отслеживать эти сведения по мере получения событий. При отслеживании сведений о последнем событии секций, помещенных в очередь, каждое событие, полученное от службы Центров событий, будет содержать метаданные о секции. Это приводит к небольшому объему дополнительного потребления пропускной способности сети, что, как правило, является благоприятным компромиссом при рассмотрении проблемы с периодическим выполнением запросов свойств секции с помощью клиента концентратора событий. По умолчанию задано значение False .

starting_position
str, int, datetime или dict[str,any]

Начните получать данные из этой позиции события, если для секции нет данных контрольных точек. Данные контрольных точек будут использоваться, если они доступны. Это может быть диктовка с идентификатором секции в качестве ключа и положением в качестве значения для отдельных секций или одно значение для всех секций. Тип значения может быть str, int или datetime.datetime. Также поддерживаются значения "-1" для получения с начала потока и "@latest" для получения только новых событий. Значение по умолчанию — "@latest".

starting_position_inclusive
bool или dict[str,bool]

Определите, является ли данный starting_position инклюзивным(>=) или нет (>). True для инклюзивного и False для монопольного. Это может быть диктовка с идентификатором секции в качестве ключа и bool в качестве значения, указывающего, является ли starting_position для определенной секции инклюзивным или нет. Это также может быть одно логическое значение для всех starting_position. Значение по умолчанию равно False.

on_error
callable[[PartitionContext, Exception]]

Функция обратного вызова, которая будет вызываться при возникновении ошибки во время получения после исчерпания повторных попыток или в процессе балансировки нагрузки. Обратный вызов принимает два параметра: partition_context который содержит сведения о секции, и ошибку , являющуюся исключением. partition_context может иметь значение Нет, если ошибка возникает во время балансировки нагрузки. Обратный вызов должен быть определен следующим образом: on_error(partition_context, ошибка). Обратный вызов on_error также будет вызываться, если во время обратного вызова on_event возникает необработанное исключение.

on_partition_initialize
callable[[PartitionContext]]

Функция обратного вызова, которая будет вызвана после того, как потребитель для определенной секции завершит инициализацию. Он также вызывается при создании нового внутреннего потребителя секции, чтобы взять на себя процесс получения для неудавшегося и закрытого внутреннего потребителя секции. Обратный вызов принимает один параметр: partition_context который содержит сведения о секции. Обратный вызов должен быть определен следующим образом: on_partition_initialize(partition_context)).

on_partition_close
callable[[PartitionContext, CloseReason]]

Функция обратного вызова, которая будет вызываться после закрытия потребителя для определенной секции. Он также вызывается при возникновении ошибки во время получения после исчерпания повторных попыток. Обратный вызов принимает два параметра: partition_context который содержит сведения о секции и причину закрытия. Обратный вызов должен быть определен следующим образом: on_partition_close(partition_context, причина). Ознакомьтесь с CloseReason различными причинами закрытия.

Возвращаемый тип

Примеры

Получение событий в пакетах из EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)