EventHubConsumerClient Classe

A classe EventHubConsumerClient define uma interface de alto nível para receber eventos do serviço Hubs de Eventos do Azure.

A meta main do EventHubConsumerClient é receber eventos de todas as partições de um EventHub com balanceamento de carga e ponto de verificação.

Quando várias instâncias EventHubConsumerClient estiverem em execução no mesmo hub de eventos, grupo de consumidores e local de ponto de verificação, as partições serão distribuídas uniformemente entre elas.

Para habilitar o balanceamento de carga e os pontos de verificação persistentes, checkpoint_store deve ser definido ao criar o EventHubConsumerClient. Se um repositório de ponto de verificação não for fornecido, o ponto de verificação será mantido internamente na memória.

Um EventHubConsumerClient também pode receber de uma partição específica quando você chama seu método receive() ou receive_batch() e especifica o partition_id. O balanceamento de carga não funcionará no modo de partição única. Mas os usuários ainda poderão salvar pontos de verificação se o checkpoint_store estiver definido.

Herança
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Construtor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parâmetros

fully_qualified_namespace
str
Obrigatório

O nome do host totalmente qualificado para o namespace dos Hubs de Eventos. O formato do namespace é: .servicebus.windows.net.

eventhub_name
str
Obrigatório

O caminho do Hub de Eventos específico ao qual conectar o cliente.

consumer_group
str
Obrigatório

Receber eventos do hub de eventos para esse grupo de consumidores.

credential
AsyncTokenCredential ou AzureSasCredential ou AzureNamedKeyCredential
Obrigatório

O objeto de credencial usado para autenticação que implementa uma interface específica para obter tokens. Ele aceita EventHubSharedKeyCredential, ou objetos de credencial gerados pela biblioteca azure-identity e objetos que implementam o método *get_token(self, scopes).

logging_enable
bool

Se os logs de rastreamento de rede devem ser gerados para o agente. O padrão é False.

auth_timeout
float

O tempo em segundos para aguardar que um token seja autorizado pelo serviço. O valor padrão é 60 segundos. Se definido como 0, nenhum tempo limite será imposto do cliente.

user_agent
str

Se especificado, isso será adicionado na frente da cadeia de caracteres do agente do usuário.

retry_total
int

O número total de tentativas de refazer uma operação com falha quando ocorre um erro. O valor padrão é 3. O contexto de retry_total no recebimento é especial: o método receive é implementado por um método de recebimento interno de chamada de loop while em cada iteração. No caso de recebimento , retry_total especifica os números de repetição após o erro gerado pelo método de recebimento interno no loop while. Se as tentativas de repetição forem esgotadas, o on_error retorno de chamada será chamado (se fornecido) com as informações de erro. O consumidor de partição interna com falha será fechado (on_partition_close será chamado se fornecido) e o novo consumidor de partição interna será criado (on_partition_initialize será chamado se fornecido) para retomar o recebimento.

retry_backoff_factor
float

