EventHubProducerClient Clase

La clase EventHubProducerClient define una interfaz de alto nivel para enviar eventos al servicio Azure Event Hubs.

Herencia
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Constructor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Parámetros

fully_qualified_namespace
str
Requerido

Nombre de host completo para el espacio de nombres de Event Hubs. Es probable que sea similar a .servicebus.windows.net

eventhub_name
str
Requerido

Ruta de acceso del centro de eventos específico al que se va a conectar el cliente.

credential
AsyncTokenCredential o AzureSasCredential o AzureNamedKeyCredential
Requerido

Objeto de credencial usado para la autenticación que implementa una interfaz determinada para obtener tokens. Acepta objetos de credenciales EventHubSharedKeyCredentialo generados por la biblioteca azure-identity y los objetos que implementan el método *get_token(self, scopes).

buffered_mode
bool

Si es True, el cliente productor recopilará eventos en un búfer, lote de forma eficaz y, a continuación, publicará. El valor predeterminado es False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Devolución de llamada a la que se llamará una vez que se haya publicado correctamente un lote. La devolución de llamada toma dos parámetros:

  • events: la lista de eventos que se han publicado correctamente.

  • partition_id: identificador de partición en el que se han publicado los eventos de la lista.

La función de devolución de llamada debe definirse como: on_success(events, partition_id). Se requiere cuando buffered_mode es True mientras que opcional si buffered_mode es False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

La devolución de llamada a la que se llamará una vez que no se haya publicado un lote. Obligatorio cuando en buffered_mode es True mientras que opcional si buffered_mode es False. La función de devolución de llamada debe definirse como: on_error(events, partition_id, error), donde:

  • events: la lista de eventos que no se pudieron publicar,

  • partition_id: identificador de partición en el que se han intentado publicar los eventos de la lista y

  • error: excepción relacionada con el error de envío.

Si buffered_mode es False, on_error devolución de llamada es opcional y los errores se controlarán de la siguiente manera:

  • Si se pasa una devolución de llamada on_error durante la creación de instancias del cliente productor,

    a continuación, se pasará información de error al on_error devolución de llamada, a la que se llamará.

  • Si no se pasa una devolución de llamada on_error durante la creación de instancias del cliente,

    a continuación, el error se generará de forma predeterminada.

Si buffered_mode es True, se requiere on_error devolución de llamada y los errores se controlarán de la siguiente manera:

  • Si los eventos no se ponen en cola dentro del tiempo de espera especificado, se generará un error directamente.

  • Si los eventos no se envían después de poner en cola correctamente, se llamará a la devolución de llamada on_error .

max_buffer_length
int

Solo modo almacenado en búfer. Número total de eventos por partición que se pueden almacenar en búfer antes de que se desencadene un vaciado. El valor predeterminado es 1500 en modo almacenado en búfer.

max_wait_time
Optional[float]

Solo modo almacenado en búfer. Cantidad de tiempo que se espera a que se cree un lote con eventos en el búfer antes de publicarlo. El valor predeterminado es 1 en modo almacenado en búfer.

logging_enable
bool

Indica si se van a generar registros de seguimiento de red en el registrador. El valor predeterminado es false.

auth_timeout
float

Tiempo en segundos para esperar a que el servicio autorice un token. El valor predeterminado es de 60 segundos. Si se establece en 0, no se aplicará ningún tiempo de espera desde el cliente.

user_agent
str

Si se especifica, se agregará delante de la cadena del agente de usuario.

retry_total
int

Número total de intentos de rehacer una operación con error cuando se produce un error. El valor predeterminado es 3.

retry_backoff_factor
float

Un factor de retroceso que se va a aplicar entre los intentos después del segundo intento (la mayoría de los errores se resuelven inmediatamente mediante un segundo intento sin retraso). En modo fijo, la directiva de reintento siempre se suspenderá para {factor de retroceso}. En el modo "exponencial", la directiva de reintento se suspenderá durante: {factor de retroceso} * (2 ** ({número de reintentos totales} - 1)) segundos. Si el backoff_factor es 0,1, el reintento se suspenderá para [0.0s, 0.2s, 0.4s, ...] entre reintentos. El valor predeterminado es 0,8.

retry_backoff_max
float

Tiempo de retroceso máximo. El valor predeterminado es 120 segundos (2 minutos).

retry_mode
str

Comportamiento de retraso entre reintentos. Los valores admitidos son "fijo" o "exponencial", donde el valor predeterminado es "exponencial".

