Поделиться через


EventHubProducerClient Класс

Класс EventHubProducerClient определяет высокоуровневый интерфейс для отправки событий в службу Центры событий Azure.

Наследование
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Конструктор

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Параметры

fully_qualified_namespace
str
Обязательно

Полное имя узла для пространства имен Центров событий. Скорее всего, это будет похоже на .servicebus.windows.net

eventhub_name
str
Обязательно

Путь к конкретному концентратору событий, к которому подключается клиент.

credential
AsyncTokenCredential или AzureSasCredential или AzureNamedKeyCredential
Обязательно

Объект учетных данных, используемый для проверки подлинности, который реализует определенный интерфейс для получения маркеров. Он принимает объекты учетных EventHubSharedKeyCredentialданных или , созданные библиотекой azure-identity, и объекты, реализующие метод *get_token(self, scopes).

buffered_mode
bool

Если задано значение True, клиент-производитель будет собирать события в буфере, эффективно выполнять пакетную обработку, а затем публиковать. Значение по умолчанию — False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Обратный вызов, вызываемый после успешной публикации пакета. Обратный вызов принимает два параметра:

  • events: список событий, которые были успешно опубликованы.

  • partition_id: идентификатор секции, в котором были опубликованы события в списке.

Функция обратного вызова должна быть определена следующим образом: on_success(events, partition_id). Требуется, если buffered_mode имеет значение True, а необязательно, если buffered_mode имеет значение False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Обратный вызов, вызываемый после того, как пакет не был опубликован. Требуется, если в buffered_mode имеет значение True, а необязательно, если buffered_mode имеет значение False. Функция обратного вызова должна быть определена следующим образом: on_error(events, partition_id, error), где:

  • events: список событий, которые не удалось опубликовать.

  • partition_id: идентификатор секции, в котором пытались опубликовать события в списке.

  • error: исключение, связанное с ошибкой отправки.

Если buffered_mode имеет значение False, on_error обратный вызов необязателен, а ошибки будут обрабатываться следующим образом:

  • Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,

    затем сведения об ошибке будут переданы обратному вызову on_error, который затем будет вызван.

  • Если обратный вызов on_error не передается во время создания экземпляра клиента,

    тогда ошибка будет возникать по умолчанию.

Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом:

  • Если события не удается поставить в очередь в течение заданного времени ожидания, будет напрямую вызвана ошибка.

  • Если после успешного постановки в очередь события не удается отправить, будет вызван обратный вызов on_error .

max_buffer_length
int

Только в буферизованном режиме. Общее количество событий на секцию, которые можно буферизать до запуска очистки. Значение по умолчанию — 1500 в режиме буферизации.

max_wait_time
Optional[float]

Только в буферизованном режиме. Время ожидания сборки пакета с событиями в буфере перед публикацией. Значение по умолчанию — 1 в режиме буферизации.

logging_enable
bool

Указывает, следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.

auth_timeout
float

Время в секундах ожидания авторизации маркера службой. Значение по умолчанию — 60 секунд. Если задано значение 0, время ожидания от клиента не будет применяться.

user_agent
str

Если этот параметр указан, он будет добавлен перед строкой агента пользователя.

retry_total
int

Общее число попыток повторного выполнения неудачной операции при возникновении ошибки. Значение по умолчанию — 3.

retry_backoff_factor
float

Коэффициент задержки, применяемый между попытками после второй попытки (большинство ошибок устраняются немедленно второй попыткой без задержки). В фиксированном режиме политика повторных попыток всегда будет переходить в спящий режим для {backoff factor}. В экспоненциальном режиме политика повторных попыток будет переходить в спящий режим в течение: {коэффициент задержки} * (2 ** ({общее число повторных попыток} — 1)) секунд. Если backoff_factor равно 0,1, то повторная попытка будет в спящем режиме [0,0s, 0,2s, 0,4s, ...] между повторными попытками. Значение по умолчанию — 0,8.

retry_backoff_max
float

Максимальное время отката. Значение по умолчанию — 120 секунд (2 минуты).

retry_mode
str

