EventHubProducerClient Класс
Класс EventHubProducerClient определяет высокоуровневый интерфейс для отправки событий в службу Центры событий Azure.
- Наследование
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubProducerClient
Конструктор
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)
Параметры
- fully_qualified_namespace
- str
Полное имя узла для пространства имен Центров событий. Скорее всего, это будет похоже на .servicebus.windows.net
- eventhub_name
- str
Путь к конкретному концентратору событий, к которому подключается клиент.
- credential
- AsyncTokenCredential или AzureSasCredential или AzureNamedKeyCredential
Объект учетных данных, используемый для проверки подлинности, который реализует определенный интерфейс для получения маркеров. Он принимает объекты учетных EventHubSharedKeyCredentialданных или , созданные библиотекой azure-identity, и объекты, реализующие метод *get_token(self, scopes).
- buffered_mode
- bool
Если задано значение True, клиент-производитель будет собирать события в буфере, эффективно выполнять пакетную обработку, а затем публиковать. Значение по умолчанию — False.
Обратный вызов, вызываемый после успешной публикации пакета. Обратный вызов принимает два параметра:
events: список событий, которые были успешно опубликованы.
partition_id: идентификатор секции, в котором были опубликованы события в списке.
Функция обратного вызова должна быть определена следующим образом: on_success(events, partition_id). Требуется, если buffered_mode имеет значение True, а необязательно, если buffered_mode имеет значение False.
Обратный вызов, вызываемый после того, как пакет не был опубликован. Требуется, если в buffered_mode имеет значение True, а необязательно, если buffered_mode имеет значение False. Функция обратного вызова должна быть определена следующим образом: on_error(events, partition_id, error), где:
events: список событий, которые не удалось опубликовать.
partition_id: идентификатор секции, в котором пытались опубликовать события в списке.
error: исключение, связанное с ошибкой отправки.
Если buffered_mode имеет значение False, on_error обратный вызов необязателен, а ошибки будут обрабатываться следующим образом:
Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,
затем сведения об ошибке будут переданы обратному вызову on_error, который затем будет вызван.
Если обратный вызов on_error не передается во время создания экземпляра клиента,
тогда ошибка будет возникать по умолчанию.
Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом:
Если события не удается поставить в очередь в течение заданного времени ожидания, будет напрямую вызвана ошибка.
Если после успешного постановки в очередь события не удается отправить, будет вызван обратный вызов on_error .
- max_buffer_length
- int
Только в буферизованном режиме. Общее количество событий на секцию, которые можно буферизать до запуска очистки. Значение по умолчанию — 1500 в режиме буферизации.
Только в буферизованном режиме. Время ожидания сборки пакета с событиями в буфере перед публикацией. Значение по умолчанию — 1 в режиме буферизации.
- logging_enable
- bool
Указывает, следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.
- auth_timeout
- float
Время в секундах ожидания авторизации маркера службой. Значение по умолчанию — 60 секунд. Если задано значение 0, время ожидания от клиента не будет применяться.
- user_agent
- str
Если этот параметр указан, он будет добавлен перед строкой агента пользователя.
- retry_total
- int
Общее число попыток повторного выполнения неудачной операции при возникновении ошибки. Значение по умолчанию — 3.
- retry_backoff_factor
- float
Коэффициент задержки, применяемый между попытками после второй попытки (большинство ошибок устраняются немедленно второй попыткой без задержки). В фиксированном режиме политика повторных попыток всегда будет переходить в спящий режим для {backoff factor}. В экспоненциальном режиме политика повторных попыток будет переходить в спящий режим в течение: {коэффициент задержки} * (2 ** ({общее число повторных попыток} — 1)) секунд. Если backoff_factor равно 0,1, то повторная попытка будет в спящем режиме [0,0s, 0,2s, 0,4s, ...] между повторными попытками. Значение по умолчанию — 0,8.
- retry_backoff_max
- float
Максимальное время отката. Значение по умолчанию — 120 секунд (2 минуты).
- retry_mode
- str
Поведение задержки между повторными попытками. Поддерживаемые значения: fixed или exponential, где значение по умолчанию — "экспоненциальная".
- idle_timeout
- float
Время ожидания в секундах, по истечении которого клиент закроет базовое подключение, если действия отсутствуют. По умолчанию значение равно None. Это означает, что клиент не будет завершать работу из-за бездействия, если не будет инициирована службой.
- transport_type
- TransportType
Тип транспортного протокола, который будет использоваться для взаимодействия со службой Центров событий. Значение по умолчанию — TransportType.Amqp , в котором используется порт 5671. Если порт 5671 недоступен или заблокирован в сетевой среде, можно использовать TransportType.AmqpOverWebsocket , который использует порт 443 для связи.
- http_proxy
- dict
Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".
Адрес пользовательской конечной точки, используемый для установления подключения к службе Центров событий, что позволяет маршрутизировать сетевые запросы через любые шлюзы приложений или другие пути, необходимые для среды узла. Значение по умолчанию — Нет. Формат будет следующим: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Если порт не указан в custom_endpoint_address, по умолчанию будет использоваться порт 443.
Путь к пользовательскому CA_BUNDLE файлу SSL-сертификата, который используется для проверки подлинности удостоверения конечной точки подключения. Значение по умолчанию — Нет, в этом случае будет использоваться certifi.where( ).
- uamqp_transport
- bool
Следует ли использовать библиотеку uamqp в качестве базового транспорта. Значение по умолчанию — False, а библиотека AMQP Pure Python будет использоваться в качестве базового транспорта.
- socket_timeout
- float
Время в секундах, в течение времени ожидания базового сокета подключения при отправке и получении данных до истечения времени ожидания. Значение по умолчанию — 0,2 для TransportType.Amqp и 1 для TransportType.AmqpOverWebsocket. Если возникают ошибки EventHubsConnectionError из-за истечения времени ожидания записи, может потребоваться передать значение, превышающее значение по умолчанию. Это предназначено для расширенных сценариев использования, и обычно значения по умолчанию должно быть достаточно.
Примеры
Создайте новый экземпляр EventHubProducerClient.
import os
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Методы
close |
Закройте базовое подключение и ссылки AMQP клиента Производителя. |
create_batch |
Создайте объект EventDataBatch с максимальным размером всего содержимого, ограниченным max_size_in_bytes. Max_size_in_bytes не должен превышать максимальный допустимый размер сообщения, определенный службой. |
flush |
Только в буферизованном режиме. Очистка событий в буфере, которые должны быть немедленно отправлены, если клиент работает в буферизованном режиме. |
from_connection_string |
Создайте EventHubProducerClient из строка подключения. |
get_buffered_event_count |
Количество событий, которые буферикуются и ожидают публикации для определенной секции. Возвращает значение None в режиме без буферизации. ПРИМЕЧАНИЕ. Буфер событий обрабатывается в фоновой сопрограмме, поэтому количество событий в буфере, сообщаемое этим API, должно рассматриваться только приблизительно и рекомендуется только для использования в отладке. Для идентификатора секции без буферизации событий будет возвращено значение 0 независимо от того, существует ли этот идентификатор секции в концентраторе событий. |
get_eventhub_properties |
Получение свойств концентратора событий. Ключи в возвращенном словаре:
|
get_partition_ids |
Получение идентификаторов секций концентратора событий. |
get_partition_properties |
Получение свойств указанной секции. В словаре свойств содержатся следующие ключи:
|
send_batch |
Отправляет пакет данных о событиях. По умолчанию метод блокируется до получения подтверждения или истечения времени ожидания операции. Если EventHubProducerClient настроен для запуска в буферизованном режиме, метод помещает события в очередь в локальный буфер и возвращает их. Производитель выполняет автоматическую отправку в фоновом режиме. Если buffered_mode имеет значение False, on_error обратный вызов необязателен, а ошибки будут обрабатываться следующим образом:
Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом:
В режиме буферизации отправка пакета останется нетронутой и отправляется как единое целое. Пакет не будет переупорядочен. Это может привести к неэффективности отправки событий. Если вы отправляете конечный список EventData или AmqpAnnotatedMessage и знаете, что он находится в пределах предельного размера кадра концентратора событий, вы можете отправить их с помощью вызова send_batch . В противном случае используйте create_batch для создания EventDataBatch и добавьте EventData или AmqpAnnotatedMessage в пакет по одному до ограничения размера, а затем вызовите этот метод для отправки пакета. |
send_event |
Отправляет данные события. По умолчанию метод блокируется до получения подтверждения или истечения времени ожидания операции. Если EventHubProducerClient настроен для запуска в буферизованном режиме, метод помещает событие в очередь в локальный буфер и возвращает его. Производитель будет выполнять автоматическую пакетную обработку и отправку в фоновом режиме. Если buffered_mode имеет значение False, on_error обратный вызов необязателен и ошибки будут обрабатываться следующим образом: * Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,
Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом: * Если события не могут поставить в очередь в течение заданного времени ожидания, то будет напрямую вызвана ошибка.
|
close
Закройте базовое подключение и ссылки AMQP клиента Производителя.
async close(*, flush: bool = True, **kwargs: Any) -> None
Параметры
- flush
- bool
Только в буферизованном режиме. Если задано значение True, события в буфере будут отправляться немедленно. Значение по умолчанию — True.
Только в буферизованном режиме. Время ожидания для закрытия производителя. Значение по умолчанию — Нет, что означает отсутствие времени ожидания.
Возвращаемый тип
Исключения
Если при очистке буфера произошла ошибка, если для свойства очистки задано значение True или при закрытии базовых подключений AMQP в режиме буферизации.
Примеры
Закройте обработчик.
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
await producer.close()
create_batch
Создайте объект EventDataBatch с максимальным размером всего содержимого, ограниченным max_size_in_bytes.
Max_size_in_bytes не должен превышать максимальный допустимый размер сообщения, определенный службой.
async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch
Возвращаемый тип
Исключения
Если при очистке буфера произошла ошибка, если для свойства очистки задано значение True или при закрытии базовых подключений AMQP в режиме буферизации.
Примеры
Создание объекта EventDataBatch в пределах ограниченного размера
from azure.eventhub import EventData
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
Только в буферизованном режиме. Очистка событий в буфере, которые должны быть немедленно отправлены, если клиент работает в буферизованном режиме.
async flush(**kwargs: Any) -> None
Параметры
Время ожидания для очистки буферизированных событий. Значение по умолчанию — Нет, что означает отсутствие времени ожидания.
Возвращаемый тип
Исключения
Если производителю не удается очистить буфер в течение заданного времени ожидания в буферизованном режиме.
from_connection_string
Создайте EventHubProducerClient из строка подключения.
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient
Параметры
- eventhub_name
- str
Путь к конкретному концентратору событий, к которому подключается клиент.
- buffered_mode
- bool
Если задано значение True, клиент-производитель будет собирать события в буфере, эффективно выполнять пакетную обработку, а затем публиковать. Значение по умолчанию — False.
Обратный вызов, вызываемый после успешной публикации пакета. Обратный вызов принимает два параметра:
events: список событий, которые были успешно опубликованы.
partition_id: идентификатор секции, в котором были опубликованы события в списке.
Функция обратного вызова должна быть определена следующим образом: on_success(events, partition_id). Он является обязательным, если buffered_mode имеет значение True, а если buffered_mode имеет значение False, необязательно.
Обратный вызов, вызываемый после того, как пакет не был опубликован. Функция обратного вызова должна быть определена следующим образом: on_error(events, partition_id, error), где:
events: список событий, которые не удалось опубликовать.
partition_id: идентификатор секции, в котором пытались опубликовать события в списке.
error: исключение, связанное с ошибкой отправки.
Если buffered_mode имеет значение False, on_error обратный вызов необязателен, а ошибки будут обрабатываться следующим образом:
Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,
затем сведения об ошибке будут переданы обратному вызову on_error, который затем будет вызван.
Если обратный вызов on_error не передается во время создания экземпляра клиента,
тогда ошибка будет возникать по умолчанию.
Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом:
Если события не удается поставить в очередь в течение заданного времени ожидания, будет напрямую вызвана ошибка.
Если после успешного постановки в очередь события не удается отправить, будет вызван обратный вызов on_error .
- max_buffer_length
- int
Только в буферизованном режиме. Общее количество событий на секцию, которые можно буферизать до запуска очистки. Значение по умолчанию — 1500 в режиме буферизации.
Только в буферизованном режиме. Время ожидания сборки пакета с событиями в буфере перед публикацией. Значение по умолчанию — 1 в режиме буферизации.
- logging_enable
- bool
Указывает, следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.
- http_proxy
- dict
Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".
- auth_timeout
- float
Время в секундах ожидания авторизации маркера службой. Значение по умолчанию — 60 секунд. Если задано значение 0, время ожидания от клиента не будет применяться.
- user_agent
- str
Если этот параметр указан, он будет добавлен перед строкой агента пользователя.
- retry_total
- int
Общее число попыток повторного выполнения неудачной операции при возникновении ошибки. Значение по умолчанию — 3.
- retry_backoff_factor
- float
Коэффициент задержки, применяемый между попытками после второй попытки (большинство ошибок устраняются немедленно второй попыткой без задержки). В фиксированном режиме политика повторных попыток всегда будет переходить в спящий режим для {backoff factor}. В экспоненциальном режиме политика повторных попыток будет переходить в спящий режим в течение: {коэффициент задержки} * (2 ** ({общее число повторных попыток} — 1)) секунд. Если backoff_factor равно 0,1, то повторная попытка будет в спящем режиме [0,0s, 0,2s, 0,4s, ...] между повторными попытками. Значение по умолчанию — 0,8.
- retry_backoff_max
- float
Максимальное время отката. Значение по умолчанию — 120 секунд (2 минуты).
- retry_mode
- str
Поведение задержки между повторными попытками. Поддерживаемые значения: fixed или exponential, где значение по умолчанию — "экспоненциальная".
- idle_timeout
- float
Время ожидания в секундах, по истечении которого клиент закроет базовое подключение, если действия отсутствуют. По умолчанию значение равно None. Это означает, что клиент не будет завершать работу из-за бездействия, если не будет инициирована службой.
- transport_type
- TransportType
Тип транспортного протокола, который будет использоваться для взаимодействия со службой Центров событий. Значение по умолчанию — TransportType.Amqp , в котором используется порт 5671. Если порт 5671 недоступен или заблокирован в сетевой среде, можно использовать TransportType.AmqpOverWebsocket , который использует порт 443 для связи.
Адрес пользовательской конечной точки, используемый для установления подключения к службе Центров событий, что позволяет маршрутизировать сетевые запросы через любые шлюзы приложений или другие пути, необходимые для среды узла. Значение по умолчанию — Нет. Формат будет следующим: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Если порт не указан в custom_endpoint_address, по умолчанию будет использоваться порт 443.
Путь к пользовательскому CA_BUNDLE файлу SSL-сертификата, который используется для проверки подлинности удостоверения конечной точки подключения. Значение по умолчанию — Нет, в этом случае будет использоваться certifi.where( ).
- uamqp_transport
- bool
Следует ли использовать библиотеку uamqp в качестве базового транспорта. Значение по умолчанию — False, а библиотека AMQP Pure Python будет использоваться в качестве базового транспорта.
Возвращаемый тип
Исключения
Если при очистке буфера произошла ошибка, если для свойства очистки задано значение True или при закрытии базовых подключений AMQP в режиме буферизации.
Примеры
Создайте экземпляр EventHubProducerClient из строка подключения.
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
Количество событий, которые буферикуются и ожидают публикации для определенной секции. Возвращает значение None в режиме без буферизации. ПРИМЕЧАНИЕ. Буфер событий обрабатывается в фоновой сопрограмме, поэтому количество событий в буфере, сообщаемое этим API, должно рассматриваться только приблизительно и рекомендуется только для использования в отладке. Для идентификатора секции без буферизации событий будет возвращено значение 0 независимо от того, существует ли этот идентификатор секции в концентраторе событий.
get_buffered_event_count(partition_id: str) -> int | None
Параметры
Возвращаемый тип
Исключения
Если при очистке буфера произошла ошибка, если для свойства очистки задано значение True или при закрытии базовых подключений AMQP в режиме буферизации.
get_eventhub_properties
Получение свойств концентратора событий.
Ключи в возвращенном словаре:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Возвращаемое значение
Словарь, содержащий сведения о концентраторе событий.
Возвращаемый тип
Исключения
get_partition_ids
Получение идентификаторов секций концентратора событий.
async get_partition_ids() -> List[str]
Возвращаемое значение
Список идентификаторов секций.
Возвращаемый тип
Исключения
get_partition_properties
Получение свойств указанной секции.
В словаре свойств содержатся следующие ключи:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Параметры
Возвращаемое значение
Диктовка свойств секции.
Возвращаемый тип
Исключения
send_batch
Отправляет пакет данных о событиях. По умолчанию метод блокируется до получения подтверждения или истечения времени ожидания операции. Если EventHubProducerClient настроен для запуска в буферизованном режиме, метод помещает события в очередь в локальный буфер и возвращает их. Производитель выполняет автоматическую отправку в фоновом режиме.
Если buffered_mode имеет значение False, on_error обратный вызов необязателен, а ошибки будут обрабатываться следующим образом:
Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,
затем сведения об ошибке будут переданы обратному вызову on_error, который затем будет вызван.
Если обратный вызов on_error не передается во время создания экземпляра клиента,
тогда ошибка будет возникать по умолчанию.
Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом:
Если события не удается поставить в очередь в течение заданного времени ожидания, будет напрямую вызвана ошибка.
Если после успешного постановки в очередь события не удается отправить, будет вызван обратный вызов on_error .
В режиме буферизации отправка пакета останется нетронутой и отправляется как единое целое. Пакет не будет переупорядочен. Это может привести к неэффективности отправки событий.
Если вы отправляете конечный список EventData или AmqpAnnotatedMessage и знаете, что он находится в пределах предельного размера кадра концентратора событий, вы можете отправить их с помощью вызова send_batch . В противном случае используйте create_batch для создания EventDataBatch и добавьте EventData или AmqpAnnotatedMessage в пакет по одному до ограничения размера, а затем вызовите этот метод для отправки пакета.
async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
Параметры
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Отправляемый объект EventDataBatch или список данных EventData для отправки в пакете. Все EventData или AmqpAnnotatedMessage в списке или EventDataBatch будут помещаться в один раздел.
- timeout
- float
Максимальное время ожидания для отправки данных о событиях в режиме без буферизации или максимальное время ожидания для постановки данных события в буфер в режиме буферизации. В режиме без буферизации будет использоваться время ожидания по умолчанию, указанное при создании производителя. В режиме буферизации время ожидания по умолчанию — Нет.
- partition_id
- str
Конкретный идентификатор секции для отправки. Значение по умолчанию — Нет. В этом случае служба будет назначаться всем секциям с помощью циклического перебора. TypeError будет вызываться, если указан partition_id и event_data_batch является EventDataBatch, так как сам EventDataBatch имеет partition_id.
- partition_key
- str
При заданном partition_key данные о событиях будут отправляться в определенную секцию концентратора событий, определяемую службой. TypeError возникает, если указан partition_key и event_data_batch является EventDataBatch, так как сам EventDataBatch имеет partition_key. Если указаны partition_id и partition_key, приоритет будет иметь partition_id. ПРЕДУПРЕЖДЕНИЕ. Не рекомендуется задавать partition_key нестроковых значений для отправляемых событий, так как служба концентратора событий игнорирует partition_key, а события будут назначены всем секциям с помощью циклического перебора. Кроме того, существуют пакеты SDK для использования событий, которые ожидают, что partition_key будет только строковым типом. Они могут не проанализировать нестроковое значение.
Возвращаемый тип
Исключения
Если значение, указанное параметром времени ожидания, истекает до того, как событие может быть отправлено в режиме без буферизации, или события могут быть помещены в очередь в буферизованном режиме.
Примеры
Асинхронно отправляет данные о событиях
async with producer:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
send_event
Отправляет данные события. По умолчанию метод блокируется до получения подтверждения или истечения времени ожидания операции. Если EventHubProducerClient настроен для запуска в буферизованном режиме, метод помещает событие в очередь в локальный буфер и возвращает его. Производитель будет выполнять автоматическую пакетную обработку и отправку в фоновом режиме.
Если buffered_mode имеет значение False, on_error обратный вызов необязателен и ошибки будут обрабатываться следующим образом: * Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом: * Если события не могут поставить в очередь в течение заданного времени ожидания, то будет напрямую вызвана ошибка.
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
Параметры
- timeout
- float
Максимальное время ожидания для отправки данных о событиях в режиме без буферизации или максимальное время ожидания для постановки данных события в буфер в режиме буферизации. В режиме без буферизации будет использоваться время ожидания по умолчанию, указанное при создании производителя. В режиме буферизации время ожидания по умолчанию — Нет.
- partition_id
- str
Конкретный идентификатор секции для отправки. Значение по умолчанию — Нет. В этом случае служба будет назначаться всем секциям с помощью циклического перебора. TypeError будет вызываться, если указан partition_id и event_data_batch является EventDataBatch, так как сам EventDataBatch имеет partition_id.
- partition_key
- str
При заданном partition_key данные о событиях будут отправляться в определенную секцию концентратора событий, определяемую службой. TypeError возникает, если указан partition_key и event_data_batch является EventDataBatch, так как сам EventDataBatch имеет partition_key. Если указаны partition_id и partition_key, приоритет будет иметь partition_id. ПРЕДУПРЕЖДЕНИЕ. Не рекомендуется задавать partition_key нестроковых значений для отправляемых событий, так как служба концентратора событий игнорирует partition_key, а события будут назначены всем секциям с помощью циклического перебора. Кроме того, существуют пакеты SDK для использования событий, которые ожидают, что partition_key будет только строковым типом. Они могут не проанализировать нестроковое значение.
Возвращаемый тип
Исключения
Если значение, заданное параметром времени ожидания, истекает до того, как событие может быть отправлено в режиме без буферизации, или события не могут быть помещены в очередь в буферизованном режиме.
Атрибуты
total_buffered_event_count
Общее количество событий, которые в данный момент буферичены и ожидают публикации, во всех секциях. Возвращает значение None в режиме без буферизации. ПРИМЕЧАНИЕ. Буфер событий обрабатывается в фоновой сопрограмме, поэтому количество событий в буфере, сообщаемое этим API, должно рассматриваться только приблизительно и рекомендуется только для использования в отладке.
Возвращаемый тип
Azure SDK for Python
Обратная связь
https://aka.ms/ContentUserFeedback.
Ожидается в ближайшее время: в течение 2024 года мы постепенно откажемся от GitHub Issues как механизма обратной связи для контента и заменим его новой системой обратной связи. Дополнительные сведения см. в разделеОтправить и просмотреть отзыв по