idle_timeout
float

Tiempo de espera, en segundos, después del cual este cliente cerrará la conexión subyacente si no hay ninguna actividad. De forma predeterminada, el valor es None, lo que significa que el cliente no se apagará debido a la inactividad a menos que el servicio lo inicie.

transport_type
TransportType

Tipo de protocolo de transporte que se usará para comunicarse con el servicio Event Hubs. El valor predeterminado es TransportType.Amqp en cuyo caso se usa el puerto 5671. Si el puerto 5671 no está disponible o bloqueado en el entorno de red, TransportType.AmqpOverWebsocket podría usarse en su lugar, que usa el puerto 443 para la comunicación.

http_proxy
dict

Configuración del proxy HTTP. Debe ser un diccionario con las siguientes claves: "proxy_hostname" (valor str) y "proxy_port" (valor int). Además, las claves siguientes también pueden estar presentes: "username", "password".

custom_endpoint_address
Optional[str]

La dirección del punto de conexión personalizado que se va a usar para establecer una conexión con el servicio Event Hubs, lo que permite enrutar las solicitudes de red a través de las puertas de enlace de aplicaciones u otras rutas de acceso necesarias para el entorno host. El valor predeterminado es None. El formato sería "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Si no se especifica el puerto en el custom_endpoint_address, se usará el puerto 443 de forma predeterminada.

connection_verify
Optional[str]

Ruta de acceso al archivo de CA_BUNDLE personalizado del certificado SSL que se usa para autenticar la identidad del punto de conexión. El valor predeterminado es None en cuyo caso se usará certifi.where().

uamqp_transport
bool

Si se debe usar la biblioteca uamqp como transporte subyacente. El valor predeterminado es False y la biblioteca AMQP de Python pura se usará como transporte subyacente.

socket_timeout
float

Tiempo en segundos que el socket subyacente de la conexión debe esperar al enviar y recibir datos antes de que se agote el tiempo de espera. El valor predeterminado es 0.2 para TransportType.Amqp y 1 para TransportType.AmqpOverWebsocket. Si se producen errores de EventHubsConnectionError debido al tiempo de espera de escritura, es posible que sea necesario pasar un valor mayor que el predeterminado. Esto es para escenarios de uso avanzado y normalmente el valor predeterminado debe ser suficiente.

Ejemplos

Cree una nueva instancia de EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Métodos

close

Cierre la conexión y los vínculos de AMQP subyacentes del cliente productor.

create_batch

Cree un objeto EventDataBatch con el tamaño máximo de todo el contenido restringido por max_size_in_bytes.

El max_size_in_bytes no debe ser mayor que el tamaño máximo permitido del mensaje definido por el servicio.

flush

Solo modo almacenado en búfer. Vacíe los eventos en el búfer que se enviarán inmediatamente si el cliente está trabajando en modo almacenado en búfer.

from_connection_string

Cree un eventHubProducerClient a partir de un cadena de conexión.

get_buffered_event_count

Número de eventos almacenados en búfer y en espera de publicarse para una partición determinada. Devuelve None en modo no almacenado en búfer. NOTA: El búfer de eventos se procesa en una corrutina en segundo plano, por lo que el número de eventos del búfer notificado por esta API debe considerarse solo una aproximación y solo se recomienda para su uso en la depuración. Para un identificador de partición que no tiene ningún evento almacenado en búfer, se devolverá 0 independientemente de si ese identificador de partición existe realmente dentro del centro de eventos.

get_eventhub_properties

Obtiene las propiedades del centro de eventos.

Las claves del diccionario devuelto incluyen:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Obtiene los identificadores de partición del centro de eventos.

get_partition_properties

Obtiene las propiedades de la partición especificada.

Las claves del diccionario de propiedades incluyen:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Envía un lote de datos de eventos. De forma predeterminada, el método se bloqueará hasta que se reciba la confirmación o se agote el tiempo de espera de la operación. Si EventHubProducerClient está configurado para ejecutarse en modo almacenado en búfer, el método pondrá en cola los eventos en el búfer local y devolverá. El productor realizará el envío automático en segundo plano.

Si buffered_mode es False, on_error devolución de llamada es opcional y los errores se controlarán de la siguiente manera:

  • Si se pasa una devolución de llamada on_error durante la creación de instancias del cliente productor,

    a continuación, se pasará información de error al on_error devolución de llamada, a la que se llamará.

  • Si no se pasa una devolución de llamada on_error durante la creación de instancias del cliente,

    a continuación, el error se generará de forma predeterminada.