Поведение задержки между повторными попытками. Поддерживаемые значения: fixed или exponential, где значение по умолчанию — "экспоненциальная".

idle_timeout
float

Время ожидания в секундах, по истечении которого клиент закроет базовое подключение, если действия отсутствуют. По умолчанию значение равно None. Это означает, что клиент не будет завершать работу из-за бездействия, если не будет инициирована службой.

transport_type
TransportType

Тип транспортного протокола, который будет использоваться для взаимодействия со службой Центров событий. Значение по умолчанию — TransportType.Amqp , в котором используется порт 5671. Если порт 5671 недоступен или заблокирован в сетевой среде, можно использовать TransportType.AmqpOverWebsocket , который использует порт 443 для связи.

http_proxy
dict

Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".

custom_endpoint_address
Optional[str]

Адрес пользовательской конечной точки, используемый для установления подключения к службе Центров событий, что позволяет маршрутизировать сетевые запросы через любые шлюзы приложений или другие пути, необходимые для среды узла. Значение по умолчанию — Нет. Формат будет следующим: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Если порт не указан в custom_endpoint_address, по умолчанию будет использоваться порт 443.

connection_verify
Optional[str]

Путь к пользовательскому CA_BUNDLE файлу SSL-сертификата, который используется для проверки подлинности удостоверения конечной точки подключения. Значение по умолчанию — Нет, в этом случае будет использоваться certifi.where( ).

uamqp_transport
bool

Следует ли использовать библиотеку uamqp в качестве базового транспорта. Значение по умолчанию — False, а библиотека AMQP Pure Python будет использоваться в качестве базового транспорта.

socket_timeout
float

Время в секундах, в течение времени ожидания базового сокета подключения при отправке и получении данных до истечения времени ожидания. Значение по умолчанию — 0,2 для TransportType.Amqp и 1 для TransportType.AmqpOverWebsocket. Если возникают ошибки EventHubsConnectionError из-за истечения времени ожидания записи, может потребоваться передать значение, превышающее значение по умолчанию. Это предназначено для расширенных сценариев использования, и обычно значения по умолчанию должно быть достаточно.

Примеры

Создайте новый экземпляр EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Методы

close

Закройте базовое подключение и ссылки AMQP клиента Производителя.

create_batch

Создайте объект EventDataBatch с максимальным размером всего содержимого, ограниченным max_size_in_bytes.

Max_size_in_bytes не должен превышать максимальный допустимый размер сообщения, определенный службой.

flush

Только в буферизованном режиме. Очистка событий в буфере, которые должны быть немедленно отправлены, если клиент работает в буферизованном режиме.

from_connection_string

Создайте EventHubProducerClient из строка подключения.

get_buffered_event_count

Количество событий, которые буферикуются и ожидают публикации для определенной секции. Возвращает значение None в режиме без буферизации. ПРИМЕЧАНИЕ. Буфер событий обрабатывается в фоновой сопрограмме, поэтому количество событий в буфере, сообщаемое этим API, должно рассматриваться только приблизительно и рекомендуется только для использования в отладке. Для идентификатора секции без буферизации событий будет возвращено значение 0 независимо от того, существует ли этот идентификатор секции в концентраторе событий.

get_eventhub_properties

Получение свойств концентратора событий.

Ключи в возвращенном словаре:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Получение идентификаторов секций концентратора событий.

get_partition_properties

Получение свойств указанной секции.

В словаре свойств содержатся следующие ключи:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Отправляет пакет данных о событиях. По умолчанию метод блокируется до получения подтверждения или истечения времени ожидания операции. Если EventHubProducerClient настроен для запуска в буферизованном режиме, метод помещает события в очередь в локальный буфер и возвращает их. Производитель выполняет автоматическую отправку в фоновом режиме.

Если buffered_mode имеет значение False, on_error обратный вызов необязателен, а ошибки будут обрабатываться следующим образом:

  • Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,

    затем сведения об ошибке будут переданы обратному вызову on_error, который затем будет вызван.

  • Если обратный вызов on_error не передается во время создания экземпляра клиента,

    тогда ошибка будет возникать по умолчанию.

Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом:

  • Если события не удается поставить в очередь в течение заданного времени ожидания, будет напрямую вызвана ошибка.

  • Если после успешного постановки в очередь события не удается отправить, будет вызван обратный вызов on_error .

