Share via


EventHubProducerClient Classe

A classe EventHubProducerClient define uma interface de alto nível para enviar eventos para o serviço Hubs de Eventos do Azure.

Herança
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Construtor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Parâmetros

fully_qualified_namespace
str
Obrigatório

O nome do host totalmente qualificado para o namespace dos Hubs de Eventos. É provável que isso seja semelhante a .servicebus.windows.net

eventhub_name
str
Obrigatório

O caminho do Hub de Eventos específico ao qual conectar o cliente.

credential
AsyncTokenCredential ou AzureSasCredential ou AzureNamedKeyCredential
Obrigatório

O objeto de credencial usado para autenticação que implementa uma interface específica para obter tokens. Ele aceita EventHubSharedKeyCredential, ou objetos de credencial gerados pela biblioteca azure-identity e objetos que implementam o método *get_token(self, scopes).

buffered_mode
bool

Se True, o cliente produtor coletará eventos em um buffer, em lote com eficiência e publicará. O padrão é Falso.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

O retorno de chamada a ser chamado depois que um lote for publicado com êxito. O retorno de chamada usa dois parâmetros:

  • eventos: a lista de eventos que foram publicados com êxito

  • partition_id: a ID da partição na qual os eventos na lista foram publicados.

A função de retorno de chamada deve ser definida como: on_success(events, partition_id). Obrigatório quando buffered_mode é True enquanto opcional se buffered_mode for False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

O retorno de chamada a ser chamado depois que um lote falha ao ser publicado. Obrigatório quando em buffered_mode for True enquanto opcional se buffered_mode for False. A função de retorno de chamada deve ser definida como: on_error(events, partition_id, error), em que:

  • eventos: a lista de eventos que não foram publicados,

  • partition_id: a ID da partição na qual os eventos na lista foram tentados para serem publicados em e

  • error: a exceção relacionada à falha de envio.

Se buffered_mode for False, on_error retorno de chamada será opcional e os erros serão tratados da seguinte maneira:

  • Se um retorno de chamada on_error for passado durante a instanciação do cliente produtor,

    em seguida, as informações de erro serão passadas para o on_error retorno de chamada, que será chamado.

  • Se um retorno de chamada on_error não for passado durante a instanciação do cliente,

    em seguida, o erro será gerado por padrão.

Se buffered_mode for True, on_error retorno de chamada será necessário e os erros serão tratados da seguinte maneira:

  • Se os eventos não conseguirem enfileirar dentro do tempo limite determinado, um erro será gerado diretamente.

  • Se os eventos não forem enviados após a enfileiramento com êxito, o retorno de chamada on_error será chamado.

max_buffer_length
int

Somente modo em buffer. O número total de eventos por partição que podem ser armazenados em buffer antes que uma liberação seja disparada. O valor padrão é 1500 no modo em buffer.

max_wait_time
Optional[float]

Somente modo em buffer. O tempo de espera para que um lote seja criado com eventos no buffer antes da publicação. O valor padrão é 1 no modo em buffer.

logging_enable
bool

Se os logs de rastreamento de rede devem ser gerados para o agente. O padrão é False.

auth_timeout
float

O tempo em segundos para aguardar que um token seja autorizado pelo serviço. O valor padrão é 60 segundos. Se definido como 0, nenhum tempo limite será imposto do cliente.

user_agent
str

Se especificado, isso será adicionado na frente da cadeia de caracteres do agente do usuário.

retry_total
int

O número total de tentativas de refazer uma operação com falha quando ocorre um erro. O valor padrão é 3.

retry_backoff_factor
float