Si buffered_mode es True, se requiere on_error devolución de llamada y los errores se controlarán de la siguiente manera:

  • Si los eventos no se ponen en cola dentro del tiempo de espera especificado, se generará un error directamente.

  • Si los eventos no se envían después de poner en cola correctamente, se llamará a la devolución de llamada on_error .

En el modo almacenado en búfer, el envío de un lote permanecerá intacto y se enviará como una sola unidad. El lote no se reorganizará. Esto puede dar lugar a una ineficiencia en el envío de eventos.

Si va a enviar una lista finita de EventData o AmqpAnnotatedMessage y sabe que está dentro del límite de tamaño de fotogramas del centro de eventos, puede enviarlos con una llamada send_batch . De lo contrario, use create_batch para crear EventDataBatch y agregue EventData o AmqpAnnotatedMessage al lote uno por uno hasta el límite de tamaño y, a continuación, llame a este método para enviar el lote.

send_event

Envía datos de eventos. De forma predeterminada, el método se bloqueará hasta que se reciba la confirmación o se agote el tiempo de espera de la operación. Si EventHubProducerClient está configurado para ejecutarse en modo almacenado en búfer, el método pondrá en cola el evento en el búfer local y devolverá. El productor realizará el procesamiento por lotes automático y el envío en segundo plano.

Si buffered_mode es False, on_error devolución de llamada es opcional y los errores se controlarán de la siguiente manera: * Si se pasa una devolución de llamada on_error durante la creación de instancias del cliente productor,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Si buffered_mode es True, se requiere on_error devolución de llamada y los errores se controlarán de la siguiente manera: * Si los eventos no se ponen en cola dentro del tiempo de espera especificado, se generará un error directamente.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Cierre la conexión y los vínculos de AMQP subyacentes del cliente productor.

async close(*, flush: bool = True, **kwargs: Any) -> None

Parámetros

flush
bool

Solo modo almacenado en búfer. Si se establece en True, los eventos del búfer se enviarán inmediatamente. El valor predeterminado es True.

timeout
float o None

Solo modo almacenado en búfer. Tiempo de espera para cerrar el productor. El valor predeterminado es Ninguno, lo que significa que no hay tiempo de espera.

Tipo de valor devuelto

Excepciones

Si se produce un error al vaciar el búfer si el vaciado está establecido en True o al cerrar las conexiones AMQP subyacentes en modo almacenado en búfer.

Ejemplos

Cierre el controlador.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Cree un objeto EventDataBatch con el tamaño máximo de todo el contenido restringido por max_size_in_bytes.

El max_size_in_bytes no debe ser mayor que el tamaño máximo permitido del mensaje definido por el servicio.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Tipo de valor devuelto

Excepciones

Si se produce un error al vaciar el búfer si el vaciado está establecido en True o al cerrar las conexiones AMQP subyacentes en modo almacenado en búfer.

Ejemplos

Creación de un objeto EventDataBatch con un tamaño limitado


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Solo modo almacenado en búfer. Vacíe los eventos en el búfer que se enviarán inmediatamente si el cliente está trabajando en modo almacenado en búfer.

async flush(**kwargs: Any) -> None

Parámetros

timeout
float o None

Tiempo de espera para vaciar los eventos almacenados en búfer, el valor predeterminado es Ninguno, lo que significa que no se ha agotado el tiempo de espera.

Tipo de valor devuelto

Excepciones

Si el productor no puede vaciar el búfer dentro del tiempo de espera especificado en modo almacenado en búfer.

from_connection_string

Cree un eventHubProducerClient a partir de un cadena de conexión.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Parámetros

conn_str
str
Requerido

El cadena de conexión de un centro de eventos.

eventhub_name
str

Ruta de acceso del centro de eventos específico al que se va a conectar el cliente.

buffered_mode
bool

Si es True, el cliente productor recopilará eventos en un búfer, lote de forma eficaz y, a continuación, publicará. El valor predeterminado es False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Devolución de llamada a la que se llamará una vez que se haya publicado correctamente un lote. La devolución de llamada toma dos parámetros:

  • events: la lista de eventos que se han publicado correctamente.

  • partition_id: identificador de partición en el que se han publicado los eventos de la lista.

La función de devolución de llamada debe definirse como: on_success(events, partition_id). Se requiere cuando buffered_mode es True mientras que opcional si buffered_mode es False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

