EventHubConsumerClient Класс
Класс EventHubConsumerClient определяет высокоуровневый интерфейс для получения событий из службы Центры событий Azure.
Целью main EventHubConsumerClient является получение событий из всех секций EventHub с помощью балансировки нагрузки и создания контрольных точек.
Если несколько экземпляров EventHubConsumerClient выполняются в одном концентраторе событий, группе потребителей и расположении контрольных точек, секции будут равномерно распределены между ними.
Чтобы включить балансировку нагрузки и сохраняемые контрольные точки, необходимо задать checkpoint_store при создании EventHubConsumerClient. Если хранилище контрольных точек не предоставлено, контрольная точка будет храниться внутри памяти.
EventHubConsumerClient также может получать из определенной секции при вызове метода receive() или receive_batch() и указании partition_id. Балансировка нагрузки не будет работать в односекционном режиме. Но пользователи по-прежнему могут сохранять контрольные точки, если задано checkpoint_store.
- Наследование
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Конструктор
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Параметры
- fully_qualified_namespace
- str
Полное имя узла для пространства имен Центров событий. Формат пространства имен: .servicebus.windows.net.
- eventhub_name
- str
Путь к конкретному концентратору событий, к которому подключается клиент.
- consumer_group
- str
Получение событий из концентратора событий для этой группы потребителей.
- credential
- TokenCredential или AzureSasCredential или AzureNamedKeyCredential
Объект учетных данных, используемый для проверки подлинности, который реализует определенный интерфейс для получения маркеров. Он принимает объекты учетных EventHubSharedKeyCredentialданных или , созданные библиотекой azure-identity, и объекты, реализующие метод *get_token(self, scopes).
- logging_enable
- bool
Следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.
- auth_timeout
- float
Время в секундах для ожидания маркера, который будет авторизован службой. Значение по умолчанию — 60 секунд. Если задано значение 0, время ожидания клиента не будет принудительно применяться.
- user_agent
- str
Если этот параметр указан, он будет добавлен перед строкой агента пользователя.
- retry_total
- int
Общее число попыток повторного выполнения операции, завершилось сбоем при возникновении ошибки. Значение по умолчанию — 3. Контекст retry_total в получении является особым: метод получения реализуется циклом while, вызывающим внутренний метод получения в каждой итерации. В случае полученияretry_total указывает количество повторных попыток после ошибки, вызванной внутренним методом получения в цикле while. Если повторные попытки исчерпаны, будет вызван обратный вызов on_error (если он указан) со сведениями об ошибке. Неудавшийся потребитель внутренней секции будет закрыт (on_partition_close будет вызван, если указан), и будет создан новый внутренний потребитель секции (on_partition_initialize будет вызван, если указан), чтобы возобновить получение.
- retry_backoff_factor
- float
Коэффициент задержки, применяемый между попытками после второй попытки (большинство ошибок устраняются немедленно второй попыткой без задержки). В фиксированном режиме политика повторных попыток всегда будет переходить в спящий режим для {backoff factor}. В экспоненциальном режиме политика повторных попыток будет переходить в спящий режим в течение: {коэффициент задержки} * (2 ** ({общее число повторных попыток} - 1)) секунд. Если backoff_factor равно 0,1, то повторная попытка будет в спящем режиме [0.0s, 0.2s, 0.4s, ...] между повторными попытками. Значение по умолчанию — 0,8.
- retry_backoff_max
- float
Максимальное время отката. Значение по умолчанию — 120 секунд (2 минуты).
- retry_mode
- str
Поведение задержки между повторными попытками. Поддерживаемые значения: fixed или exponential, где по умолчанию — exponential.
- idle_timeout
- float
Время ожидания в секундах, по истечении которого клиент закроет базовое подключение, если дальнейшие действия отсутствуют. По умолчанию значение равно None. Это означает, что клиент не будет завершать работу из-за бездействия, если только он не инициирован службой.
- transport_type
- TransportType
Тип транспортного протокола, который будет использоваться для взаимодействия со службой Центров событий. Значение по умолчанию — TransportType.Amqp , в котором используется порт 5671. Если порт 5671 недоступен или заблокирован в сетевой среде, можно использовать TransportType.AmqpOverWebsocket , который использует порт 443 для обмена данными.
Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".
- checkpoint_store
- CheckpointStore или None
Диспетчер, хранящий данные балансировки нагрузки секции и контрольных точек при получении событий. Хранилище контрольных точек будет использоваться в обоих случаях получения из всех секций или из одной секции. В последнем случае балансировка нагрузки не применяется. Если хранилище контрольных точек не предоставлено, контрольная точка будет поддерживаться внутри памяти, а экземпляр EventHubConsumerClient будет получать события без балансировки нагрузки.
- load_balancing_interval
- float
При выполнении балансировки нагрузки. Это интервал в секундах между двумя оценками балансировки нагрузки. Значение по умолчанию — 30 секунд.
- partition_ownership_expiration_interval
- float
Срок владения секцией истекает через это количество секунд. Каждая оценка балансировки нагрузки автоматически продлевает срок действия владения. Значение по умолчанию — 6 * load_balancing_interval, т. е. 180 секунд при использовании load_balancing_interval по умолчанию 30 секунд.
- load_balancing_strategy
- str или LoadBalancingStrategy
При выполнении балансировки нагрузки она будет использовать эту стратегию, чтобы запросить и сбалансировать владение секцией. Используйте "жадный" или LoadBalancingStrategy.GREEDY для жадной стратегии, которая при каждой оценке балансировки нагрузки будет захватывать столько невостребованных секций, необходимых для балансировки нагрузки. Используйте "balanced" или LoadBalancingStrategy.BALANCED для сбалансированной стратегии, которая для каждой оценки балансировки нагрузки утверждает только одну секцию, которая не является заявленной другим EventHubConsumerClient. Если все секции концентратора событий запрашиваются другим EventHubConsumerClient и этот клиент утверждал слишком мало секций, этот клиент будет похищать одну секцию у других клиентов для каждой оценки балансировки нагрузки, независимо от стратегии балансировки нагрузки. По умолчанию используется жадная стратегия.
Адрес пользовательской конечной точки, используемый для установления подключения к службе Центров событий, что позволяет маршрутизировать сетевые запросы через шлюзы приложений или другие пути, необходимые для среды узла. Значение по умолчанию — Нет. Формат будет выглядеть следующим образом: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Если порт не указан в custom_endpoint_address, по умолчанию будет использоваться порт 443.
Путь к пользовательскому CA_BUNDLE файлу SSL-сертификата, который используется для проверки подлинности удостоверения конечной точки подключения. Значение по умолчанию — None, в этом случае будет использоваться certifi.where().
- uamqp_transport
- bool
Следует ли использовать библиотеку uamqp в качестве базового транспорта. Значение по умолчанию — False, и в качестве базового транспорта будет использоваться библиотека AMQP Pure Python.
- socket_timeout
- float
Время в секундах, в течение времени ожидания базового сокета в подключении при отправке и получении данных до истечения времени ожидания. Значение по умолчанию — 0,2 для TransportType.Amqp и 1 для TransportType.AmqpOverWebsocket. Если возникают ошибки EventHubsConnectionError из-за истечения времени ожидания записи, может потребоваться передать значение, превышающее значение по умолчанию. Это предназначено для расширенных сценариев использования, и обычно значения по умолчанию должно быть достаточно.
Примеры
Создайте новый экземпляр EventHubConsumerClient.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Методы
close |
Остановите получение событий из концентратора событий и закройте базовое подключение AMQP и ссылки. |
from_connection_string |
Создайте EventHubConsumerClient из строка подключения. |
get_eventhub_properties |
Получение свойств концентратора событий. Ключи в возвращенном словаре:
|
get_partition_ids |
Получение идентификаторов секций концентратора событий. |
get_partition_properties |
Получение свойств указанной секции. В словаре свойств содержатся следующие ключи:
|
receive |
Получение событий из секций с необязательной балансировкой нагрузки и контрольными точками. |
receive_batch |
Получение событий из секций с необязательной балансировкой нагрузки и контрольными точками. |
close
Остановите получение событий из концентратора событий и закройте базовое подключение AMQP и ссылки.
close() -> None
Возвращаемый тип
Примеры
Закройте клиент.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
Создайте EventHubConsumerClient из строка подключения.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Параметры
- consumer_group
- str
Получение событий из концентратора событий для этой группы потребителей.
- eventhub_name
- str
Путь к конкретному концентратору событий, к которому подключается клиент.
- logging_enable
- bool
Следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.
- auth_timeout
- float
Время в секундах для ожидания маркера, который будет авторизован службой. Значение по умолчанию — 60 секунд. Если задано значение 0, время ожидания клиента не будет принудительно применяться.
- user_agent
- str
Если этот параметр указан, он будет добавлен перед строкой агента пользователя.
- retry_total
- int
Общее число попыток повторного выполнения операции, завершилось сбоем при возникновении ошибки. Значение по умолчанию — 3. Контекст retry_total в получении является особым: метод получения реализуется циклом while, вызывающим внутренний метод получения в каждой итерации. В случае полученияretry_total указывает количество повторных попыток после ошибки, вызванной внутренним методом получения в цикле while. Если повторные попытки исчерпаны, будет вызван обратный вызов on_error (если он указан) со сведениями об ошибке. Неудавшийся потребитель внутренней секции будет закрыт (on_partition_close будет вызван, если указан), и будет создан новый внутренний потребитель секции (on_partition_initialize будет вызван, если указан), чтобы возобновить получение.
- retry_backoff_factor
- float
Коэффициент задержки, применяемый между попытками после второй попытки (большинство ошибок устраняются немедленно второй попыткой без задержки). В фиксированном режиме политика повторных попыток всегда будет переходить в спящий режим для {backoff factor}. В экспоненциальном режиме политика повторных попыток будет переходить в спящий режим в течение: {коэффициент задержки} * (2 ** ({общее число повторных попыток} - 1)) секунд. Если backoff_factor равно 0,1, то повторная попытка будет в спящем режиме [0,0s, 0,2s, 0,4s, ...] между повторными попытками. Значение по умолчанию — 0,8.
- retry_backoff_max
- float
Максимальное время отката. Значение по умолчанию — 120 секунд (2 минуты).
- retry_mode
- str
Поведение задержки между повторными попытками. Поддерживаемые значения: fixed или exponential, где значение по умолчанию — "экспоненциальная".
- idle_timeout
- float
Время ожидания в секундах, по истечении которого клиент закроет базовое подключение, если действие furthur отсутствует. По умолчанию значение равно None. Это означает, что клиент не будет завершать работу из-за бездействия, если не будет инициирована службой.
- transport_type
- TransportType
Тип транспортного протокола, который будет использоваться для взаимодействия со службой Центров событий. Значение по умолчанию — TransportType.Amqp , в котором используется порт 5671. Если порт 5671 недоступен или заблокирован в сетевой среде, можно использовать TransportType.AmqpOverWebsocket , который использует порт 443 для связи.
- http_proxy
- dict
Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".
- checkpoint_store
- CheckpointStore или None
Диспетчер, который хранит данные о балансировке нагрузки секции и контрольных точках при получении событий. Хранилище контрольных точек будет использоваться в обоих случаях получения из всех секций или из одной секции. В последнем случае балансировка нагрузки не применяется. Если хранилище контрольных точек не предоставлено, контрольная точка будет храниться внутри памяти, а экземпляр EventHubConsumerClient будет получать события без балансировки нагрузки.
- load_balancing_interval
- float
Когда балансировка нагрузки начинается. Это интервал (в секундах) между двумя оценками балансировки нагрузки. Значение по умолчанию — 10 секунд.
- partition_ownership_expiration_interval
- float
Срок действия владения секцией истекает через это количество секунд. Каждая оценка балансировки нагрузки автоматически продлевает срок действия владения. Значение по умолчанию — 6 * load_balancing_interval, т. е. 60 секунд при использовании load_balancing_interval по умолчанию 30 секунд.
- load_balancing_strategy
- str или LoadBalancingStrategy
Когда выполняется балансировка нагрузки, она будет использовать эту стратегию для утверждения и балансировки владения секцией. Используйте "жадный" или LoadBalancingStrategy.GREEDY для жадной стратегии, которая для каждой оценки балансировки нагрузки будет захватывать столько невостребованных секций, необходимых для балансировки нагрузки. Используйте "balanced" или LoadBalancingStrategy.BALANCED для сбалансированной стратегии, которая для каждой оценки балансировки нагрузки утверждает только одну секцию, которая не является заявленной другим EventHubConsumerClient. Если все секции EventHub запрашиваются другим EventHubConsumerClient и этот клиент утверждал слишком мало секций, этот клиент будет украсть одну секцию у других клиентов для каждой оценки балансировки нагрузки независимо от стратегии балансировки нагрузки. По умолчанию используется жадная стратегия.
Адрес пользовательской конечной точки, используемый для установления подключения к службе Центров событий, что позволяет маршрутизировать сетевые запросы через любые шлюзы приложений или другие пути, необходимые для среды узла. Значение по умолчанию — Нет. Формат будет следующим: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Если порт не указан в custom_endpoint_address, по умолчанию будет использоваться порт 443.
Путь к пользовательскому CA_BUNDLE файлу SSL-сертификата, который используется для проверки подлинности удостоверения конечной точки подключения. Значение по умолчанию — Нет, в этом случае будет использоваться certifi.where( ).
- uamqp_transport
- bool
Следует ли использовать библиотеку uamqp в качестве базового транспорта. Значение по умолчанию — False, а библиотека AMQP Pure Python будет использоваться в качестве базового транспорта.
Возвращаемый тип
Примеры
Создайте экземпляр EventHubConsumerClient из строка подключения.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Получение свойств концентратора событий.
Ключи в возвращенном словаре:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Возвращаемое значение
Словарь, содержащий сведения о концентраторе событий.
Возвращаемый тип
Исключения
get_partition_ids
Получение идентификаторов секций концентратора событий.
get_partition_ids() -> List[str]
Возвращаемое значение
Список идентификаторов секций.
Возвращаемый тип
Исключения
get_partition_properties
Получение свойств указанной секции.
В словаре свойств содержатся следующие ключи:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Параметры
Возвращаемое значение
Словарь, содержащий свойства секции.
Возвращаемый тип
Исключения
receive
Получение событий из секций с необязательной балансировкой нагрузки и контрольными точками.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Параметры
- on_event
- callable[PartitionContext, EventData или None]
Функция обратного вызова для обработки полученного события. Обратный вызов принимает два параметра: partition_context который содержит контекст секции, и событие , являющееся полученным событием. Функция обратного вызова должна быть определена следующим образом: on_event(partition_context, событие). Подробные сведения о контексте секции см. в PartitionContextразделе .
- max_wait_time
- float
Максимальный интервал в секундах, который обработчик событий будет ожидать перед вызовом обратного вызова. Если события не получены в течение этого интервала, обратный вызов on_event будет вызываться с параметром None. Если для этого значения задано значение None или 0 (значение по умолчанию), обратный вызов не будет вызываться до получения события.
- partition_id
- str
Если этот параметр указан, клиент будет получать только из этой секции. В противном случае клиент будет получать данные из всех секций.
- owner_level
- int
Приоритет исключительного потребителя. Если задано owner_level, будет создан эксклюзивный потребитель. Потребитель с более высоким owner_level имеет более высокий эксклюзивный приоритет. Уровень владельца также известен как "значение эпохи" потребителя.
- prefetch
- int
Количество событий для предварительной выборки из службы для обработки. Значение по умолчанию — 300.
- track_last_enqueued_event_properties
- bool
Указывает, должен ли потребитель запрашивать сведения о последнем событии, помещенном в очередь, в связанной секции, и отслеживать эти сведения по мере получения событий. При отслеживании сведений о последнем событии секций, помещенных в очередь, каждое событие, полученное от службы Центров событий, будет содержать метаданные о секции. Это приводит к небольшому объему дополнительного потребления пропускной способности сети, что, как правило, является благоприятным компромиссом при рассмотрении проблемы с периодическим выполнением запросов свойств секции с помощью клиента концентратора событий. По умолчанию задано значение False .
Начните получать данные из этой позиции события, если для секции нет данных контрольных точек. Данные контрольных точек будут использоваться, если они доступны. Это может быть диктовка с идентификатором секции в качестве ключа и положением в качестве значения для отдельных секций или одно значение для всех секций. Тип значения может быть str, int или datetime.datetime. Также поддерживаются значения "-1" для получения с начала потока и "@latest" для получения только новых событий. Значение по умолчанию — "@latest".
Определите, является ли данный starting_position инклюзивным(>=) или нет (>). True для инклюзивного и False для монопольного. Это может быть диктовка с идентификатором секции в качестве ключа и bool в качестве значения, указывающего, является ли starting_position для определенной секции инклюзивным или нет. Это также может быть одно логическое значение для всех starting_position. Значение по умолчанию равно False.
- on_error
- callable[[PartitionContext, Exception]]
Функция обратного вызова, которая будет вызываться при возникновении ошибки во время получения после исчерпания повторных попыток или в процессе балансировки нагрузки. Обратный вызов принимает два параметра: partition_context который содержит сведения о секции, и ошибку , являющуюся исключением. partition_context может иметь значение Нет, если ошибка возникает во время балансировки нагрузки. Обратный вызов должен быть определен следующим образом: on_error(partition_context, ошибка). Обратный вызов on_error также будет вызываться, если во время обратного вызова on_event возникает необработанное исключение.
- on_partition_initialize
- callable[[PartitionContext]]
Функция обратного вызова, которая будет вызвана после того, как потребитель для определенной секции завершит инициализацию. Он также вызывается при создании нового внутреннего потребителя секции, чтобы взять на себя процесс получения для неудавшегося и закрытого внутреннего потребителя секции. Обратный вызов принимает один параметр: partition_context который содержит сведения о секции. Обратный вызов должен быть определен следующим образом: on_partition_initialize(partition_context)).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Функция обратного вызова, которая будет вызываться после закрытия потребителя для определенной секции. Он также вызывается при возникновении ошибки во время получения после исчерпания повторных попыток. Обратный вызов принимает два параметра: partition_context который содержит сведения о секции и причину закрытия. Обратный вызов должен быть определен следующим образом: on_partition_close(partition_context, причина). Ознакомьтесь с CloseReason различными причинами закрытия.
Возвращаемый тип
Примеры
Получение событий из EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
Получение событий из секций с необязательной балансировкой нагрузки и контрольными точками.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Параметры
- on_event_batch
- callable[PartitionContext, list[EventData]]
Функция обратного вызова для обработки пакета полученных событий. Обратный вызов принимает два параметра: partition_context который содержит контекст секции, и event_batch, который является полученными событиями. Функция обратного вызова должна быть определена следующим образом: on_event_batch(partition_context, event_batch). event_batch может быть пустым списком, если max_wait_time не имеет значения None или 0, а событие не получено после max_wait_time. Подробные сведения о контексте секции см. в PartitionContextразделе .
- max_batch_size
- int
Максимальное количество событий в пакете, передаваемых в обратный вызов on_event_batch. Если фактическое количество полученных событий превышает max_batch_size, полученные события делятся на пакеты и вызывают обратный вызов для каждого пакета с событиями до max_batch_size .
- max_wait_time
- float
Максимальный интервал в секундах, который обработчик событий будет ожидать перед вызовом обратного вызова. Если события не получены в течение этого интервала, обратный вызов on_event_batch будет вызываться с пустым списком.
- partition_id
- str
Если этот параметр указан, клиент будет получать только из этой секции. В противном случае клиент будет получать данные из всех секций.
- owner_level
- int
Приоритет исключительного потребителя. Если задано owner_level, будет создан эксклюзивный потребитель. Потребитель с более высоким owner_level имеет более высокий эксклюзивный приоритет. Уровень владельца также известен как "значение эпохи" потребителя.
- prefetch
- int
Количество событий для предварительной выборки из службы для обработки. Значение по умолчанию — 300.
- track_last_enqueued_event_properties
- bool
Указывает, должен ли потребитель запрашивать сведения о последнем событии, помещенном в очередь, в связанной секции, и отслеживать эти сведения по мере получения событий. При отслеживании сведений о последнем событии секций, помещенных в очередь, каждое событие, полученное от службы Центров событий, будет содержать метаданные о секции. Это приводит к небольшому объему дополнительного потребления пропускной способности сети, что, как правило, является благоприятным компромиссом при рассмотрении проблемы с периодическим выполнением запросов свойств секции с помощью клиента концентратора событий. По умолчанию задано значение False .
Начните получать данные из этой позиции события, если для секции нет данных контрольных точек. Данные контрольных точек будут использоваться, если они доступны. Это может быть диктовка с идентификатором секции в качестве ключа и положением в качестве значения для отдельных секций или одно значение для всех секций. Тип значения может быть str, int или datetime.datetime. Также поддерживаются значения "-1" для получения с начала потока и "@latest" для получения только новых событий. Значение по умолчанию — "@latest".
Определите, является ли данный starting_position инклюзивным(>=) или нет (>). True для инклюзивного и False для монопольного. Это может быть диктовка с идентификатором секции в качестве ключа и bool в качестве значения, указывающего, является ли starting_position для определенной секции инклюзивным или нет. Это также может быть одно логическое значение для всех starting_position. Значение по умолчанию равно False.
- on_error
- callable[[PartitionContext, Exception]]
Функция обратного вызова, которая будет вызываться при возникновении ошибки во время получения после исчерпания повторных попыток или в процессе балансировки нагрузки. Обратный вызов принимает два параметра: partition_context который содержит сведения о секции, и ошибку , являющуюся исключением. partition_context может иметь значение Нет, если ошибка возникает во время балансировки нагрузки. Обратный вызов должен быть определен следующим образом: on_error(partition_context, ошибка). Обратный вызов on_error также будет вызываться, если во время обратного вызова on_event возникает необработанное исключение.
- on_partition_initialize
- callable[[PartitionContext]]
Функция обратного вызова, которая будет вызвана после того, как потребитель для определенной секции завершит инициализацию. Он также вызывается при создании нового внутреннего потребителя секции, чтобы взять на себя процесс получения для неудавшегося и закрытого внутреннего потребителя секции. Обратный вызов принимает один параметр: partition_context который содержит сведения о секции. Обратный вызов должен быть определен следующим образом: on_partition_initialize(partition_context)).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Функция обратного вызова, которая будет вызываться после закрытия потребителя для определенной секции. Он также вызывается при возникновении ошибки во время получения после исчерпания повторных попыток. Обратный вызов принимает два параметра: partition_context который содержит сведения о секции и причину закрытия. Обратный вызов должен быть определен следующим образом: on_partition_close(partition_context, причина). Ознакомьтесь с CloseReason различными причинами закрытия.
Возвращаемый тип
Примеры
Получение событий в пакетах из EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python