В режиме буферизации отправка пакета останется нетронутой и отправляется как единое целое. Пакет не будет переупорядочен. Это может привести к неэффективности отправки событий.

Если вы отправляете конечный список EventData или AmqpAnnotatedMessage и знаете, что он находится в пределах предельного размера кадра концентратора событий, вы можете отправить их с помощью вызова send_batch . В противном случае используйте create_batch для создания EventDataBatch и добавьте EventData или AmqpAnnotatedMessage в пакет по одному до ограничения размера, а затем вызовите этот метод для отправки пакета.

send_event

Отправляет данные события. По умолчанию метод блокируется до получения подтверждения или истечения времени ожидания операции. Если EventHubProducerClient настроен для запуска в буферизованном режиме, метод помещает событие в очередь в локальный буфер и возвращает его. Производитель будет выполнять автоматическую пакетную обработку и отправку в фоновом режиме.

Если buffered_mode имеет значение False, on_error обратный вызов необязателен и ошибки будут обрабатываться следующим образом: * Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом: * Если события не могут поставить в очередь в течение заданного времени ожидания, то будет напрямую вызвана ошибка.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Закройте базовое подключение и ссылки AMQP клиента Производителя.

async close(*, flush: bool = True, **kwargs: Any) -> None

Параметры

flush
bool

Только в буферизованном режиме. Если задано значение True, события в буфере будут отправляться немедленно. Значение по умолчанию — True.

timeout
float или None

Только в буферизованном режиме. Время ожидания для закрытия производителя. Значение по умолчанию — Нет, что означает отсутствие времени ожидания.

Возвращаемый тип

Исключения

Если при очистке буфера произошла ошибка, если для свойства очистки задано значение True или при закрытии базовых подключений AMQP в режиме буферизации.

Примеры

Закройте обработчик.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Создайте объект EventDataBatch с максимальным размером всего содержимого, ограниченным max_size_in_bytes.

Max_size_in_bytes не должен превышать максимальный допустимый размер сообщения, определенный службой.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Возвращаемый тип

Исключения

Если при очистке буфера произошла ошибка, если для свойства очистки задано значение True или при закрытии базовых подключений AMQP в режиме буферизации.

Примеры

Создание объекта EventDataBatch в пределах ограниченного размера


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Только в буферизованном режиме. Очистка событий в буфере, которые должны быть немедленно отправлены, если клиент работает в буферизованном режиме.

async flush(**kwargs: Any) -> None

Параметры

timeout
float или None

Время ожидания для очистки буферизированных событий. Значение по умолчанию — Нет, что означает отсутствие времени ожидания.

Возвращаемый тип

Исключения

Если производителю не удается очистить буфер в течение заданного времени ожидания в буферизованном режиме.

from_connection_string

Создайте EventHubProducerClient из строка подключения.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Параметры

conn_str
str
Обязательно

Строка подключения концентратора событий.

eventhub_name
str

Путь к конкретному концентратору событий, к которому подключается клиент.

buffered_mode
bool

Если задано значение True, клиент-производитель будет собирать события в буфере, эффективно выполнять пакетную обработку, а затем публиковать. Значение по умолчанию — False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Обратный вызов, вызываемый после успешной публикации пакета. Обратный вызов принимает два параметра:

  • events: список событий, которые были успешно опубликованы.

  • partition_id: идентификатор секции, в котором были опубликованы события в списке.

Функция обратного вызова должна быть определена следующим образом: on_success(events, partition_id). Он является обязательным, если buffered_mode имеет значение True, а если buffered_mode имеет значение False, необязательно.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Обратный вызов, вызываемый после того, как пакет не был опубликован. Функция обратного вызова должна быть определена следующим образом: on_error(events, partition_id, error), где:

  • events: список событий, которые не удалось опубликовать.

  • partition_id: идентификатор секции, в котором пытались опубликовать события в списке.

  • error: исключение, связанное с ошибкой отправки.

Если buffered_mode имеет значение False, on_error обратный вызов необязателен, а ошибки будут обрабатываться следующим образом:

  • Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,

    затем сведения об ошибке будут переданы обратному вызову on_error, который затем будет вызван.

  • Если обратный вызов on_error не передается во время создания экземпляра клиента,

    тогда ошибка будет возникать по умолчанию.

Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом:

  • Если события не удается поставить в очередь в течение заданного времени ожидания, будет напрямую вызвана ошибка.

  • Если после успешного постановки в очередь события не удается отправить, будет вызван обратный вызов on_error .

max_buffer_length
int

Только в буферизованном режиме. Общее количество событий на секцию, которые можно буферизать до запуска очистки. Значение по умолчанию — 1500 в режиме буферизации.

max_wait_time
Optional[float]

Только в буферизованном режиме. Время ожидания сборки пакета с событиями в буфере перед публикацией. Значение по умолчанию — 1 в режиме буферизации.

logging_enable
bool

Указывает, следует ли выводить журналы трассировки сети в средство ведения журнала. Значение по умолчанию — False.

http_proxy
dict

Параметры прокси-сервера HTTP. Это должен быть словарь со следующими ключами: "proxy_hostname" (значение str) и "proxy_port" (значение int). Кроме того, могут присутствовать следующие ключи: "имя пользователя", "пароль".

auth_timeout
float

Время в секундах ожидания авторизации маркера службой. Значение по умолчанию — 60 секунд. Если задано значение 0, время ожидания от клиента не будет применяться.

user_agent
str

Если этот параметр указан, он будет добавлен перед строкой агента пользователя.

retry_total
int

Общее число попыток повторного выполнения неудачной операции при возникновении ошибки. Значение по умолчанию — 3.

retry_backoff_factor
float

Коэффициент задержки, применяемый между попытками после второй попытки (большинство ошибок устраняются немедленно второй попыткой без задержки). В фиксированном режиме политика повторных попыток всегда будет переходить в спящий режим для {backoff factor}. В экспоненциальном режиме политика повторных попыток будет переходить в спящий режим в течение: {коэффициент задержки} * (2 ** ({общее число повторных попыток} — 1)) секунд. Если backoff_factor равно 0,1, то повторная попытка будет в спящем режиме [0,0s, 0,2s, 0,4s, ...] между повторными попытками. Значение по умолчанию — 0,8.

retry_backoff_max
float

Максимальное время отката. Значение по умолчанию — 120 секунд (2 минуты).

retry_mode
str

Поведение задержки между повторными попытками. Поддерживаемые значения: fixed или exponential, где значение по умолчанию — "экспоненциальная".

idle_timeout
float

Время ожидания в секундах, по истечении которого клиент закроет базовое подключение, если действия отсутствуют. По умолчанию значение равно None. Это означает, что клиент не будет завершать работу из-за бездействия, если не будет инициирована службой.

transport_type
TransportType

Тип транспортного протокола, который будет использоваться для взаимодействия со службой Центров событий. Значение по умолчанию — TransportType.Amqp , в котором используется порт 5671. Если порт 5671 недоступен или заблокирован в сетевой среде, можно использовать TransportType.AmqpOverWebsocket , который использует порт 443 для связи.

custom_endpoint_address
Optional[str]

Адрес пользовательской конечной точки, используемый для установления подключения к службе Центров событий, что позволяет маршрутизировать сетевые запросы через любые шлюзы приложений или другие пути, необходимые для среды узла. Значение по умолчанию — Нет. Формат будет следующим: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Если порт не указан в custom_endpoint_address, по умолчанию будет использоваться порт 443.

connection_verify
Optional[str]

Путь к пользовательскому CA_BUNDLE файлу SSL-сертификата, который используется для проверки подлинности удостоверения конечной точки подключения. Значение по умолчанию — Нет, в этом случае будет использоваться certifi.where( ).

uamqp_transport
bool