La devolución de llamada a la que se llamará una vez que no se haya publicado un lote. La función de devolución de llamada debe definirse como: on_error(events, partition_id, error), donde:

  • events: la lista de eventos que no se pudieron publicar,

  • partition_id: identificador de partición en el que se han intentado publicar los eventos de la lista y

  • error: excepción relacionada con el error de envío.

Si buffered_mode es False, on_error devolución de llamada es opcional y los errores se controlarán de la siguiente manera:

  • Si se pasa una devolución de llamada on_error durante la creación de instancias del cliente productor,

    a continuación, se pasará información de error al on_error devolución de llamada, a la que se llamará.

  • Si no se pasa una devolución de llamada on_error durante la creación de instancias del cliente,

    a continuación, el error se generará de forma predeterminada.

Si buffered_mode es True, se requiere on_error devolución de llamada y los errores se controlarán de la siguiente manera:

  • Si los eventos no se ponen en cola dentro del tiempo de espera especificado, se generará un error directamente.

  • Si los eventos no se envían después de poner en cola correctamente, se llamará a la devolución de llamada on_error .

max_buffer_length
int

Solo modo almacenado en búfer. Número total de eventos por partición que se pueden almacenar en búfer antes de que se desencadene un vaciado. El valor predeterminado es 1500 en modo almacenado en búfer.

max_wait_time
Optional[float]

Solo modo almacenado en búfer. Cantidad de tiempo que se espera a que se cree un lote con eventos en el búfer antes de publicarlo. El valor predeterminado es 1 en modo almacenado en búfer.

logging_enable
bool

Indica si se van a generar registros de seguimiento de red en el registrador. El valor predeterminado es false.

http_proxy
dict

Configuración del proxy HTTP. Debe ser un diccionario con las siguientes claves: "proxy_hostname" (valor str) y "proxy_port" (valor int). Además, las claves siguientes también pueden estar presentes: "username", "password".

auth_timeout
float

Tiempo en segundos para esperar a que el servicio autorice un token. El valor predeterminado es de 60 segundos. Si se establece en 0, no se aplicará ningún tiempo de espera desde el cliente.

user_agent
str

Si se especifica, se agregará delante de la cadena del agente de usuario.

retry_total
int

Número total de intentos de rehacer una operación con error cuando se produce un error. El valor predeterminado es 3.

retry_backoff_factor
float

Un factor de retroceso que se va a aplicar entre los intentos después del segundo intento (la mayoría de los errores se resuelven inmediatamente mediante un segundo intento sin retraso). En modo fijo, la directiva de reintento siempre se suspenderá para {factor de retroceso}. En el modo "exponencial", la directiva de reintento se suspenderá durante: {factor de retroceso} * (2 ** ({número de reintentos totales} - 1)) segundos. Si el backoff_factor es 0,1, el reintento se suspenderá para [0.0s, 0.2s, 0.4s, ...] entre reintentos. El valor predeterminado es 0,8.

retry_backoff_max
float

Tiempo de retroceso máximo. El valor predeterminado es 120 segundos (2 minutos).

retry_mode
str

Comportamiento de retraso entre reintentos. Los valores admitidos son "fijo" o "exponencial", donde el valor predeterminado es "exponencial".

idle_timeout
float

Tiempo de espera, en segundos, después del cual este cliente cerrará la conexión subyacente si no hay ninguna actividad. De forma predeterminada, el valor es None, lo que significa que el cliente no se apagará debido a la inactividad a menos que el servicio lo inicie.

transport_type
TransportType

Tipo de protocolo de transporte que se usará para comunicarse con el servicio Event Hubs. El valor predeterminado es TransportType.Amqp en cuyo caso se usa el puerto 5671. Si el puerto 5671 no está disponible o bloqueado en el entorno de red, TransportType.AmqpOverWebsocket podría usarse en su lugar, que usa el puerto 443 para la comunicación.

custom_endpoint_address
Optional[str]

La dirección del punto de conexión personalizado que se va a usar para establecer una conexión con el servicio Event Hubs, lo que permite enrutar las solicitudes de red a través de las puertas de enlace de aplicaciones u otras rutas de acceso necesarias para el entorno host. El valor predeterminado es None. El formato sería "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Si no se especifica el puerto en el custom_endpoint_address, se usará el puerto 443 de forma predeterminada.

connection_verify
Optional[str]

Ruta de acceso al archivo de CA_BUNDLE personalizado del certificado SSL que se usa para autenticar la identidad del punto de conexión. El valor predeterminado es None en cuyo caso se usará certifi.where().