Um fator de retirada a ser aplicado entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem atraso). No modo fixo, a política de repetição sempre será suspensa para {fator de retirada}. No modo 'exponencial', a política de repetição será suspensa para: {fator de retirada} * (2 ** ({número de tentativas totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição será suspensa para [0.0s, 0.2s, 0.4s, ...] entre as repetições. O valor padrão é 0,8.

retry_backoff_max
float

O tempo máximo de retirada. O valor padrão é 120 segundos (2 minutos).

retry_mode
str

O comportamento de atraso entre tentativas de repetição. Os valores com suporte são 'fixo' ou 'exponencial', em que o padrão é 'exponencial'.

idle_timeout
float

Tempo limite, em segundos, após o qual esse cliente fechará a conexão subjacente se não houver mais atividade. Por padrão, o valor é None, o que significa que o cliente não será desligado devido à inatividade, a menos que seja iniciado pelo serviço.

transport_type
TransportType

O tipo de protocolo de transporte que será usado para se comunicar com o serviço de Hubs de Eventos. O padrão é TransportType.Amqp , caso em que a porta 5671 é usada. Se a porta 5671 não estiver disponível/bloqueada no ambiente de rede, TransportType.AmqpOverWebsocket poderá ser usado, em vez disso, que usa a porta 443 para comunicação.

http_proxy

Configurações de proxy HTTP. Deve ser um dicionário com as seguintes chaves: 'proxy_hostname' (valor str) e 'proxy_port' (valor int).

checkpoint_store
Optional[CheckpointStore]

Um gerenciador que armazena os dados de balanceamento de carga e ponto de verificação de partição ao receber eventos. O repositório de ponto de verificação será usado em ambos os casos de recebimento de todas as partições ou de uma única partição. No último caso, o balanceamento de carga não se aplica. Se um repositório de ponto de verificação não for fornecido, o ponto de verificação será mantido internamente na memória e a instância EventHubConsumerClient receberá eventos sem balanceamento de carga.

load_balancing_interval
float

Quando o balanceamento de carga é inicial. Esse é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. O padrão é 30 segundos.

partition_ownership_expiration_interval
float

Uma propriedade de partição expirará após esse número de segundos. Cada avaliação de balanceamento de carga estenderá automaticamente o tempo de expiração da propriedade. O padrão é 6 * load_balancing_interval, ou seja, 180 segundos ao usar o load_balancing_interval padrão de 30 segundos.

load_balancing_strategy
str ou LoadBalancingStrategy

Quando o balanceamento de carga iniciar, ele usará essa estratégia para reivindicar e equilibrar a propriedade da partição. Use "greedy" ou LoadBalancingStrategy.GREEDY para a estratégia greedy, que, para cada avaliação de balanceamento de carga, pegará quantas partições não reclamadas forem necessárias para equilibrar a carga. Use "balanced" ou LoadBalancingStrategy.BALANCED para a estratégia equilibrada, que, para cada avaliação de balanceamento de carga, declara apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outro EventHubConsumerClient e esse cliente tiver reivindicado poucas partições, esse cliente roubará uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia greedy é usada por padrão.

custom_endpoint_address
Optional[str]

O endereço do ponto de extremidade personalizado a ser usado para estabelecer uma conexão com o serviço de Hubs de Eventos, permitindo que as solicitações de rede sejam roteadas por meio de quaisquer gateways de aplicativo ou outros caminhos necessários para o ambiente de host. O padrão é None. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, por padrão, a porta 443 será usada.

connection_verify
Optional[str]

Caminho para o arquivo de CA_BUNDLE personalizado do certificado SSL que é usado para autenticar a identidade do ponto de extremidade de conexão. O padrão é None, caso em que certifi.where() será usado.

uamqp_transport
bool

Se a biblioteca uamqp deve ser usada como o transporte subjacente. O valor padrão é False e a biblioteca AMQP pura do Python será usada como o transporte subjacente.

socket_timeout
float

O tempo em segundos que o soquete subjacente na conexão deve aguardar ao enviar e receber dados antes de atingir o tempo limite. O valor padrão é 0,2 para TransportType.Amqp e 1 para TransportType.AmqpOverWebsocket. Se os erros eventHubsConnectionError estiverem ocorrendo devido ao tempo limite de gravação, talvez seja necessário passar um valor maior que o padrão. Isso é para cenários de uso avançado e, normalmente, o valor padrão deve ser suficiente.

Exemplos

Crie uma nova instância do EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Métodos

close

Pare de recuperar eventos do Hub de Eventos e feche a conexão e os links amqp subjacentes.

from_connection_string

Crie um EventHubConsumerClient de um cadeia de conexão.

get_eventhub_properties

Obter propriedades do Hub de Eventos.

As chaves no dicionário retornado incluem:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Obtenha IDs de partição do Hub de Eventos.

get_partition_properties

Obter propriedades da partição especificada.

As chaves no dicionário de propriedades incluem:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Receber eventos de partições, com balanceamento de carga e ponto de verificação opcionais.

receive_batch

Receber eventos de partições em lotes, com balanceamento de carga e ponto de verificação opcionais.

close

Pare de recuperar eventos do Hub de Eventos e feche a conexão e os links amqp subjacentes.

async close() -> None

Tipo de retorno

Exemplos

Feche o cliente.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

Crie um EventHubConsumerClient de um cadeia de conexão.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parâmetros

conn_str
str
Obrigatório

O cadeia de conexão de um Hub de Eventos.

consumer_group
str
Obrigatório

Receber eventos do Hub de Eventos para esse grupo de consumidores.

eventhub_name
str

O caminho do Hub de Eventos específico ao qual conectar o cliente.

logging_enable
bool

Se os logs de rastreamento de rede devem ser gerados para o agente. O padrão é False.

http_proxy
dict

Configurações de proxy HTTP. Deve ser um dicionário com as seguintes chaves: 'proxy_hostname' (valor str) e 'proxy_port' (valor int). Além disso, as seguintes chaves também podem estar presentes: 'username', 'password'.

auth_timeout
float

O tempo em segundos para aguardar que um token seja autorizado pelo serviço. O valor padrão é 60 segundos. Se definido como 0, nenhum tempo limite será imposto do cliente.

user_agent
str

Se especificado, isso será adicionado na frente da cadeia de caracteres do agente do usuário.

retry_total
int

O número total de tentativas de refazer uma operação com falha quando ocorre um erro. O valor padrão é 3. O contexto de retry_total no recebimento é especial: o método receive é implementado por um método de recebimento interno de chamada de loop while em cada iteração. No caso de recebimento , retry_total especifica os números de repetição após o erro gerado pelo método de recebimento interno no loop while. Se as tentativas de repetição forem esgotadas, o on_error retorno de chamada será chamado (se fornecido) com as informações de erro. O consumidor de partição interna com falha será fechado (on_partition_close será chamado se fornecido) e o novo consumidor de partição interna será criado (on_partition_initialize será chamado se fornecido) para retomar o recebimento.

retry_backoff_factor
float

Um fator de retirada a ser aplicado entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem atraso). No modo fixo, a política de repetição sempre funcionará para {fator de retirada}. No modo 'exponencial', a política de repetição funcionará para: {fator de retirada} * (2 ** ({número de repetições totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição dormirá para [0.0s, 0.2s, 0.4s, ...] entre repetições. O valor padrão é 0,8.

retry_backoff_max
float

O tempo máximo de retirada. O valor padrão é 120 segundos (2 minutos).

retry_mode
str

O comportamento de atraso entre tentativas de repetição. Os valores com suporte são 'fixos' ou 'exponencial', em que o padrão é 'exponencial'.

idle_timeout
float

Tempo limite, em segundos, após o qual esse cliente fechará a conexão subjacente se não houver mais atividade. Por padrão, o valor é None, o que significa que o cliente não será desligado devido à inatividade, a menos que seja iniciado pelo serviço.

transport_type
TransportType

O tipo de protocolo de transporte que será usado para se comunicar com o serviço de Hubs de Eventos. O padrão é TransportType.Amqp , nesse caso, a porta 5671 é usada. Se a porta 5671 não estiver disponível/bloqueada no ambiente de rede, TransportType.AmqpOverWebsocket poderá ser usado, em vez disso, que usa a porta 443 para comunicação.

checkpoint_store
Optional[CheckpointStore]

Um gerenciador que armazena os dados de balanceamento de carga e ponto de verificação de partição ao receber eventos. O repositório de ponto de verificação será usado em ambos os casos de recebimento de todas as partições ou de uma única partição. No último caso, o balanceamento de carga não se aplica. Se um repositório de ponto de verificação não for fornecido, o ponto de verificação será mantido internamente na memória e a instância EventHubConsumerClient receberá eventos sem balanceamento de carga.

load_balancing_interval
float

Quando o balanceamento de carga é inicial. Esse é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. O padrão é 30 segundos.

partition_ownership_expiration_interval
float

Uma propriedade de partição expirará após esse número de segundos. Cada avaliação de balanceamento de carga estenderá automaticamente o tempo de expiração da propriedade. O padrão é 6 * load_balancing_interval, ou seja, 180 segundos ao usar o load_balancing_interval padrão de 30 segundos.

load_balancing_strategy
str ou LoadBalancingStrategy

Quando o balanceamento de carga começar, ele usará essa estratégia para reivindicar e equilibrar a propriedade da partição. Use "greedy" ou LoadBalancingStrategy.GREEDY para a estratégia gananciosa, que, para cada avaliação de balanceamento de carga, pegará quantas partições não reclamadas forem necessárias para equilibrar a carga. Use "balanced" ou LoadBalancingStrategy.BALANCED para a estratégia balanceada, que, para cada avaliação de balanceamento de carga, declara apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outro EventHubConsumerClient e esse cliente tiver reivindicado poucas partições, esse cliente roubará uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia greedy é usada por padrão.

custom_endpoint_address
Optional[str]

O endereço de ponto de extremidade personalizado a ser usado para estabelecer uma conexão com o serviço de Hubs de Eventos, permitindo que as solicitações de rede sejam roteadas por meio de gateways de aplicativo ou outros caminhos necessários para o ambiente de host. O padrão é None. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, por padrão, a porta 443 será usada.

connection_verify
Optional[str]

Caminho para o arquivo de CA_BUNDLE personalizado do certificado SSL que é usado para autenticar a identidade do ponto de extremidade de conexão. O padrão é Nenhum, nesse caso , certifi.where() será usado.

uamqp_transport
bool

Se deve usar a biblioteca uamqp como o transporte subjacente. O valor padrão é False e a biblioteca AMQP do Python Puro será usada como o transporte subjacente.

Tipo de retorno

Exemplos

Crie uma nova instância do EventHubConsumerClient do cadeia de conexão.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Obter propriedades do Hub de Eventos.

As chaves no dicionário retornado incluem:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Retornos

Um dicionário que contém informações sobre o Hub de Eventos.

Tipo de retorno

Exceções

get_partition_ids

Obtenha IDs de partição do Hub de Eventos.

async get_partition_ids() -> List[str]

Retornos

Uma lista de IDs de partição.

Tipo de retorno

Exceções

get_partition_properties

Obter propriedades da partição especificada.

As chaves no dicionário de propriedades incluem:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parâmetros

partition_id
str
Obrigatório

A ID da partição de destino.

Retornos

Um dicionário que contém propriedades de partição.

Tipo de retorno

Exceções

receive

Receber eventos de partições, com balanceamento de carga e ponto de verificação opcionais.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parâmetros

on_event
Callable[PartitionContext, Optional[EventData]]
Obrigatório

A função de retorno de chamada para lidar com um evento recebido. O retorno de chamada usa dois parâmetros: partition_context que contém o contexto de partição e o evento que é o evento recebido. A função de retorno de chamada deve ser definida como: on_event(partition_context, evento). Para obter informações detalhadas de contexto de partição, consulte PartitionContext.

max_wait_time
float

O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar o retorno de chamada. Se nenhum evento for recebido nesse intervalo, o retorno de chamada on_event será chamado com Nenhum. Se esse valor for definido como Nenhum ou 0 (o padrão), o retorno de chamada não será chamado até que um evento seja recebido.

partition_id
str

Se especificado, o cliente receberá somente dessa partição. Caso contrário, o cliente receberá de todas as partições.

owner_level
int

A prioridade para um consumidor exclusivo. Um consumidor exclusivo será criado se owner_level estiver definido. Um consumidor com uma owner_level mais alta tem prioridade exclusiva mais alta. O nível de proprietário também é conhecido como o "valor de época" do consumidor.

prefetch
int

O número de eventos a serem pré-buscados do serviço para processamento. O padrão é 300.

track_last_enqueued_event_properties
bool

Indica se o consumidor deve solicitar informações sobre o último evento enfileirado em sua partição associada e acompanhar essas informações à medida que os eventos são recebidos. Quando informações sobre o evento enfileirado de partições estiverem sendo controladas, cada evento recebido do serviço hubs de eventos levará metadados sobre a partição. Isso resulta em uma pequena quantidade de consumo adicional de largura de banda de rede que geralmente é uma compensação favorável quando considerada para fazer periodicamente solicitações de propriedades de partição usando o cliente do Hub de Eventos. Ele é definido como False por padrão.

starting_position
str, int, datetime ou dict[str,any]

Comece a receber dessa posição de evento se não houver dados de ponto de verificação para uma partição. Os dados de ponto de verificação serão usados se disponíveis. Isso pode ser um ditado com ID de partição como a chave e a posição como o valor para partições individuais ou um único valor para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também há suporte para os valores "-1" para recebimento do início do fluxo e "@latest" para receber apenas novos eventos.

starting_position_inclusive
bool ou dict[str,bool]

Determine se o starting_position determinado é inclusivo(>=) ou não (>). True para inclusive e False para exclusivo. Isso pode ser um ditado com ID de partição como a chave e bool como o valor que indica se o starting_position para uma partição específica é inclusivo ou não. Isso também pode ser um único valor bool para todos os starting_position. O valor padrão é False.

on_error
Callable[[PartitionContext, Exception]]

A função de retorno de chamada que será chamada quando um erro for gerado durante o recebimento após o esgotamento das tentativas de repetição ou durante o processo de balanceamento de carga. O retorno de chamada usa dois parâmetros: partition_context que contém informações de partição e o erro é a exceção. partition_context pode ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. O retorno de chamada deve ser definido como: on_error(partition_context, error). O retorno de chamada on_error também será chamado se uma exceção sem tratamento for gerada durante o retorno de chamada on_event .

on_partition_initialize
Callable[[PartitionContext]]

A função de retorno de chamada que será chamada depois que um consumidor de uma determinada partição concluir a inicialização. Ele também seria chamado quando um novo consumidor de partição interna é criado para assumir o processo de recebimento de um consumidor de partição interno com falha e fechado. O retorno de chamada usa um único parâmetro: partition_context que contém as informações de partição. O retorno de chamada deve ser definido como: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

A função de retorno de chamada que será chamada depois que um consumidor de uma determinada partição for fechado. Ele também seria chamado quando o erro for gerado durante o recebimento após o esgotamento das tentativas de repetição. O retorno de chamada usa dois parâmetros: partition_context que contém informações de partição e motivo para o fechamento. O retorno de chamada deve ser definido como: on_partition_close(partition_context, reason). CloseReason Consulte os vários motivos finais.

Tipo de retorno

Exemplos

Receber eventos do EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Receber eventos de partições em lotes, com balanceamento de carga e ponto de verificação opcionais.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parâmetros

on_event_batch
Callable[PartitionContext, List[EventData]]
Obrigatório

A função de retorno de chamada para lidar com um lote de eventos recebidos. O retorno de chamada usa dois parâmetros: partition_context que contém contexto de partição e event_batch, que são os eventos recebidos. A função de retorno de chamada deve ser definida como: on_event_batch(partition_context, event_batch). event_batch poderá ser uma lista vazia se max_wait_time não for Nenhum nem 0 e nenhum evento for recebido após max_wait_time. Para obter informações detalhadas de contexto de partição, consulte PartitionContext.

max_batch_size
int

O número máximo de eventos em um lote passado para o retorno de chamada on_event_batch. Se o número real de eventos recebidos for maior que max_batch_size, os eventos recebidos serão divididos em lotes e chamarão o retorno de chamada para cada lote com até max_batch_size eventos.

max_wait_time
float

O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar o retorno de chamada. Se nenhum evento for recebido nesse intervalo, o retorno de chamada on_event_batch será chamado com uma lista vazia. Se esse valor for definido como Nenhum ou 0 (o padrão), o retorno de chamada não será chamado até que os eventos sejam recebidos.

partition_id
str

Se especificado, o cliente receberá somente dessa partição. Caso contrário, o cliente receberá de todas as partições.

owner_level
int

A prioridade para um consumidor exclusivo. Um consumidor exclusivo será criado se owner_level estiver definido. Um consumidor com uma owner_level mais alta tem prioridade exclusiva mais alta. O nível de proprietário também é conhecido como o "valor de época" do consumidor.

prefetch
int

O número de eventos a serem pré-buscados do serviço para processamento. O padrão é 300.

track_last_enqueued_event_properties
bool

Indica se o consumidor deve solicitar informações sobre o último evento enfileirado em sua partição associada e acompanhar essas informações à medida que os eventos são recebidos. Quando informações sobre o evento enfileirado de partições estiverem sendo controladas, cada evento recebido do serviço hubs de eventos levará metadados sobre a partição. Isso resulta em uma pequena quantidade de consumo adicional de largura de banda de rede que geralmente é uma compensação favorável quando considerada para fazer periodicamente solicitações de propriedades de partição usando o cliente do Hub de Eventos. Ele é definido como False por padrão.

starting_position
str, int, datetime ou dict[str,any]

Comece a receber dessa posição de evento se não houver dados de ponto de verificação para uma partição. Os dados de ponto de verificação serão usados se disponíveis. Isso pode ser um ditado com ID de partição como a chave e a posição como o valor para partições individuais ou um único valor para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também há suporte para os valores "-1" para recebimento do início do fluxo e "@latest" para receber apenas novos eventos.

starting_position_inclusive
bool ou dict[str,bool]

Determine se o starting_position determinado é inclusivo(>=) ou não (>). True para inclusive e False para exclusivo. Isso pode ser um ditado com ID de partição como a chave e bool como o valor que indica se o starting_position para uma partição específica é inclusivo ou não. Isso também pode ser um único valor bool para todos os starting_position. O valor padrão é False.

on_error
Callable[[PartitionContext, Exception]]

A função de retorno de chamada que será chamada quando um erro for gerado durante o recebimento após o esgotamento das tentativas de repetição ou durante o processo de balanceamento de carga. O retorno de chamada usa dois parâmetros: partition_context que contém informações de partição e o erro é a exceção. partition_context pode ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. O retorno de chamada deve ser definido como: on_error(partition_context, error). O retorno de chamada on_error também será chamado se uma exceção sem tratamento for gerada durante o retorno de chamada on_event .

on_partition_initialize
Callable[[PartitionContext]]

A função de retorno de chamada que será chamada depois que um consumidor de uma determinada partição concluir a inicialização. Ele também seria chamado quando um novo consumidor de partição interna é criado para assumir o processo de recebimento de um consumidor de partição interno com falha e fechado. O retorno de chamada usa um único parâmetro: partition_context que contém as informações de partição. O retorno de chamada deve ser definido como: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

A função de retorno de chamada que será chamada depois que um consumidor de uma determinada partição for fechado. Ele também seria chamado quando o erro for gerado durante o recebimento após o esgotamento das tentativas de repetição. O retorno de chamada usa dois parâmetros: partition_context que contém informações de partição e motivo para o fechamento. O retorno de chamada deve ser definido como: on_partition_close(partition_context, reason). CloseReason Consulte os vários motivos finais.

Tipo de retorno

Exemplos

Receber eventos em lotes do EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )