EventHubConsumerClient Classe
La classe EventHubConsumerClient définit une interface de haut niveau pour la réception d’événements du service Azure Event Hubs.
L’objectif main d’EventHubConsumerClient est de recevoir des événements de toutes les partitions d’un EventHub avec équilibrage de charge et point de contrôle.
Lorsque plusieurs instances EventHubConsumerClient s’exécutent sur le même hub d’événements, le même groupe de consommateurs et le même emplacement de point de contrôle, les partitions sont réparties uniformément entre elles.
Pour activer l’équilibrage de charge et les points de contrôle persistants, checkpoint_store devez être défini lors de la création d’EventHubConsumerClient. Si aucun magasin de points de contrôle n’est fourni, le point de contrôle est conservé en interne en mémoire.
Un EventHubConsumerClient peut également recevoir à partir d’une partition spécifique lorsque vous appelez sa méthode receive() ou receive_batch() et spécifiez le partition_id. L’équilibrage de charge ne fonctionne pas en mode à partition unique. Toutefois, les utilisateurs peuvent toujours enregistrer des points de contrôle si le checkpoint_store est défini.
- Héritage
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
Constructeur
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Paramètres
- fully_qualified_namespace
- str
Nom d’hôte complet pour l’espace de noms Event Hubs. Le format de l’espace de noms est : .servicebus.windows.net.
- eventhub_name
- str
Chemin d’accès du hub d’événements spécifique auquel connecter le client.
- consumer_group
- str
Recevez des événements du hub d’événements pour ce groupe de consommateurs.
- credential
- AsyncTokenCredential ou AzureSasCredential ou AzureNamedKeyCredential
Objet d’informations d’identification utilisé pour l’authentification qui implémente une interface particulière pour obtenir des jetons. Il accepte EventHubSharedKeyCredential, ou les objets d’informations d’identification générés par la bibliothèque azure-identity et les objets qui implémentent la méthode *get_token(self, scopes).
- logging_enable
- bool
Indique si les journaux de trace réseau doivent être générés dans l’enregistreur d’événements. La valeur par défaut est False.
- auth_timeout
- float
Délai en secondes d’attente pour qu’un jeton soit autorisé par le service. La valeur par défaut est 60 secondes. Si la valeur est 0, aucun délai d’expiration n’est appliqué à partir du client.
- user_agent
- str
Si elle est spécifiée, elle est ajoutée devant la chaîne de l’agent utilisateur.
- retry_total
- int
Nombre total de tentatives de restauration d’une opération ayant échoué lorsqu’une erreur se produit. La valeur par défaut est 3. Le contexte de retry_total de réception est spécial : la méthode receive est implémentée par une méthode de réception interne appelée while-loop dans chaque itération. Dans le cas de réception , retry_total spécifie le nombre de nouvelles tentatives après l’erreur déclenchée par la méthode de réception interne dans la boucle while. Si les tentatives de nouvelle tentative sont épuisées, le rappel on_error est appelé (s’il est fourni) avec les informations d’erreur. Le consommateur de partition interne défaillant sera fermé (on_partition_close sera appelé s’il est fourni) et un nouveau consommateur de partition interne sera créé (on_partition_initialize sera appelé s’il est fourni) pour reprendre la réception.
- retry_backoff_factor
- float
Facteur d’interruption à appliquer entre les tentatives après la deuxième tentative (la plupart des erreurs sont résolues immédiatement par un deuxième essai sans délai). En mode fixe, la stratégie de nouvelle tentative est toujours mise en veille pour {facteur d’interruption}. En mode « exponentiel », la stratégie de nouvelle tentative est mise en veille pendant : {facteur d’interruption} * (2 ** ({nombre de nouvelles tentatives totales} - 1)) secondes. Si la backoff_factor est 0.1, la nouvelle tentative est mise en veille pour [0.0s, 0.2s, 0.4s, ...] entre les nouvelles tentatives. La valeur par défaut est 0,8.
- retry_backoff_max
- float
Temps d’arrêt maximal. La valeur par défaut est 120 secondes (2 minutes).
- retry_mode
- str
Comportement de délai entre les tentatives de nouvelle tentative. Les valeurs prises en charge sont « fixes » ou « exponentielles », où la valeur par défaut est « exponentielle ».
- idle_timeout
- float
Délai d’expiration, en secondes, après lequel ce client ferme la connexion sous-jacente s’il n’y a plus d’activité. Par défaut, la valeur est None, ce qui signifie que le client ne s’arrêtera pas en raison de l’inactivité, sauf si le service l’a initié.
- transport_type
- TransportType
Type de protocole de transport qui sera utilisé pour communiquer avec le service Event Hubs. La valeur par défaut est TransportType.Amqp , auquel cas le port 5671 est utilisé. Si le port 5671 est indisponible/bloqué dans l’environnement réseau, TransportType.AmqpOverWebsocket peut être utilisé à la place, qui utilise le port 443 pour la communication.
- http_proxy
Paramètres du proxy HTTP. Il doit s’agir d’un dictionnaire avec les clés suivantes : « proxy_hostname » (valeur str) et « proxy_port » (valeur int).
- checkpoint_store
- Optional[CheckpointStore]
Gestionnaire qui stocke les données d’équilibrage de charge et de point de contrôle de la partition lors de la réception d’événements. Le magasin de points de contrôle sera utilisé dans les deux cas de réception à partir de toutes les partitions ou d’une seule partition. Dans ce dernier cas, l’équilibrage de charge ne s’applique pas. Si aucun magasin de points de contrôle n’est fourni, le point de contrôle est conservé en interne en mémoire et le instance EventHubConsumerClient reçoit des événements sans équilibrage de charge.
- load_balancing_interval
- float
Lorsque l’équilibrage de charge entre en jeu. Il s’agit de l’intervalle, en secondes, entre deux évaluations d’équilibrage de charge. La valeur par défaut est de 30 secondes.
- partition_ownership_expiration_interval
- float
Une propriété de partition expire après ce nombre de secondes. Chaque évaluation de l’équilibrage de charge étend automatiquement le délai d’expiration de la propriété. La valeur par défaut est 6 * load_balancing_interval, c’est-à-dire 180 secondes lors de l’utilisation de la load_balancing_interval par défaut de 30 secondes.
- load_balancing_strategy
- str ou LoadBalancingStrategy
Lorsque l’équilibrage de charge entre en jeu, il utilise cette stratégie pour revendiquer et équilibrer la propriété de la partition. Utilisez « gourmand » ou LoadBalancingStrategy.GREEDY pour la stratégie gourmande, qui, pour chaque évaluation d’équilibrage de charge, récupère autant de partitions non réclamées nécessaires pour équilibrer la charge. Utilisez « balanced » ou LoadBalancingStrategy.BALANCED pour la stratégie équilibrée, qui, pour chaque évaluation d’équilibrage de charge, ne revendique qu’une seule partition qui n’est pas revendiquée par un autre EventHubConsumerClient. Si toutes les partitions d’un EventHub sont revendiquées par un autre EventHubConsumerClient et que ce client a réclamé trop peu de partitions, ce client vole une partition à d’autres clients pour chaque évaluation d’équilibrage de charge, quelle que soit la stratégie d’équilibrage de charge. La stratégie gourmande est utilisée par défaut.
Adresse de point de terminaison personnalisée à utiliser pour établir une connexion au service Event Hubs, ce qui permet aux demandes réseau d’être routées via toutes les passerelles Application Gateway ou autres chemins nécessaires pour l’environnement hôte. La valeur par défaut est None. Le format est « sb://< custom_endpoint_hostname> :<custom_endpoint_port> ». Si le port n’est pas spécifié dans le custom_endpoint_address, le port 443 est utilisé par défaut.
Chemin d’accès au fichier de CA_BUNDLE personnalisé du certificat SSL utilisé pour authentifier l’identité du point de terminaison de connexion. La valeur par défaut est None, auquel cas certifi.where() sera utilisé.
- uamqp_transport
- bool
Indique s’il faut utiliser la bibliothèque uamqp comme transport sous-jacent. La valeur par défaut est False et la bibliothèque PURE PYTHON AMQP sera utilisée comme transport sous-jacent.
- socket_timeout
- float
Durée en secondes pendant laquelle le socket sous-jacent sur la connexion doit attendre lors de l’envoi et de la réception de données avant d’expirer. La valeur par défaut est 0,2 pour TransportType.Amqp et 1 pour TransportType.AmqpOverWebsocket. Si des erreurs EventHubsConnectionError se produisent en raison d’un délai d’attente d’écriture, une valeur supérieure à la valeur par défaut peut être passée. Il s’agit de scénarios d’utilisation avancés et la valeur par défaut doit généralement être suffisante.
Exemples
Créez une instance de eventHubConsumerClient.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Méthodes
close |
Arrêtez la récupération des événements à partir d’Event Hub et fermez la connexion ET les liens AMQP sous-jacents. |
from_connection_string |
Créez un EventHubConsumerClient à partir d’un chaîne de connexion. |
get_eventhub_properties |
Obtient les propriétés du hub d’événements. Les clés dans le dictionnaire retourné sont les suivantes :
|
get_partition_ids |
Obtenez les ID de partition de l’Event Hub. |
get_partition_properties |
Obtient les propriétés de la partition spécifiée. Les clés du dictionnaire de propriétés sont les suivantes :
|
receive |
Recevoir des événements de partitions, avec l’équilibrage de charge et les points de contrôle facultatifs. |
receive_batch |
Recevoir des événements de partitions par lots, avec l’équilibrage de charge et les points de contrôle facultatifs. |
close
Arrêtez la récupération des événements à partir d’Event Hub et fermez la connexion ET les liens AMQP sous-jacents.
async close() -> None
Type de retour
Exemples
Fermez le client.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
Créez un EventHubConsumerClient à partir d’un chaîne de connexion.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
Paramètres
- consumer_group
- str
Recevez des événements du hub d’événements pour ce groupe de consommateurs.
- eventhub_name
- str
Chemin d’accès du hub d’événements spécifique auquel connecter le client.
- logging_enable
- bool
Indique si les journaux de trace réseau doivent être générés dans l’enregistreur d’événements. La valeur par défaut est False.
- http_proxy
- dict
Paramètres du proxy HTTP. Il doit s’agir d’un dictionnaire avec les clés suivantes : « proxy_hostname » (valeur str) et « proxy_port » (valeur int). En outre, les clés suivantes peuvent également être présentes : « nom d’utilisateur », « mot de passe ».
- auth_timeout
- float
Délai en secondes d’attente pour qu’un jeton soit autorisé par le service. La valeur par défaut est 60 secondes. Si la valeur est 0, aucun délai d’expiration n’est appliqué à partir du client.
- user_agent
- str
Si elle est spécifiée, elle est ajoutée devant la chaîne de l’agent utilisateur.
- retry_total
- int
Nombre total de tentatives de restauration d’une opération ayant échoué lorsqu’une erreur se produit. La valeur par défaut est 3. Le contexte de retry_total de réception est spécial : la méthode receive est implémentée par une méthode de réception interne appelée while-loop dans chaque itération. Dans le cas de réception , retry_total spécifie le nombre de nouvelles tentatives après l’erreur déclenchée par la méthode de réception interne dans la boucle while. Si les tentatives de nouvelle tentative sont épuisées, le rappel on_error est appelé (s’il est fourni) avec les informations d’erreur. Le consommateur de partition interne défaillant sera fermé (on_partition_close sera appelé s’il est fourni) et un nouveau consommateur de partition interne sera créé (on_partition_initialize sera appelé s’il est fourni) pour reprendre la réception.
- retry_backoff_factor
- float
Facteur d’interruption à appliquer entre les tentatives après la deuxième tentative (la plupart des erreurs sont résolues immédiatement par un deuxième essai sans délai). En mode fixe, la stratégie de nouvelle tentative est toujours mise en veille pour {facteur d’interruption}. En mode « exponentiel », la stratégie de nouvelle tentative est mise en veille pendant : {facteur d’interruption} * (2 ** ({nombre de nouvelles tentatives totales} - 1)) secondes. Si la backoff_factor est 0.1, la nouvelle tentative est mise en veille pour [0.0s, 0.2s, 0.4s, ...] entre les nouvelles tentatives. La valeur par défaut est 0,8.
- retry_backoff_max
- float
Temps d’arrêt maximal. La valeur par défaut est 120 secondes (2 minutes).
- retry_mode
- str
Comportement de délai entre les tentatives de nouvelle tentative. Les valeurs prises en charge sont « fixes » ou « exponentielles », où la valeur par défaut est « exponentielle ».
- idle_timeout
- float
Délai d’expiration, en secondes, après lequel ce client ferme la connexion sous-jacente s’il n’y a plus d’activité. Par défaut, la valeur est None, ce qui signifie que le client ne s’arrêtera pas en raison de l’inactivité, sauf si le service l’a initié.
- transport_type
- TransportType
Type de protocole de transport qui sera utilisé pour communiquer avec le service Event Hubs. La valeur par défaut est TransportType.Amqp , auquel cas le port 5671 est utilisé. Si le port 5671 est indisponible/bloqué dans l’environnement réseau, TransportType.AmqpOverWebsocket peut être utilisé à la place, qui utilise le port 443 pour la communication.
- checkpoint_store
- Optional[CheckpointStore]
Gestionnaire qui stocke les données d’équilibrage de charge et de point de contrôle de la partition lors de la réception d’événements. Le magasin de points de contrôle sera utilisé dans les deux cas de réception à partir de toutes les partitions ou d’une seule partition. Dans ce dernier cas, l’équilibrage de charge ne s’applique pas. Si aucun magasin de points de contrôle n’est fourni, le point de contrôle est conservé en interne en mémoire et le instance EventHubConsumerClient reçoit des événements sans équilibrage de charge.
- load_balancing_interval
- float
Lorsque l’équilibrage de charge entre en jeu. Il s’agit de l’intervalle, en secondes, entre deux évaluations d’équilibrage de charge. La valeur par défaut est de 30 secondes.
- partition_ownership_expiration_interval
- float
Une propriété de partition expire après ce nombre de secondes. Chaque évaluation de l’équilibrage de charge étend automatiquement le délai d’expiration de la propriété. La valeur par défaut est 6 * load_balancing_interval, c’est-à-dire 180 secondes lors de l’utilisation de la load_balancing_interval par défaut de 30 secondes.
- load_balancing_strategy
- str ou LoadBalancingStrategy
Lorsque l’équilibrage de charge entre en jeu, il utilise cette stratégie pour revendiquer et équilibrer la propriété de la partition. Utilisez « gourmand » ou LoadBalancingStrategy.GREEDY pour la stratégie gourmande, qui, pour chaque évaluation d’équilibrage de charge, récupère autant de partitions non réclamées nécessaires pour équilibrer la charge. Utilisez « balanced » ou LoadBalancingStrategy.BALANCED pour la stratégie équilibrée, qui, pour chaque évaluation d’équilibrage de charge, ne revendique qu’une seule partition qui n’est pas revendiquée par un autre EventHubConsumerClient. Si toutes les partitions d’un EventHub sont revendiquées par un autre EventHubConsumerClient et que ce client a réclamé trop peu de partitions, ce client vole une partition à d’autres clients pour chaque évaluation d’équilibrage de charge, quelle que soit la stratégie d’équilibrage de charge. La stratégie gourmande est utilisée par défaut.
Adresse de point de terminaison personnalisée à utiliser pour établir une connexion au service Event Hubs, ce qui permet aux demandes réseau d’être routées via toutes les passerelles Application Gateway ou autres chemins nécessaires pour l’environnement hôte. La valeur par défaut est None. Le format est « sb://< custom_endpoint_hostname> :<custom_endpoint_port> ». Si le port n’est pas spécifié dans le custom_endpoint_address, le port 443 est utilisé par défaut.
Chemin d’accès au fichier de CA_BUNDLE personnalisé du certificat SSL utilisé pour authentifier l’identité du point de terminaison de connexion. La valeur par défaut est None, auquel cas certifi.where() sera utilisé.
- uamqp_transport
- bool
Indique s’il faut utiliser la bibliothèque uamqp comme transport sous-jacent. La valeur par défaut est False et la bibliothèque PURE PYTHON AMQP sera utilisée comme transport sous-jacent.
Type de retour
Exemples
Créez une instance de EventHubConsumerClient à partir de chaîne de connexion.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Obtient les propriétés du hub d’événements.
Les clés dans le dictionnaire retourné sont les suivantes :
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Retours
Dictionnaire contenant des informations sur le hub d’événements.
Type de retour
Exceptions
get_partition_ids
Obtenez les ID de partition de l’Event Hub.
async get_partition_ids() -> List[str]
Retours
Liste des ID de partition.
Type de retour
Exceptions
get_partition_properties
Obtient les propriétés de la partition spécifiée.
Les clés du dictionnaire de propriétés sont les suivantes :
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Paramètres
Retours
Dictionnaire contenant des propriétés de partition.
Type de retour
Exceptions
receive
Recevoir des événements de partitions, avec l’équilibrage de charge et les points de contrôle facultatifs.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Paramètres
- on_event
- Callable[PartitionContext, Optional[EventData]]
Fonction de rappel pour la gestion d’un événement reçu. Le rappel prend deux paramètres : partition_context qui contient le contexte de partition et l’événement qui est l’événement reçu. La fonction de rappel doit être définie comme suit : on_event(partition_context, événement) . Pour obtenir des informations détaillées sur le contexte de partition, reportez-vous à PartitionContext.
- max_wait_time
- float
Intervalle maximal en secondes que le processeur d’événements attend avant d’appeler le rappel. Si aucun événement n’est reçu dans cet intervalle, le rappel on_event est appelé avec Aucun. Si cette valeur est définie sur None ou 0 (valeur par défaut), le rappel n’est pas appelé tant qu’un événement n’est pas reçu.
- partition_id
- str
S’il est spécifié, le client reçoit uniquement de cette partition. Sinon, le client reçoit de toutes les partitions.
- owner_level
- int
Priorité pour un consommateur exclusif. Un consommateur exclusif est créé si owner_level est défini. Un consommateur avec une owner_level plus élevée a une priorité exclusive plus élevée. Le niveau propriétaire est également connu sous le nom de « valeur d’époque » du consommateur.
- prefetch
- int
Nombre d’événements à prérécupérer à partir du service pour traitement. La valeur par défaut est 300.
- track_last_enqueued_event_properties
- bool
Indique si le consommateur doit demander des informations sur l’événement en dernier file d’attente sur sa partition associée et suivre ces informations à mesure que les événements sont reçus. Lorsque des informations sur l’événement de dernière file d’attente des partitions sont suivies, chaque événement reçu du service Event Hubs transporte les métadonnées relatives à la partition. Il en résulte une petite quantité de bande passante réseau supplémentaire qui est généralement un compromis favorable lorsqu’il est considéré par rapport à l’envoi périodique de demandes de propriétés de partition à l’aide du client Event Hub. Elle est définie sur False par défaut.
Commencez à recevoir à partir de cette position d’événement s’il n’existe aucune donnée de point de contrôle pour une partition. Les données de point de contrôle seront utilisées si elles sont disponibles. Il peut s’agir d’un dicté avec l’ID de partition comme clé et la position comme valeur pour les partitions individuelles, ou une valeur unique pour toutes les partitions. Le type de valeur peut être str, int ou datetime.datetime. Les valeurs « -1 » pour la réception à partir du début du flux et « @latest » sont également prises en charge pour recevoir uniquement de nouveaux événements.
Déterminez si le starting_position donné est inclusif(>=) ou non (>). True pour inclusive et False pour exclusive. Il peut s’agir d’un dict avec l’ID de partition comme clé et bool comme valeur indiquant si la starting_position d’une partition spécifique est inclusive ou non. Il peut également s’agir d’une valeur bool unique pour tous les starting_position. La valeur par défaut est False.
- on_error
- Callable[[PartitionContext, Exception]]
Fonction de rappel qui sera appelée lorsqu’une erreur est déclenchée lors de la réception après l’épuisement des tentatives ou pendant le processus d’équilibrage de charge. Le rappel prend deux paramètres : partition_context qui contient des informations de partition et l’erreur étant l’exception. partition_context peut être None si l’erreur est générée pendant le processus d’équilibrage de charge. Le rappel doit être défini comme suit : on_error(partition_context, erreur) . Le rappel on_error est également appelé si une exception non gérée est levée pendant le rappel on_event .
- on_partition_initialize
- Callable[[PartitionContext]]
Fonction de rappel qui sera appelée après l’initialisation d’un consommateur pour une certaine partition. Elle est également appelée lorsqu’un nouveau consommateur de partition interne est créé pour prendre en charge le processus de réception d’un consommateur de partition interne ayant échoué et fermé. Le rappel prend un seul paramètre : partition_context qui contient les informations de partition. Le rappel doit être défini comme suit : on_partition_initialize(partition_context) .
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Fonction de rappel qui sera appelée après la fermeture d’un consommateur pour une certaine partition. Elle est également appelée lorsque l’erreur est générée lors de la réception après l’épuisement des tentatives. Le rappel prend deux paramètres : partition_context qui contient des informations de partition et la raison de la fermeture. Le rappel doit être défini comme suit : on_partition_close(partition_context, raison) . Reportez-vous à CloseReason pour les différentes raisons de clôture.
Type de retour
Exemples
Recevez des événements à partir de l’EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
Recevoir des événements de partitions par lots, avec l’équilibrage de charge et les points de contrôle facultatifs.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Paramètres
- on_event_batch
- Callable[PartitionContext, List[EventData]]
Fonction de rappel pour gérer un lot d’événements reçus. Le rappel prend deux paramètres : partition_context qui contient le contexte de partition et event_batch, qui est les événements reçus. La fonction de rappel doit être définie comme suit : on_event_batch(partition_context, event_batch) . event_batch peut être une liste vide si max_wait_time n’a pas la valeur None ni 0 et qu’aucun événement n’est reçu après max_wait_time. Pour obtenir des informations détaillées sur le contexte de partition, reportez-vous à PartitionContext.
- max_batch_size
- int
Nombre maximal d’événements dans un lot passé au rappel on_event_batch. Si le nombre réel d’événements reçus est supérieur à max_batch_size, les événements reçus sont divisés en lots et appellent le rappel pour chaque lot avec jusqu’à max_batch_size événements.
- max_wait_time
- float
Intervalle maximal en secondes que le processeur d’événements attend avant d’appeler le rappel. Si aucun événement n’est reçu dans cet intervalle, le rappel on_event_batch est appelé avec une liste vide. Si cette valeur est définie sur None ou 0 (valeur par défaut), le rappel n’est pas appelé tant que les événements ne sont pas reçus.
- partition_id
- str
S’il est spécifié, le client reçoit uniquement de cette partition. Sinon, le client reçoit de toutes les partitions.
- owner_level
- int
Priorité pour un consommateur exclusif. Un consommateur exclusif est créé si owner_level est défini. Un consommateur avec une owner_level plus élevée a une priorité exclusive plus élevée. Le niveau propriétaire est également connu sous le nom de « valeur d’époque » du consommateur.
- prefetch
- int
Nombre d’événements à prérécupérer à partir du service pour traitement. La valeur par défaut est 300.
- track_last_enqueued_event_properties
- bool
Indique si le consommateur doit demander des informations sur l’événement en dernier file d’attente sur sa partition associée et suivre ces informations à mesure que les événements sont reçus. Lorsque des informations sur l’événement de dernière file d’attente des partitions sont suivies, chaque événement reçu du service Event Hubs transporte les métadonnées relatives à la partition. Il en résulte une petite quantité de bande passante réseau supplémentaire qui est généralement un compromis favorable lorsqu’il est considéré par rapport à l’envoi périodique de demandes de propriétés de partition à l’aide du client Event Hub. Elle est définie sur False par défaut.
Commencez à recevoir à partir de cette position d’événement s’il n’existe aucune donnée de point de contrôle pour une partition. Les données de point de contrôle seront utilisées si elles sont disponibles. Il peut s’agir d’un dicté avec l’ID de partition comme clé et la position comme valeur pour les partitions individuelles, ou une valeur unique pour toutes les partitions. Le type de valeur peut être str, int ou datetime.datetime. Les valeurs « -1 » pour la réception à partir du début du flux et « @latest » sont également prises en charge pour recevoir uniquement de nouveaux événements.
Déterminez si le starting_position donné est inclusif(>=) ou non (>). True pour inclusive et False pour exclusive. Il peut s’agir d’un dict avec l’ID de partition comme clé et bool comme valeur indiquant si la starting_position d’une partition spécifique est inclusive ou non. Il peut également s’agir d’une valeur bool unique pour tous les starting_position. La valeur par défaut est False.
- on_error
- Callable[[PartitionContext, Exception]]
Fonction de rappel qui sera appelée lorsqu’une erreur est déclenchée lors de la réception après l’épuisement des tentatives ou pendant le processus d’équilibrage de charge. Le rappel prend deux paramètres : partition_context qui contient des informations de partition et l’erreur étant l’exception. partition_context peut être None si l’erreur est générée pendant le processus d’équilibrage de charge. Le rappel doit être défini comme suit : on_error(partition_context, erreur) . Le rappel on_error est également appelé si une exception non gérée est levée pendant le rappel on_event .
- on_partition_initialize
- Callable[[PartitionContext]]
Fonction de rappel qui sera appelée après l’initialisation d’un consommateur pour une certaine partition. Elle est également appelée lorsqu’un nouveau consommateur de partition interne est créé pour prendre en charge le processus de réception d’un consommateur de partition interne ayant échoué et fermé. Le rappel prend un seul paramètre : partition_context qui contient les informations de partition. Le rappel doit être défini comme suit : on_partition_initialize(partition_context) .
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Fonction de rappel qui sera appelée après la fermeture d’un consommateur pour une certaine partition. Elle est également appelée lorsque l’erreur est générée lors de la réception après l’épuisement des tentatives. Le rappel prend deux paramètres : partition_context qui contient des informations de partition et la raison de la fermeture. Le rappel doit être défini comme suit : on_partition_close(partition_context, raison) . Reportez-vous à CloseReason pour les différentes raisons de clôture.
Type de retour
Exemples
Recevez des événements par lots à partir de l’EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python