uamqp_transport
bool

Si se debe usar la biblioteca uamqp como transporte subyacente. El valor predeterminado es False y la biblioteca AMQP de Python pura se usará como transporte subyacente.

Tipo de valor devuelto

Excepciones

Si se produce un error al vaciar el búfer si el vaciado está establecido en True o al cerrar las conexiones AMQP subyacentes en modo almacenado en búfer.

Ejemplos

Cree una nueva instancia de EventHubProducerClient a partir de cadena de conexión.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Número de eventos almacenados en búfer y en espera de publicarse para una partición determinada. Devuelve None en modo no almacenado en búfer. NOTA: El búfer de eventos se procesa en una corrutina en segundo plano, por lo que el número de eventos del búfer notificado por esta API debe considerarse solo una aproximación y solo se recomienda para su uso en la depuración. Para un identificador de partición que no tiene ningún evento almacenado en búfer, se devolverá 0 independientemente de si ese identificador de partición existe realmente dentro del centro de eventos.

get_buffered_event_count(partition_id: str) -> int | None

Parámetros

partition_id
str
Requerido

Identificador de partición de destino.

Tipo de valor devuelto

int,

Excepciones

Si se produce un error al vaciar el búfer si el vaciado está establecido en True o al cerrar las conexiones AMQP subyacentes en modo almacenado en búfer.

get_eventhub_properties

Obtiene las propiedades del centro de eventos.

Las claves del diccionario devuelto incluyen:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Devoluciones

Diccionario que contiene información sobre el centro de eventos.

Tipo de valor devuelto

Excepciones

get_partition_ids

Obtiene los identificadores de partición del centro de eventos.

async get_partition_ids() -> List[str]

Devoluciones

Lista de identificadores de partición.

Tipo de valor devuelto

Excepciones

get_partition_properties

Obtiene las propiedades de la partición especificada.

Las claves del diccionario de propiedades incluyen:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parámetros

partition_id
str
Requerido

Identificador de partición de destino.

Devoluciones

Una diferencia de las propiedades de partición.

Tipo de valor devuelto

Excepciones

send_batch

Envía un lote de datos de eventos. De forma predeterminada, el método se bloqueará hasta que se reciba la confirmación o se agote el tiempo de espera de la operación. Si EventHubProducerClient está configurado para ejecutarse en modo almacenado en búfer, el método pondrá en cola los eventos en el búfer local y devolverá. El productor realizará el envío automático en segundo plano.

Si buffered_mode es False, on_error devolución de llamada es opcional y los errores se controlarán de la siguiente manera:

  • Si se pasa una devolución de llamada on_error durante la creación de instancias del cliente productor,

    a continuación, se pasará información de error al on_error devolución de llamada, a la que se llamará.

  • Si no se pasa una devolución de llamada on_error durante la creación de instancias del cliente,

    a continuación, el error se generará de forma predeterminada.

Si buffered_mode es True, se requiere on_error devolución de llamada y los errores se controlarán de la siguiente manera:

  • Si los eventos no se ponen en cola dentro del tiempo de espera especificado, se generará un error directamente.

  • Si los eventos no se envían después de poner en cola correctamente, se llamará a la devolución de llamada on_error .

En el modo almacenado en búfer, el envío de un lote permanecerá intacto y se enviará como una sola unidad. El lote no se reorganizará. Esto puede dar lugar a una ineficiencia en el envío de eventos.

Si va a enviar una lista finita de EventData o AmqpAnnotatedMessage y sabe que está dentro del límite de tamaño de fotogramas del centro de eventos, puede enviarlos con una llamada send_batch . De lo contrario, use create_batch para crear EventDataBatch y agregue EventData o AmqpAnnotatedMessage al lote uno por uno hasta el límite de tamaño y, a continuación, llame a este método para enviar el lote.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parámetros

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Requerido

Objeto EventDataBatch que se va a enviar o una lista de EventData que se enviarán en un lote. Todos los eventos EventData o AmqpAnnotatedMessage de la lista o EventDataBatch se colocarán en la misma partición.

timeout
float

El tiempo de espera máximo para enviar los datos del evento en modo no almacenado en búfer o el tiempo de espera máximo para poner en cola los datos del evento en el búfer en modo almacenado en búfer. En modo no almacenado en búfer, se usará el tiempo de espera predeterminado especificado cuando se creó el productor. En el modo almacenado en búfer, el tiempo de espera predeterminado es Ninguno.