Следует ли использовать библиотеку uamqp в качестве базового транспорта. Значение по умолчанию — False, а библиотека AMQP Pure Python будет использоваться в качестве базового транспорта.

Возвращаемый тип

Исключения

Если при очистке буфера произошла ошибка, если для свойства очистки задано значение True или при закрытии базовых подключений AMQP в режиме буферизации.

Примеры

Создайте экземпляр EventHubProducerClient из строка подключения.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Количество событий, которые буферикуются и ожидают публикации для определенной секции. Возвращает значение None в режиме без буферизации. ПРИМЕЧАНИЕ. Буфер событий обрабатывается в фоновой сопрограмме, поэтому количество событий в буфере, сообщаемое этим API, должно рассматриваться только приблизительно и рекомендуется только для использования в отладке. Для идентификатора секции без буферизации событий будет возвращено значение 0 независимо от того, существует ли этот идентификатор секции в концентраторе событий.

get_buffered_event_count(partition_id: str) -> int | None

Параметры

partition_id
str
Обязательно

Идентификатор целевой секции.

Возвращаемый тип

int,

Исключения

Если при очистке буфера произошла ошибка, если для свойства очистки задано значение True или при закрытии базовых подключений AMQP в режиме буферизации.

get_eventhub_properties

Получение свойств концентратора событий.

Ключи в возвращенном словаре:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Возвращаемое значение

Словарь, содержащий сведения о концентраторе событий.

Возвращаемый тип

Исключения

get_partition_ids

Получение идентификаторов секций концентратора событий.

async get_partition_ids() -> List[str]

Возвращаемое значение

Список идентификаторов секций.

Возвращаемый тип

Исключения

get_partition_properties

Получение свойств указанной секции.

В словаре свойств содержатся следующие ключи:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Параметры

partition_id
str
Обязательно

Идентификатор целевой секции.

Возвращаемое значение

Диктовка свойств секции.

Возвращаемый тип

Исключения

send_batch

Отправляет пакет данных о событиях. По умолчанию метод блокируется до получения подтверждения или истечения времени ожидания операции. Если EventHubProducerClient настроен для запуска в буферизованном режиме, метод помещает события в очередь в локальный буфер и возвращает их. Производитель выполняет автоматическую отправку в фоновом режиме.

Если buffered_mode имеет значение False, on_error обратный вызов необязателен, а ошибки будут обрабатываться следующим образом:

  • Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,

    затем сведения об ошибке будут переданы обратному вызову on_error, который затем будет вызван.

  • Если обратный вызов on_error не передается во время создания экземпляра клиента,

    тогда ошибка будет возникать по умолчанию.

Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом:

  • Если события не удается поставить в очередь в течение заданного времени ожидания, будет напрямую вызвана ошибка.

  • Если после успешного постановки в очередь события не удается отправить, будет вызван обратный вызов on_error .

В режиме буферизации отправка пакета останется нетронутой и отправляется как единое целое. Пакет не будет переупорядочен. Это может привести к неэффективности отправки событий.

Если вы отправляете конечный список EventData или AmqpAnnotatedMessage и знаете, что он находится в пределах предельного размера кадра концентратора событий, вы можете отправить их с помощью вызова send_batch . В противном случае используйте create_batch для создания EventDataBatch и добавьте EventData или AmqpAnnotatedMessage в пакет по одному до ограничения размера, а затем вызовите этот метод для отправки пакета.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Параметры

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Обязательно

Отправляемый объект EventDataBatch или список данных EventData для отправки в пакете. Все EventData или AmqpAnnotatedMessage в списке или EventDataBatch будут помещаться в один раздел.

timeout
float

Максимальное время ожидания для отправки данных о событиях в режиме без буферизации или максимальное время ожидания для постановки данных события в буфер в режиме буферизации. В режиме без буферизации будет использоваться время ожидания по умолчанию, указанное при создании производителя. В режиме буферизации время ожидания по умолчанию — Нет.

partition_id
str