Um fator de retirada a ser aplicado entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem atraso). No modo fixo, a política de repetição sempre será suspensa para {fator de retirada}. No modo 'exponencial', a política de repetição será suspensa para: {fator de retirada} * (2 ** ({número de tentativas totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição será suspensa para [0.0s, 0.2s, 0.4s, ...] entre as repetições. O valor padrão é 0,8.

retry_backoff_max
float

O tempo máximo de retirada. O valor padrão é 120 segundos (2 minutos).

retry_mode
str

O comportamento de atraso entre tentativas de repetição. Os valores com suporte são 'fixo' ou 'exponencial', em que o padrão é 'exponencial'.

idle_timeout
float

Tempo limite, em segundos, após o qual esse cliente fechará a conexão subjacente se não houver nenhuma atividade. Por padrão, o valor é None, o que significa que o cliente não será desligado devido à inatividade, a menos que seja iniciado pelo serviço.

transport_type
TransportType

O tipo de protocolo de transporte que será usado para se comunicar com o serviço de Hubs de Eventos. O padrão é TransportType.Amqp , caso em que a porta 5671 é usada. Se a porta 5671 não estiver disponível/bloqueada no ambiente de rede, TransportType.AmqpOverWebsocket poderá ser usado, em vez disso, que usa a porta 443 para comunicação.

http_proxy
dict

Configurações de proxy HTTP. Deve ser um dicionário com as seguintes chaves: 'proxy_hostname' (valor str) e 'proxy_port' (valor int). Além disso, as seguintes chaves também podem estar presentes: 'username', 'password'.

custom_endpoint_address
Optional[str]

O endereço do ponto de extremidade personalizado a ser usado para estabelecer uma conexão com o serviço de Hubs de Eventos, permitindo que as solicitações de rede sejam roteadas por meio de quaisquer gateways de aplicativo ou outros caminhos necessários para o ambiente de host. O padrão é None. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, por padrão, a porta 443 será usada.

connection_verify
Optional[str]

Caminho para o arquivo de CA_BUNDLE personalizado do certificado SSL que é usado para autenticar a identidade do ponto de extremidade de conexão. O padrão é None, caso em que certifi.where() será usado.

uamqp_transport
bool

Se a biblioteca uamqp deve ser usada como o transporte subjacente. O valor padrão é False e a biblioteca AMQP pura do Python será usada como o transporte subjacente.

socket_timeout
float

O tempo em segundos que o soquete subjacente na conexão deve aguardar ao enviar e receber dados antes de atingir o tempo limite. O valor padrão é 0,2 para TransportType.Amqp e 1 para TransportType.AmqpOverWebsocket. Se os erros eventHubsConnectionError estiverem ocorrendo devido ao tempo limite de gravação, talvez seja necessário passar um valor maior que o padrão. Isso é para cenários de uso avançado e, normalmente, o valor padrão deve ser suficiente.

Exemplos

Crie uma nova instância do EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Métodos

close

Feche a conexão e os links do AMQP subjacentes ao cliente produtor.

create_batch

Crie um objeto EventDataBatch com o tamanho máximo de todo o conteúdo restrito por max_size_in_bytes.

O max_size_in_bytes não deve ser maior que o tamanho máximo permitido da mensagem definido pelo serviço.

flush

Somente modo em buffer. Libere eventos no buffer a serem enviados imediatamente se o cliente estiver trabalhando no modo em buffer.

from_connection_string

Crie um EventHubProducerClient de um cadeia de conexão.

get_buffered_event_count

O número de eventos armazenados em buffer e aguardando para serem publicados para uma determinada partição. Retorna Nenhum no modo não armazenado em buffer. OBSERVAÇÃO: o buffer de eventos é processado em uma corrotina em segundo plano, portanto, o número de eventos no buffer relatado por essa API deve ser considerado apenas uma aproximação e é recomendado apenas para uso na depuração. Para uma ID de partição que não tem eventos armazenados em buffer, 0 será retornado independentemente de essa ID de partição realmente existir no Hub de Eventos.

get_eventhub_properties

Obter propriedades do Hub de Eventos.

As chaves no dicionário retornado incluem:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Obter IDs de partição do Hub de Eventos.

get_partition_properties

Obter propriedades da partição especificada.

As chaves no dicionário de propriedades incluem:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Envia um lote de dados de evento. Por padrão, o método será bloqueado até que a confirmação seja recebida ou a operação atingir o tempo limite. Se o EventHubProducerClient estiver configurado para ser executado no modo em buffer, o método enfileirará os eventos no buffer local e retornará. O produtor fará o envio automático em segundo plano.

Se buffered_mode for False, on_error retorno de chamada será opcional e os erros serão tratados da seguinte maneira:

  • Se um retorno de chamada on_error for passado durante a instanciação do cliente produtor,

    em seguida, as informações de erro serão passadas para o on_error retorno de chamada, que será chamado.

  • Se um retorno de chamada on_error não for passado durante a instanciação do cliente,

    em seguida, o erro será gerado por padrão.

Se buffered_mode for True, on_error retorno de chamada será necessário e os erros serão tratados da seguinte maneira:

  • Se os eventos não conseguirem enfileirar dentro do tempo limite determinado, um erro será gerado diretamente.

  • Se os eventos não forem enviados após a enfileiramento com êxito, o retorno de chamada on_error será chamado.

No modo em buffer, o envio de um lote permanecerá intacto e enviado como uma única unidade. O lote não será reorganizado. Isso pode resultar em ineficiência do envio de eventos.

Se você estiver enviando uma lista finita de EventData ou AmqpAnnotatedMessage e souber que ele está dentro do limite de tamanho do quadro do hub de eventos, poderá enviá-los com uma chamada send_batch . Caso contrário, use create_batch para criar EventDataBatch e adicionar EventData ou AmqpAnnotatedMessage no lote um por um até o limite de tamanho e, em seguida, chame esse método para enviar o lote.

send_event

Envia dados de evento. Por padrão, o método será bloqueado até que a confirmação seja recebida ou a operação atingir o tempo limite. Se o EventHubProducerClient estiver configurado para ser executado no modo em buffer, o método enfileirará o evento no buffer local e retornará. O produtor fará o envio automático em lote e o envio em segundo plano.

Se buffered_mode for False, on_error retorno de chamada será opcional e os erros serão tratados da seguinte maneira: * Se um retorno de chamada on_error for passado durante a instanciação do cliente produtor,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Se buffered_mode for True, on_error retorno de chamada será necessário e os erros serão tratados da seguinte maneira: * Se os eventos não conseguirem enfileirar dentro do tempo limite determinado, um erro será gerado diretamente.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Feche a conexão e os links do AMQP subjacentes ao cliente produtor.

async close(*, flush: bool = True, **kwargs: Any) -> None

Parâmetros

flush
bool

Somente modo em buffer. Se definido como True, os eventos no buffer serão enviados imediatamente. O padrão é True.

timeout
float ou None

Somente modo em buffer. Tempo limite para fechar o produtor. O padrão é Nenhum, o que significa que não há tempo limite.

Tipo de retorno

Exceções

Se ocorreu um erro ao liberar o buffer se flush estiver definido como True ou fechando as conexões AMQP subjacentes no modo em buffer.

Exemplos

Feche o manipulador.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Crie um objeto EventDataBatch com o tamanho máximo de todo o conteúdo restrito por max_size_in_bytes.

O max_size_in_bytes não deve ser maior que o tamanho máximo permitido da mensagem definido pelo serviço.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Tipo de retorno

Exceções

Se ocorreu um erro ao liberar o buffer se flush estiver definido como True ou fechando as conexões AMQP subjacentes no modo em buffer.

Exemplos

Criar objeto EventDataBatch dentro do tamanho limitado


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Somente modo em buffer. Libere eventos no buffer a serem enviados imediatamente se o cliente estiver trabalhando no modo em buffer.

async flush(**kwargs: Any) -> None

Parâmetros

timeout
float ou None

Tempo limite para liberar os eventos em buffer, o padrão é Nenhum, o que significa que não há tempo limite.

Tipo de retorno

Exceções

Se o produtor não conseguir liberar o buffer dentro do tempo limite especificado no modo em buffer.

from_connection_string

Crie um EventHubProducerClient de um cadeia de conexão.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Parâmetros

conn_str
str
Obrigatório

O cadeia de conexão de um Hub de Eventos.

eventhub_name
str

O caminho do Hub de Eventos específico ao qual conectar o cliente.

buffered_mode
bool

Se True, o cliente produtor coletará eventos em um buffer, em lote com eficiência e publicará. O padrão é Falso.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

O retorno de chamada a ser chamado depois que um lote for publicado com êxito. O retorno de chamada usa dois parâmetros:

  • eventos: a lista de eventos que foram publicados com êxito

  • partition_id: a ID da partição na qual os eventos na lista foram publicados.

A função de retorno de chamada deve ser definida como: on_success(events, partition_id). Ele é necessário quando buffered_mode é True enquanto opcional se buffered_mode for False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

O retorno de chamada a ser chamado depois que um lote falha ao ser publicado. A função de retorno de chamada deve ser definida como: on_error(events, partition_id, error), em que:

  • eventos: a lista de eventos que não foram publicados,

  • partition_id: a ID da partição na qual os eventos na lista foram tentados para serem publicados em e

  • error: a exceção relacionada à falha de envio.

Se buffered_mode for False, on_error retorno de chamada será opcional e os erros serão tratados da seguinte maneira:

  • Se um retorno de chamada on_error for passado durante a instanciação do cliente produtor,

    em seguida, as informações de erro serão passadas para o on_error retorno de chamada, que será chamado.

  • Se um retorno de chamada on_error não for passado durante a instanciação do cliente,

    em seguida, o erro será gerado por padrão.

Se buffered_mode for True, on_error retorno de chamada será necessário e os erros serão tratados da seguinte maneira:

  • Se os eventos não conseguirem enfileirar dentro do tempo limite determinado, um erro será gerado diretamente.

  • Se os eventos não forem enviados após a enfileiramento com êxito, o retorno de chamada on_error será chamado.

max_buffer_length
int

Somente modo em buffer. O número total de eventos por partição que podem ser armazenados em buffer antes que uma liberação seja disparada. O valor padrão é 1500 no modo em buffer.

max_wait_time
Optional[float]

Somente modo em buffer. O tempo de espera para que um lote seja criado com eventos no buffer antes da publicação. O valor padrão é 1 no modo em buffer.

logging_enable
bool

Se os logs de rastreamento de rede devem ser gerados para o agente. O padrão é False.

http_proxy
dict

Configurações de proxy HTTP. Deve ser um dicionário com as seguintes chaves: 'proxy_hostname' (valor str) e 'proxy_port' (valor int). Além disso, as seguintes chaves também podem estar presentes: 'username', 'password'.

auth_timeout
float

O tempo em segundos para aguardar que um token seja autorizado pelo serviço. O valor padrão é 60 segundos. Se definido como 0, nenhum tempo limite será imposto do cliente.

user_agent
str

Se especificado, isso será adicionado na frente da cadeia de caracteres do agente do usuário.

retry_total
int

O número total de tentativas de refazer uma operação com falha quando ocorre um erro. O valor padrão é 3.

retry_backoff_factor
float

Um fator de retirada a ser aplicado entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem atraso). No modo fixo, a política de repetição sempre será suspensa para {fator de retirada}. No modo 'exponencial', a política de repetição será suspensa para: {fator de retirada} * (2 ** ({número de tentativas totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição será suspensa para [0.0s, 0.2s, 0.4s, ...] entre as repetições. O valor padrão é 0,8.

retry_backoff_max
float

O tempo máximo de retirada. O valor padrão é 120 segundos (2 minutos).

retry_mode
str

O comportamento de atraso entre tentativas de repetição. Os valores com suporte são 'fixo' ou 'exponencial', em que o padrão é 'exponencial'.

idle_timeout
float

Tempo limite, em segundos, após o qual esse cliente fechará a conexão subjacente se não houver nenhuma atividade. Por padrão, o valor é None, o que significa que o cliente não será desligado devido à inatividade, a menos que seja iniciado pelo serviço.

transport_type
TransportType

O tipo de protocolo de transporte que será usado para se comunicar com o serviço de Hubs de Eventos. O padrão é TransportType.Amqp , caso em que a porta 5671 é usada. Se a porta 5671 não estiver disponível/bloqueada no ambiente de rede, TransportType.AmqpOverWebsocket poderá ser usado, em vez disso, que usa a porta 443 para comunicação.

custom_endpoint_address
Optional[str]

O endereço do ponto de extremidade personalizado a ser usado para estabelecer uma conexão com o serviço de Hubs de Eventos, permitindo que as solicitações de rede sejam roteadas por meio de quaisquer gateways de aplicativo ou outros caminhos necessários para o ambiente de host. O padrão é None. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, por padrão, a porta 443 será usada.

connection_verify
Optional[str]

Caminho para o arquivo de CA_BUNDLE personalizado do certificado SSL que é usado para autenticar a identidade do ponto de extremidade de conexão. O padrão é None, caso em que certifi.where() será usado.

uamqp_transport
bool

Se a biblioteca uamqp deve ser usada como o transporte subjacente. O valor padrão é False e a biblioteca AMQP pura do Python será usada como o transporte subjacente.

Tipo de retorno

Exceções

Se ocorreu um erro ao liberar o buffer se flush estiver definido como True ou fechando as conexões AMQP subjacentes no modo em buffer.

Exemplos

Crie uma nova instância do EventHubProducerClient do cadeia de conexão.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

O número de eventos armazenados em buffer e aguardando para serem publicados para uma determinada partição. Retorna Nenhum no modo não armazenado em buffer. OBSERVAÇÃO: o buffer de eventos é processado em uma corrotina em segundo plano, portanto, o número de eventos no buffer relatado por essa API deve ser considerado apenas uma aproximação e é recomendado apenas para uso na depuração. Para uma ID de partição que não tem eventos armazenados em buffer, 0 será retornado independentemente de essa ID de partição realmente existir no Hub de Eventos.

get_buffered_event_count(partition_id: str) -> int | None

Parâmetros

partition_id
str
Obrigatório

A ID da partição de destino.

Tipo de retorno

int,

Exceções

Se ocorreu um erro ao liberar o buffer se flush estiver definido como True ou fechando as conexões AMQP subjacentes no modo em buffer.

get_eventhub_properties

Obter propriedades do Hub de Eventos.

As chaves no dicionário retornado incluem:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Retornos

Um dicionário que contém informações sobre o Hub de Eventos.

Tipo de retorno

Exceções

get_partition_ids

Obter IDs de partição do Hub de Eventos.

async get_partition_ids() -> List[str]

Retornos

Uma lista de IDs de partição.

Tipo de retorno

Exceções

get_partition_properties

Obter propriedades da partição especificada.

As chaves no dicionário de propriedades incluem:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parâmetros

partition_id
str
Obrigatório

A ID da partição de destino.

Retornos

Um dict de propriedades de partição.

Tipo de retorno

Exceções

send_batch

Envia um lote de dados de evento. Por padrão, o método será bloqueado até que a confirmação seja recebida ou a operação atingir o tempo limite. Se o EventHubProducerClient estiver configurado para ser executado no modo em buffer, o método enfileirará os eventos no buffer local e retornará. O produtor fará o envio automático em segundo plano.

Se buffered_mode for False, on_error retorno de chamada será opcional e os erros serão tratados da seguinte maneira:

  • Se um retorno de chamada on_error for passado durante a instanciação do cliente produtor,

    em seguida, as informações de erro serão passadas para o on_error retorno de chamada, que será chamado.

  • Se um retorno de chamada on_error não for passado durante a instanciação do cliente,

    em seguida, o erro será gerado por padrão.

Se buffered_mode for True, on_error retorno de chamada será necessário e os erros serão tratados da seguinte maneira:

  • Se os eventos não conseguirem enfileirar dentro do tempo limite determinado, um erro será gerado diretamente.

  • Se os eventos não forem enviados após a enfileiramento com êxito, o retorno de chamada on_error será chamado.

No modo em buffer, o envio de um lote permanecerá intacto e enviado como uma única unidade. O lote não será reorganizado. Isso pode resultar em ineficiência do envio de eventos.

Se você estiver enviando uma lista finita de EventData ou AmqpAnnotatedMessage e souber que ele está dentro do limite de tamanho do quadro do hub de eventos, poderá enviá-los com uma chamada send_batch . Caso contrário, use create_batch para criar EventDataBatch e adicionar EventData ou AmqpAnnotatedMessage no lote um por um até o limite de tamanho e, em seguida, chame esse método para enviar o lote.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parâmetros

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Obrigatório

O objeto EventDataBatch a ser enviado ou uma lista de EventData a ser enviado em um lote. Todos os EventData ou AmqpAnnotatedMessage na lista ou EventDataBatch chegarão à mesma partição.

timeout
float

O tempo máximo de espera para enviar os dados do evento no modo não armazenado em buffer ou o tempo máximo de espera para enfileirar os dados do evento no buffer no modo em buffer. No modo sem buffer, o tempo de espera padrão especificado quando o produtor foi criado será usado. No modo em buffer, o tempo de espera padrão é Nenhum.

partition_id
str

A ID de partição específica para a qual enviar. O padrão é None; nesse caso, o serviço atribuirá a todas as partições usando round robin. Um TypeError será gerado se partition_id for especificado e event_data_batch for um EventDataBatch porque o próprio EventDataBatch tem partition_id.

partition_key
str

Com o partition_key fornecido, os dados de evento serão enviados para uma partição específica do Hub de Eventos decidido pelo serviço. Um TypeError será gerado se partition_key for especificado e event_data_batch for um EventDataBatch porque o próprio EventDataBatch tem partition_key. Se partition_id e partition_key forem fornecidos, o partition_id terá precedência. AVISO: definir partition_key de valor não cadeia de caracteres nos eventos a serem enviados não é desencorajado, pois o partition_key será ignorado pelo serviço do Hub de Eventos e os eventos serão atribuídos a todas as partições usando round robin. Além disso, há SDKs para consumir eventos que esperam que partition_key seja apenas o tipo de cadeia de caracteres, eles podem falhar ao analisar o valor não cadeia de caracteres.

Tipo de retorno

Exceções

Se o valor especificado pelo parâmetro de tempo limite decorrer antes que o evento possa ser enviado no modo sem buffer ou os eventos puderem ser enfileirados no buffer no modo em buffer.

Exemplos

Envia dados de evento de forma assíncrona


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Envia dados de evento. Por padrão, o método será bloqueado até que a confirmação seja recebida ou a operação atingir o tempo limite. Se o EventHubProducerClient estiver configurado para ser executado no modo em buffer, o método enfileirará o evento no buffer local e retornará. O produtor fará o envio automático em lote e o envio em segundo plano.

Se buffered_mode for False, on_error retorno de chamada será opcional e os erros serão tratados da seguinte maneira: * Se um retorno de chamada on_error for passado durante a instanciação do cliente produtor,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Se buffered_mode for True, on_error retorno de chamada será necessário e os erros serão tratados da seguinte maneira: * Se os eventos não conseguirem enfileirar dentro do tempo limite determinado, um erro será gerado diretamente.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parâmetros

event_data
Union[EventData, AmqpAnnotatedMessage]
Obrigatório

O objeto EventData a ser enviado.

timeout
float

O tempo máximo de espera para enviar os dados do evento no modo não armazenado em buffer ou o tempo máximo de espera para enfileirar os dados do evento no buffer no modo em buffer. No modo sem buffer, o tempo de espera padrão especificado quando o produtor foi criado será usado. No modo em buffer, o tempo de espera padrão é Nenhum.

partition_id
str

A ID de partição específica para a qual enviar. O padrão é None; nesse caso, o serviço atribuirá a todas as partições usando round robin. Um TypeError será gerado se partition_id for especificado e event_data_batch for um EventDataBatch porque o próprio EventDataBatch tem partition_id.

partition_key
str

Com o partition_key fornecido, os dados de evento serão enviados para uma partição específica do Hub de Eventos decidido pelo serviço. Um TypeError será gerado se partition_key for especificado e event_data_batch for um EventDataBatch porque o próprio EventDataBatch tem partition_key. Se partition_id e partition_key forem fornecidos, o partition_id terá precedência. AVISO: definir partition_key de valor não cadeia de caracteres nos eventos a serem enviados não é desencorajado, pois o partition_key será ignorado pelo serviço do Hub de Eventos e os eventos serão atribuídos a todas as partições usando round robin. Além disso, há SDKs para consumir eventos que esperam que partition_key seja apenas o tipo de cadeia de caracteres, eles podem falhar ao analisar o valor não cadeia de caracteres.

Tipo de retorno

Exceções

Se o valor especificado pelo parâmetro de tempo limite decorrer antes que o evento possa ser enviado no modo sem buffer ou se os eventos não puderem ser enfileirados no buffer no modo em buffer.

Atributos

total_buffered_event_count

O número total de eventos que estão atualmente armazenados em buffer e aguardando para serem publicados, em todas as partições. Retorna Nenhum no modo não armazenado em buffer. OBSERVAÇÃO: o buffer de eventos é processado em uma corrotina em segundo plano, portanto, o número de eventos no buffer relatado por essa API deve ser considerado apenas uma aproximação e é recomendado apenas para uso na depuração.

Tipo de retorno

int,