EventHubConsumerClient Classe
A classe EventHubConsumerClient define uma interface de alto nível para receber eventos do serviço Hubs de Eventos do Azure.
O principal objetivo do EventHubConsumerClient é receber eventos de todas as partições de um EventHub com balanceamento de carga e pontos de verificação.
Quando várias instâncias do EventHubConsumerClient estão em execução no mesmo hub de eventos, grupo de consumidores e localização de ponto de verificação, as partições serão distribuídas uniformemente entre elas.
Para ativar o balanceamento de carga e os pontos de verificação persistentes, checkpoint_store tem de ser definido ao criar o EventHubConsumerClient. Se não for fornecido um arquivo de pontos de verificação, o ponto de verificação será mantido internamente na memória.
Um EventHubConsumerClient também pode receber de uma partição específica quando chama o método receive() ou receive_batch() e especifica o partition_id. O balanceamento de carga não funcionará no modo de partição única. No entanto, os utilizadores ainda podem guardar pontos de verificação se a checkpoint_store estiver definida.
- Herança
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Construtor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parâmetros
- fully_qualified_namespace
- str
O nome de anfitrião completamente qualificado para o espaço de nomes dos Hubs de Eventos. O formato do espaço de nomes é: .servicebus.windows.net.
- credential
- TokenCredential ou AzureSasCredential ou AzureNamedKeyCredential
O objeto de credencial utilizado para autenticação que implementa uma interface específica para obter tokens. Aceita objetos EventHubSharedKeyCredentialde credenciais ou ou gerados pela biblioteca de identidades do azure e objetos que implementam o método *get_token(auto, âmbitos ).
- logging_enable
- bool
Se pretende gerar registos de rastreio de rede para o logger. A predefinição é Falso.
- auth_timeout
- float
O tempo em segundos para aguardar que um token seja autorizado pelo serviço. O valor predefinido é 60 segundos. Se estiver definido como 0, não será aplicado nenhum tempo limite ao cliente.
- user_agent
- str
Se for especificado, este procedimento será adicionado à frente da cadeia de agente do utilizador.
- retry_total
- int
O número total de tentativas de refazer uma operação falhada quando ocorre um erro. O valor predefinido é 3. O contexto de retry_total na receção é especial: o método de receção é implementado por um método de receção interna de chamadas de ciclo de tempo em cada iteração. No caso de receção , retry_total especifica os números de repetição após o erro gerado pelo método de receção interno no ciclo de tempo. Se as tentativas de repetição estiverem esgotadas, a chamada de retorno on_error será chamada (se for fornecida) com as informações de erro. O consumidor de partição interna com falha será fechado (on_partition_close será chamado se for fornecido) e será criado um novo consumidor de partição interna (on_partition_initialize será chamado se for fornecido) para retomar a receção.
- retry_backoff_factor
- float
Um fator de back-off a aplicar entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem demora). No modo fixo, a política de repetição irá sempre suspender para {backoff factor}. No modo "exponencial", a política de repetição irá suspender para: {backoff factor} * (2 ** ({número de repetições totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição irá suspender para [0,0s, 0,2s, 0,4s, ...] entre repetições. O valor predefinido é 0,8.
- retry_backoff_max
- float
O tempo máximo de back-off. O valor predefinido é 120 segundos (2 minutos).
- retry_mode
- str
O comportamento de atraso entre tentativas de repetição. Os valores suportados são "fixos" ou "exponenciais", em que a predefinição é "exponencial".
- idle_timeout
- float
Tempo limite, em segundos, após o qual este cliente fechará a ligação subjacente se não houver mais atividade. Por predefinição, o valor é Nenhum, o que significa que o cliente não será encerrado devido à inatividade, a menos que seja iniciado pelo serviço.
- transport_type
- TransportType
O tipo de protocolo de transporte que será utilizado para comunicar com o serviço Hubs de Eventos. A predefinição é TransportType.Amqp , caso em que a porta 5671 é utilizada. Se a porta 5671 estiver indisponível/bloqueada no ambiente de rede, pode utilizar TransportType.AmqpOverWebsocket , que utiliza a porta 443 para comunicação.
Definições de proxy HTTP. Este tem de ser um dicionário com as seguintes chaves: "proxy_hostname" (valor de str) e "proxy_port" (valor int). Além disso, também podem estar presentes as seguintes chaves: "nome de utilizador", "palavra-passe".
- checkpoint_store
- CheckpointStore ou None
Um gestor que armazena os dados de balanceamento de carga e ponto de verificação da partição ao receber eventos. O arquivo de pontos de verificação será utilizado em ambos os casos de receção de todas as partições ou de uma única partição. Neste último caso, o balanceamento de carga não se aplica. Se não for fornecido um arquivo de pontos de verificação, o ponto de verificação será mantido internamente na memória e a instância eventHubConsumerClient receberá eventos sem balanceamento de carga.
- load_balancing_interval
- float
Quando o balanceamento de carga entra em ação. Este é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. A predefinição é de 30 segundos.
- partition_ownership_expiration_interval
- float
Uma propriedade de partição expirará após este número de segundos. Cada avaliação de balanceamento de carga prolongará automaticamente o tempo de expiração da propriedade. A predefinição é 6 * load_balancing_interval, ou seja, 180 segundos ao utilizar o load_balancing_interval predefinido de 30 segundos.
- load_balancing_strategy
- str ou LoadBalancingStrategy
Quando o balanceamento de carga iniciar, utilizará esta estratégia para reivindicar e equilibrar a propriedade da partição. Utilize "ganancioso" ou LoadBalancingStrategy.GREEDY para a estratégia gananciosa, que, para cada avaliação de balanceamento de carga, obterá o número de partições não reclamadas necessárias para equilibrar a carga. Utilize "balanceado" ou LoadBalancingStrategy.BALANCED para a estratégia equilibrada que, para cada avaliação de balanceamento de carga, afirma apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outros EventHubConsumerClient e este cliente tiver reclamado poucas partições, este cliente irá roubar uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia gananciosa é utilizada por predefinição.
O endereço de ponto final personalizado a utilizar para estabelecer uma ligação ao serviço Hubs de Eventos, permitindo que os pedidos de rede sejam encaminhados através de quaisquer gateways de aplicação ou outros caminhos necessários para o ambiente anfitrião. A predefinição é Nenhuma. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, será utilizada por predefinição a porta 443.
Caminho para o ficheiro de CA_BUNDLE personalizado do certificado SSL que é utilizado para autenticar a identidade do ponto final de ligação. A predefinição é Nenhuma, caso em que a certificação.where() será utilizada.
- uamqp_transport
- bool
Se pretende utilizar a biblioteca uamqp como o transporte subjacente. O valor predefinido é Falso e a biblioteca AMQP de Python Puro será utilizada como o transporte subjacente.
- socket_timeout
- float
O tempo em segundos que o socket subjacente na ligação deve aguardar ao enviar e receber dados antes de exceder o tempo limite. O valor predefinido é 0,2 para TransportType.Amqp e 1 para TransportType.AmqpOverWebsocket. Se os erros do EventHubsConnectionError estiverem a ocorrer devido ao tempo limite de escrita, poderá ser necessário transmitir um valor maior do que o predefinido. Isto destina-se a cenários de utilização avançada e, normalmente, o valor predefinido deve ser suficiente.
Exemplos
Crie uma nova instância do EventHubConsumerClient.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Métodos
close |
Pare de obter eventos do Hub de Eventos e feche as ligações e a ligação AMQP subjacentes. |
from_connection_string |
Crie um EventHubConsumerClient a partir de um cadeia de ligação. |
get_eventhub_properties |
Obtenha as propriedades do Hub de Eventos. As chaves no dicionário devolvido incluem:
|
get_partition_ids |
Obter IDs de partição do Hub de Eventos. |
get_partition_properties |
Obtenha as propriedades da partição especificada. As chaves no dicionário de propriedades incluem:
|
receive |
Receber eventos de partições, com balanceamento de carga e pontos de verificação opcionais. |
receive_batch |
Receber eventos de partições, com balanceamento de carga e pontos de verificação opcionais. |
close
Pare de obter eventos do Hub de Eventos e feche as ligações e a ligação AMQP subjacentes.
close() -> None
Tipo de retorno
Exemplos
Feche o cliente.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
Crie um EventHubConsumerClient a partir de um cadeia de ligação.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Parâmetros
- eventhub_name
- str
O caminho do Hub de Eventos específico ao qual ligar o cliente.
- logging_enable
- bool
Se pretende gerar registos de rastreio de rede para o logger. A predefinição é Falso.
- auth_timeout
- float
O tempo em segundos para aguardar que um token seja autorizado pelo serviço. O valor predefinido é 60 segundos. Se estiver definido como 0, não será aplicado nenhum tempo limite ao cliente.
- user_agent
- str
Se for especificado, este procedimento será adicionado à frente da cadeia de agente do utilizador.
- retry_total
- int
O número total de tentativas de refazer uma operação falhada quando ocorre um erro. O valor predefinido é 3. O contexto de retry_total na receção é especial: o método de receção é implementado por um método de receção interna de chamadas de ciclo de tempo em cada iteração. No caso de receção , retry_total especifica os números de repetição após o erro gerado pelo método de receção interno no ciclo de tempo. Se as tentativas de repetição estiverem esgotadas, a chamada de retorno on_error será chamada (se for fornecida) com as informações de erro. O consumidor de partição interna com falha será fechado (on_partition_close será chamado se for fornecido) e será criado um novo consumidor de partição interna (on_partition_initialize será chamado se for fornecido) para retomar a receção.
- retry_backoff_factor
- float
Um fator de back-off a aplicar entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem demora). No modo fixo, a política de repetição irá sempre suspender para {backoff factor}. No modo "exponencial", a política de repetição irá suspender para: {backoff factor} * (2 ** ({número de repetições totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição irá suspender para [0,0s, 0,2s, 0,4s, ...] entre repetições. O valor predefinido é 0,8.
- retry_backoff_max
- float
O tempo máximo de folga. O valor predefinido é 120 segundos (2 minutos).
- retry_mode
- str
O comportamento de atraso entre tentativas de repetição. Os valores suportados são "fixos" ou "exponenciais", em que a predefinição é "exponencial".
- idle_timeout
- float
Tempo limite, em segundos, após o qual este cliente fechará a ligação subjacente se não houver atividade de furthur. Por predefinição, o valor é Nenhum, o que significa que o cliente não será encerrado devido a inatividade, a menos que seja iniciado pelo serviço.
- transport_type
- TransportType
O tipo de protocolo de transporte que será utilizado para comunicar com o serviço Hubs de Eventos. A predefinição é TransportType.Amqp , caso em que é utilizada a porta 5671. Se a porta 5671 estiver indisponível/bloqueada no ambiente de rede, pode utilizar TransportType.AmqpOverWebsocket , que utiliza a porta 443 para comunicação.
- http_proxy
- dict
Definições de proxy HTTP. Tem de ser um dicionário com as seguintes chaves: "proxy_hostname" (valor str) e "proxy_port" (valor int). Além disso, também podem estar presentes as seguintes chaves: "nome de utilizador", "palavra-passe".
- checkpoint_store
- CheckpointStore ou None
Um gestor que armazena os dados de balanceamento de carga e ponto de verificação da partição ao receber eventos. O arquivo de pontos de verificação será utilizado em ambos os casos de receção de todas as partições ou de uma única partição. Neste último caso, o balanceamento de carga não se aplica. Se não for fornecido um arquivo de ponto de verificação, o ponto de verificação será mantido internamente na memória e a instância EventHubConsumerClient receberá eventos sem balanceamento de carga.
- load_balancing_interval
- float
Quando o balanceamento de carga é ativado. Este é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. A predefinição é 10 segundos.
- partition_ownership_expiration_interval
- float
Uma propriedade de partição expirará após este número de segundos. Cada avaliação de balanceamento de carga prolongará automaticamente o tempo de expiração da propriedade. A predefinição é 6 * load_balancing_interval, ou seja, 60 segundos ao utilizar o load_balancing_interval predefinido de 30 segundos.
- load_balancing_strategy
- str ou LoadBalancingStrategy
Quando o balanceamento de carga é ativado, utilizará esta estratégia para reivindicar e equilibrar a propriedade da partição. Utilize "ganancioso" ou LoadBalancingStrategy.GREEDY para a estratégia gananciosa, que, para cada avaliação de balanceamento de carga, obterá o número de partições não reclamadas necessárias para equilibrar a carga. Utilize "balanceado" ou LoadBalancingStrategy.BALANCED para a estratégia equilibrada que, para cada avaliação de balanceamento de carga, afirma apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outros EventHubConsumerClient e este cliente tiver afirmado poucas partições, este cliente roubará uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia gananciosa é utilizada por predefinição.
O endereço de ponto final personalizado a utilizar para estabelecer uma ligação ao serviço dos Hubs de Eventos, permitindo que os pedidos de rede sejam encaminhados através de quaisquer gateways de aplicação ou outros caminhos necessários para o ambiente anfitrião. A predefinição é Nenhuma. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, será utilizada por predefinição a porta 443.
Caminho para o ficheiro de CA_BUNDLE personalizado do certificado SSL que é utilizado para autenticar a identidade do ponto final de ligação. A predefinição é Nenhum, caso em que certifi.where() será utilizado.
- uamqp_transport
- bool
Se pretende utilizar a biblioteca uamqp como o transporte subjacente. O valor predefinido é Falso e a biblioteca AMQP de Python Puro será utilizada como o transporte subjacente.
Tipo de retorno
Exemplos
Crie uma nova instância do EventHubConsumerClient a partir de cadeia de ligação.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Obtenha as propriedades do Hub de Eventos.
As chaves no dicionário devolvido incluem:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Devoluções
Um dicionário que contém informações sobre o Hub de Eventos.
Tipo de retorno
Exceções
get_partition_ids
Obter IDs de partição do Hub de Eventos.
get_partition_ids() -> List[str]
Devoluções
Uma lista de IDs de partição.
Tipo de retorno
Exceções
get_partition_properties
Obtenha as propriedades da partição especificada.
As chaves no dicionário de propriedades incluem:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Parâmetros
Devoluções
Um dicionário que contém propriedades de partição.
Tipo de retorno
Exceções
receive
Receber eventos de partições, com balanceamento de carga e pontos de verificação opcionais.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Parâmetros
- on_event
- callable[PartitionContext, EventData ou None]
A função de chamada de retorno para processar um evento recebido. A chamada de retorno utiliza dois parâmetros: partition_context que contém o contexto de partição e o evento que é o evento recebido. A função de chamada de retorno deve ser definida como: on_event(partition_context, evento). Para obter informações detalhadas sobre o contexto de partição, veja PartitionContext.
- max_wait_time
- float
O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar a chamada de retorno. Se não forem recebidos eventos dentro deste intervalo, a chamada de retorno on_event será chamada com Nenhum. Se este valor estiver definido como Nenhum ou 0 (a predefinição), a chamada de retorno não será chamada até que um evento seja recebido.
- partition_id
- str
Se for especificado, o cliente só receberá desta partição. Caso contrário, o cliente receberá de todas as partições.
- owner_level
- int
A prioridade para um consumidor exclusivo. Será criado um consumidor exclusivo se owner_level estiver definido. Um consumidor com um owner_level superior tem uma prioridade exclusiva mais elevada. O nível de proprietário também é conhecido como o "valor de época" do consumidor.
- prefetch
- int
O número de eventos a pré-obter do serviço para processamento. A predefinição é 300.
- track_last_enqueued_event_properties
- bool
Indica se o consumidor deve pedir informações sobre o último evento em fila na partição associada e controlar essas informações à medida que os eventos são recebidos. Quando as informações sobre o último evento em fila de espera das partições estão a ser controladas, cada evento recebido do serviço Hubs de Eventos irá transportar metadados sobre a partição. Isto resulta numa pequena quantidade de consumo adicional de largura de banda de rede que é geralmente um compromisso favorável quando considerado contra a realização periódica de pedidos de propriedades de partição com o cliente do Hub de Eventos. Está definido como Falso por predefinição.
Comece a receber a partir desta posição de evento se não existirem dados de ponto de verificação para uma partição. Os dados do ponto de verificação serão utilizados, se disponíveis. Isto pode ser um ditado com o ID da partição como a chave e a posição como o valor para partições individuais ou um valor único para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também são suportados os valores "-1" para receber desde o início do fluxo e "@latest" para receber apenas novos eventos. O valor predefinido é "@latest".
Determine se o starting_position especificado é inclusivo(>=) ou não (>). Verdadeiro para inclusive e Falso para exclusivo. Isto pode ser um ditado com o ID da partição como chave e bool como o valor que indica se a starting_position para uma partição específica é inclusiva ou não. Também pode ser um valor bool único para todos os starting_position. O valor predefinido é Falso.
- on_error
- callable[[PartitionContext, Exception]]
A função de chamada de retorno que será chamada quando um erro é gerado durante a receção após as tentativas de repetição serem esgotadas ou durante o processo de balanceamento de carga. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e o erro é a exceção. partition_context pode ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. A chamada de retorno deve ser definida como: on_error(partition_context, erro). A chamada de retorno on_error também será chamada se for gerada uma exceção não processada durante a chamada de retorno do on_event .
- on_partition_initialize
- callable[[PartitionContext]]
A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição concluir a inicialização. Também seria chamado quando um novo consumidor interno de partições for criado para assumir o processo de receção de um consumidor de partições interno com falhas e fechado. A chamada de retorno utiliza um único parâmetro: partition_context que contém as informações da partição. A chamada de retorno deve ser definida como: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição ser fechada. Também seria chamado quando o erro é gerado durante a receção após as tentativas de repetição estarem esgotadas. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e motivo para fechar. A chamada de retorno deve ser definida como: on_partition_close(partition_context, razão). CloseReason Veja os vários motivos de fecho.
Tipo de retorno
Exemplos
Receber eventos do EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
Receber eventos de partições, com balanceamento de carga e pontos de verificação opcionais.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Parâmetros
- on_event_batch
- callable[PartitionContext, list[EventData]]
A função de chamada de retorno para processar um lote de eventos recebidos. A chamada de retorno utiliza dois parâmetros: partition_context que contém o contexto de partição e event_batch, que são os eventos recebidos. A função de chamada de retorno deve ser definida como: on_event_batch(partition_context, event_batch). event_batch pode ser uma lista vazia se max_wait_time não for Nenhum nem 0 e nenhum evento for recebido após max_wait_time. Para obter informações detalhadas sobre o contexto de partição, veja PartitionContext.
- max_batch_size
- int
O número máximo de eventos num lote transmitido para chamada de retorno on_event_batch. Se o número real de eventos recebidos for maior do que max_batch_size, os eventos recebidos serão divididos em lotes e chamarão a chamada de retorno para cada lote com até max_batch_size eventos.
- max_wait_time
- float
O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar a chamada de retorno. Se não forem recebidos eventos dentro deste intervalo, a chamada de retorno on_event_batch será chamada com uma lista vazia.
- partition_id
- str
Se for especificado, o cliente só receberá desta partição. Caso contrário, o cliente receberá de todas as partições.
- owner_level
- int
A prioridade para um consumidor exclusivo. Será criado um consumidor exclusivo se owner_level estiver definido. Um consumidor com um owner_level superior tem uma prioridade exclusiva mais elevada. O nível de proprietário também é conhecido como o "valor de época" do consumidor.
- prefetch
- int
O número de eventos a pré-obter do serviço para processamento. A predefinição é 300.
- track_last_enqueued_event_properties
- bool
Indica se o consumidor deve pedir informações sobre o último evento em fila na partição associada e controlar essas informações à medida que os eventos são recebidos. Quando as informações sobre o último evento em fila de espera das partições estão a ser controladas, cada evento recebido do serviço Hubs de Eventos irá transportar metadados sobre a partição. Isto resulta numa pequena quantidade de consumo adicional de largura de banda de rede que é geralmente um compromisso favorável quando considerado contra a realização periódica de pedidos de propriedades de partição com o cliente do Hub de Eventos. Está definido como Falso por predefinição.
Comece a receber a partir desta posição de evento se não existirem dados de ponto de verificação para uma partição. Os dados do ponto de verificação serão utilizados, se disponíveis. Isto pode ser um ditado com o ID da partição como a chave e a posição como o valor para partições individuais ou um valor único para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também são suportados os valores "-1" para receber desde o início do fluxo e "@latest" para receber apenas novos eventos. O valor predefinido é "@latest".
Determine se o starting_position especificado é inclusivo(>=) ou não (>). Verdadeiro para inclusive e Falso para exclusivo. Isto pode ser um ditado com o ID da partição como chave e bool como o valor que indica se a starting_position para uma partição específica é inclusiva ou não. Também pode ser um valor bool único para todos os starting_position. O valor predefinido é Falso.
- on_error
- callable[[PartitionContext, Exception]]
A função de chamada de retorno que será chamada quando um erro é gerado durante a receção após as tentativas de repetição serem esgotadas ou durante o processo de balanceamento de carga. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e o erro é a exceção. partition_context pode ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. A chamada de retorno deve ser definida como: on_error(partition_context, erro). A chamada de retorno on_error também será chamada se for gerada uma exceção não processada durante a chamada de retorno do on_event .
- on_partition_initialize
- callable[[PartitionContext]]
A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição concluir a inicialização. Também seria chamado quando um novo consumidor interno de partições for criado para assumir o processo de receção de um consumidor de partições interno com falhas e fechado. A chamada de retorno utiliza um único parâmetro: partition_context que contém as informações da partição. A chamada de retorno deve ser definida como: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição ser fechada. Também seria chamado quando o erro é gerado durante a receção após as tentativas de repetição estarem esgotadas. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e motivo para fechar. A chamada de retorno deve ser definida como: on_partition_close(partition_context, razão). CloseReason Veja os vários motivos de fecho.
Tipo de retorno
Exemplos
Receber eventos em lotes do EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python