Конкретный идентификатор секции для отправки. Значение по умолчанию — Нет. В этом случае служба будет назначаться всем секциям с помощью циклического перебора. TypeError будет вызываться, если указан partition_id и event_data_batch является EventDataBatch, так как сам EventDataBatch имеет partition_id.

partition_key
str

При заданном partition_key данные о событиях будут отправляться в определенную секцию концентратора событий, определяемую службой. TypeError возникает, если указан partition_key и event_data_batch является EventDataBatch, так как сам EventDataBatch имеет partition_key. Если указаны partition_id и partition_key, приоритет будет иметь partition_id. ПРЕДУПРЕЖДЕНИЕ. Не рекомендуется задавать partition_key нестроковых значений для отправляемых событий, так как служба концентратора событий игнорирует partition_key, а события будут назначены всем секциям с помощью циклического перебора. Кроме того, существуют пакеты SDK для использования событий, которые ожидают, что partition_key будет только строковым типом. Они могут не проанализировать нестроковое значение.

Возвращаемый тип

Исключения

Если значение, указанное параметром времени ожидания, истекает до того, как событие может быть отправлено в режиме без буферизации, или события могут быть помещены в очередь в буферизованном режиме.

Примеры

Асинхронно отправляет данные о событиях


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Отправляет данные события. По умолчанию метод блокируется до получения подтверждения или истечения времени ожидания операции. Если EventHubProducerClient настроен для запуска в буферизованном режиме, метод помещает событие в очередь в локальный буфер и возвращает его. Производитель будет выполнять автоматическую пакетную обработку и отправку в фоновом режиме.

Если buffered_mode имеет значение False, on_error обратный вызов необязателен и ошибки будут обрабатываться следующим образом: * Если обратный вызов on_error передается во время создания экземпляра клиента-производителя,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Если buffered_mode имеет значение True, требуется обратный вызов on_error , а ошибки будут обрабатываться следующим образом: * Если события не могут поставить в очередь в течение заданного времени ожидания, то будет напрямую вызвана ошибка.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Параметры

event_data
Union[EventData, AmqpAnnotatedMessage]
Обязательно

Отправляемый объект EventData .

timeout
float

Максимальное время ожидания для отправки данных о событиях в режиме без буферизации или максимальное время ожидания для постановки данных события в буфер в режиме буферизации. В режиме без буферизации будет использоваться время ожидания по умолчанию, указанное при создании производителя. В режиме буферизации время ожидания по умолчанию — Нет.

partition_id
str

Конкретный идентификатор секции для отправки. Значение по умолчанию — Нет. В этом случае служба будет назначаться всем секциям с помощью циклического перебора. TypeError будет вызываться, если указан partition_id и event_data_batch является EventDataBatch, так как сам EventDataBatch имеет partition_id.

partition_key
str

При заданном partition_key данные о событиях будут отправляться в определенную секцию концентратора событий, определяемую службой. TypeError возникает, если указан partition_key и event_data_batch является EventDataBatch, так как сам EventDataBatch имеет partition_key. Если указаны partition_id и partition_key, приоритет будет иметь partition_id. ПРЕДУПРЕЖДЕНИЕ. Не рекомендуется задавать partition_key нестроковых значений для отправляемых событий, так как служба концентратора событий игнорирует partition_key, а события будут назначены всем секциям с помощью циклического перебора. Кроме того, существуют пакеты SDK для использования событий, которые ожидают, что partition_key будет только строковым типом. Они могут не проанализировать нестроковое значение.

Возвращаемый тип

Исключения

Если значение, заданное параметром времени ожидания, истекает до того, как событие может быть отправлено в режиме без буферизации, или события не могут быть помещены в очередь в буферизованном режиме.

Атрибуты

total_buffered_event_count

Общее количество событий, которые в данный момент буферичены и ожидают публикации, во всех секциях. Возвращает значение None в режиме без буферизации. ПРИМЕЧАНИЕ. Буфер событий обрабатывается в фоновой сопрограмме, поэтому количество событий в буфере, сообщаемое этим API, должно рассматриваться только приблизительно и рекомендуется только для использования в отладке.

Возвращаемый тип

int,