partition_id
str

Identificador de partición específico al que se va a enviar. El valor predeterminado es None, en cuyo caso el servicio se asignará a todas las particiones mediante round robin. Se generará un TypeError si se especifica partition_id y event_data_batch es eventDataBatch porque el propio EventDataBatch tiene partition_id.

partition_key
str

Con el partition_key especificado, los datos de eventos se enviarán a una partición determinada del centro de eventos decidido por el servicio. Se generará un TypeError si se especifica partition_key y event_data_batch es eventDataBatch porque el propio EventDataBatch tiene partition_key. Si se proporcionan partition_id y partition_key, el partition_id tendrá prioridad. ADVERTENCIA: No se recomienda establecer partition_key de valor que no sea de cadena en los eventos que se van a enviar, ya que el servicio del centro de eventos omitirá el partition_key y los eventos se asignarán a todas las particiones mediante round robin. Además, hay SDK para consumir eventos que esperan que partition_key solo sean de tipo de cadena, es posible que no puedan analizar el valor que no es de cadena.

Tipo de valor devuelto

Excepciones

Si el valor especificado por el parámetro de tiempo de espera transcurre antes de que el evento se pueda enviar en modo no almacenado en búfer o los eventos se pueden poner en cola en el almacenado en búfer en modo almacenado en búfer.

Ejemplos

Envía datos de eventos de forma asincrónica


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Envía datos de eventos. De forma predeterminada, el método se bloqueará hasta que se reciba la confirmación o se agote el tiempo de espera de la operación. Si EventHubProducerClient está configurado para ejecutarse en modo almacenado en búfer, el método pondrá en cola el evento en el búfer local y devolverá. El productor realizará el procesamiento por lotes automático y el envío en segundo plano.

Si buffered_mode es False, on_error devolución de llamada es opcional y los errores se controlarán de la siguiente manera: * Si se pasa una devolución de llamada on_error durante la creación de instancias del cliente productor,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Si buffered_mode es True, se requiere on_error devolución de llamada y los errores se controlarán de la siguiente manera: * Si los eventos no se ponen en cola dentro del tiempo de espera especificado, se generará un error directamente.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parámetros

event_data
Union[EventData, AmqpAnnotatedMessage]
Requerido

Objeto EventData que se va a enviar.

timeout
float

El tiempo de espera máximo para enviar los datos del evento en modo no almacenado en búfer o el tiempo de espera máximo para poner en cola los datos del evento en el búfer en modo almacenado en búfer. En modo no almacenado en búfer, se usará el tiempo de espera predeterminado especificado cuando se creó el productor. En el modo almacenado en búfer, el tiempo de espera predeterminado es Ninguno.

partition_id
str

Identificador de partición específico al que se va a enviar. El valor predeterminado es None, en cuyo caso el servicio se asignará a todas las particiones mediante round robin. Se generará un TypeError si se especifica partition_id y event_data_batch es eventDataBatch porque el propio EventDataBatch tiene partition_id.

partition_key
str

Con el partition_key especificado, los datos de eventos se enviarán a una partición determinada del centro de eventos decidido por el servicio. Se generará un TypeError si se especifica partition_key y event_data_batch es eventDataBatch porque el propio EventDataBatch tiene partition_key. Si se proporcionan partition_id y partition_key, el partition_id tendrá prioridad. ADVERTENCIA: No se recomienda establecer partition_key de valor que no sea de cadena en los eventos que se van a enviar, ya que el servicio del centro de eventos omitirá el partition_key y los eventos se asignarán a todas las particiones mediante round robin. Además, hay SDK para consumir eventos que esperan que partition_key solo sean de tipo de cadena, es posible que no puedan analizar el valor que no es de cadena.

Tipo de valor devuelto

Excepciones

Si el valor especificado por el parámetro de tiempo de espera transcurre antes de que se pueda enviar el evento en modo no almacenado en búfer o los eventos no se pueden poner en cola en el almacenado en búfer en modo almacenado en búfer.

Atributos

total_buffered_event_count

Número total de eventos almacenados en búfer y en espera de publicación, en todas las particiones. Devuelve None en modo no almacenado en búfer. NOTA: El búfer de eventos se procesa en una corrutina en segundo plano, por lo que el número de eventos del búfer notificado por esta API debe considerarse solo una aproximación y solo se recomienda para su uso en la depuración.

Tipo de valor devuelto

int,