Sdílet prostřednictvím


EventHubProducerClient Třída

Třída EventHubProducerClient definuje rozhraní vysoké úrovně pro odesílání událostí do služby Azure Event Hubs.

Dědičnost
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Konstruktor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Parametry

fully_qualified_namespace
str
Vyžadováno

Plně kvalifikovaný název hostitele pro obor názvů služby Event Hubs. Pravděpodobně se bude podobat souboru .servicebus.windows.net

eventhub_name
str
Vyžadováno

Cesta ke konkrétnímu centru událostí, ke kterému se má klient připojit.

credential
AsyncTokenCredential nebo AzureSasCredential nebo AzureNamedKeyCredential
Vyžadováno

Objekt přihlašovacích údajů používaný k ověřování, který implementuje konkrétní rozhraní pro získávání tokenů. Přijímá EventHubSharedKeyCredentialobjekty přihlašovacích údajů nebo vygenerované knihovnou azure-identity a objekty, které implementují metodu *get_token(self, scopes).

buffered_mode
bool

Pokud je true, klient producenta bude shromažďovat události ve vyrovnávací paměti, efektivně dávkově a pak publikovat. Výchozí hodnota je False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Zpětné volání, které se má volat po úspěšném publikování dávky. Zpětné volání má dva parametry:

  • events: Seznam událostí, které byly úspěšně publikovány.

  • partition_id: ID oddílu, do kterého byly publikovány události v seznamu.

Funkce zpětného volání by měla být definována takto: on_success(events, partition_id). Vyžaduje se , pokud má buffered_mode hodnotu True, zatímco volitelná, pokud má buffered_mode hodnotu False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Zpětné volání, které se má volat, jakmile se nepodaří publikovat dávku. Vyžaduje se, pokud je hodnota v buffered_mode true, zatímco volitelná, pokud buffered_mode má hodnotu False. Funkce zpětného volání by měla být definována takto: on_error(události, partition_id, chyba), kde:

  • events: Seznam událostí, které se nepodařilo publikovat,

  • partition_id: ID oddílu, do kterého se události v seznamu pokusily publikovat, a

  • error: Výjimka související se selháním odesílání.

Pokud má buffered_mode hodnotu False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se během vytváření instance klienta producenta předává zpětné volání on_error ,

    informace o chybě budou předány zpětnému volání on_error , které se pak bude volat.

  • Pokud se během vytváření instance klienta nepředá zpětné volání on_error,

    pak bude ve výchozím nastavení vyvolána chyba.

Pokud má buffered_mode hodnotu True, vyžaduje se zpětné volání on_error a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se události nepodaří zařadit do fronty v rámci daného časového limitu, dojde přímo k chybě.

  • Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .

max_buffer_length
int

Pouze režim vyrovnávací paměti. Celkový počet událostí na oddíl, které je možné ukládat do vyrovnávací paměti před aktivací vyprázdnění. Výchozí hodnota je 1500 v režimu vyrovnávací paměti.

max_wait_time
Optional[float]

Pouze režim vyrovnávací paměti. Doba čekání na sestavení dávky s událostmi ve vyrovnávací paměti před publikováním. Výchozí hodnota je 1 v režimu vyrovnávací paměti.

logging_enable
bool

Určuje, zda se mají protokoly trasování sítě vypisovat do protokolovacího nástroje. Výchozí hodnota je False.

auth_timeout
float

Doba v sekundách čekání na autorizaci tokenu službou. Výchozí hodnota je 60 sekund. Pokud je nastavená hodnota 0, nebude se z klienta vynucovat žádný časový limit.

user_agent
str

Pokud je zadaný, přidá se před řetězec uživatelského agenta.

retry_total
int

Celkový počet pokusů o opakování neúspěšné operace, když dojde k chybě. Výchozí hodnota je 3.

retry_backoff_factor
float

Faktor zpomalování, který se použije mezi pokusy po druhém pokusu (většina chyb se vyřeší okamžitě druhým pokusem bez zpoždění). V pevném režimu zásady opakování vždy přejdou do režimu spánku pro {backoff factor}. V exponenciálním režimu budou zásady opakování v režimu spánku po dobu: {backoff factor} * (2 ** ({počet celkového počtu opakování} – 1)) sekund. Pokud je backoff_factor 0,1, pak bude opakování mezi opakováními [0,0 s, 0,2 s, 0,4 s, ...] v režimu spánku. Výchozí hodnota je 0,8.

retry_backoff_max
float

Maximální doba návratu. Výchozí hodnota je 120 sekund (2 minuty).

retry_mode
str

Chování zpoždění mezi opakovanými pokusy. Podporované hodnoty jsou "pevné" nebo "exponenciální", kde výchozí hodnota je exponenciální.

idle_timeout
float

Časový limit v sekundách, po jehož uplynutí klient ukončí základní připojení, pokud nedojde k žádné aktivitě. Ve výchozím nastavení je hodnota None, což znamená, že klient se z důvodu nečinnosti nevystaví, dokud ho služba neaktivuje.

transport_type
TransportType

Typ přenosového protokolu, který se použije ke komunikaci se službou Event Hubs. Výchozí hodnota je TransportType.Amqp , v takovém případě se použije port 5671. Pokud je port 5671 v síťovém prostředí nedostupný nebo blokovaný, je možné místo toho použít transportType.AmqpOverWebsocket , který ke komunikaci používá port 443.

http_proxy
dict

Nastavení proxy serveru HTTP. Musí se jednat o slovník s následujícími klíči: "proxy_hostname" (hodnota str) a "proxy_port" (hodnota int). Kromě toho mohou být k dispozici také následující klíče: 'username', 'password'.

custom_endpoint_address
Optional[str]

Vlastní adresa koncového bodu, která se má použít k navázání připojení ke službě Event Hubs, což umožňuje směrování síťových požadavků přes všechny aplikační brány nebo jiné cesty potřebné pro hostitelské prostředí. Výchozí hodnota je Žádná. Formát by vypadal takto: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Pokud není port v custom_endpoint_address zadaný, použije se ve výchozím nastavení port 443.

connection_verify
Optional[str]

Cesta k souboru vlastního CA_BUNDLE certifikátu SSL, který slouží k ověření identity koncového bodu připojení. Výchozí hodnota je None, v takovém případě se použije certifi.where().

uamqp_transport
bool

Určuje, jestli se má jako podkladový přenos použít knihovna uamqp . Výchozí hodnota je False a jako podkladový přenos se použije knihovna AMQP pure Pythonu.

socket_timeout
float

Čas v sekundách, kdy by měl podkladový soket na připojení čekat při odesílání a přijímání dat před vypršením časového limitu. Výchozí hodnota je 0,2 pro TransportType.Amqp a 1 pro TransportType.AmqpOverWebsocket. Pokud k chybám EventHubsConnectionError dochází kvůli vypršení časového limitu zápisu, může být potřeba předat větší než výchozí hodnotu. To je pro pokročilé scénáře použití a obvykle by měla stačit výchozí hodnota.

Příklady

Vytvořte novou instanci EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metody

close

Zavřete základní připojení a propojení AMQP klienta producenta.

create_batch

Vytvořte objekt EventDataBatch s maximální velikostí veškerého obsahu, který je omezen max_size_in_bytes.

Max_size_in_bytes by neměla být větší než maximální povolená velikost zprávy definovaná službou.

flush

Pouze režim vyrovnávací paměti. Vyprázdnění událostí ve vyrovnávací paměti, které se mají okamžitě odeslat, pokud klient pracuje v režimu vyrovnávací paměti.

from_connection_string

Vytvořte EventHubProducerClient z připojovací řetězec.

get_buffered_event_count

Počet událostí, které jsou v vyrovnávací paměti a čekají na publikování pro daný oddíl. Vrátí hodnotu None v režimu bez vyrovnávací paměti. POZNÁMKA: Vyrovnávací paměť událostí se zpracovává v korutině na pozadí, proto počet událostí ve vyrovnávací paměti hlášený tímto rozhraním API by měl být považován pouze za aproximaci a doporučuje se pouze pro použití při ladění. Pro ID oddílu, který nemá žádné události ve vyrovnávací paměti, se vrátí hodnota 0 bez ohledu na to, jestli TOTO ID oddílu v centru událostí skutečně existuje.

get_eventhub_properties

Získejte vlastnosti centra událostí.

Mezi klíče ve vráceném slovníku patří:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Získejte ID oddílů centra událostí.

get_partition_properties

Získá vlastnosti zadaného oddílu.

Mezi klíče ve slovníku vlastností patří:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Odešle dávku dat událostí. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda zařadí události do místní vyrovnávací paměti a vrátí je. Producent provede automatické odesílání na pozadí.

Pokud je buffered_mode False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se během vytváření instancí klienta producenta předá zpětné volání on_error ,

    pak se informace o chybě předají zpětnému volání on_error , které se pak zavolá.

  • Pokud se během vytváření instancí klienta nepředá zpětné volání on_error,

    pak bude ve výchozím nastavení vyvolána chyba.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  • Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .

V režimu vyrovnávací paměti zůstane odeslání dávky nedotčené a odesláno jako jedna jednotka. Uspořádání dávky nebude přeuspořádané. To může vést k neefektivitě odesílání událostí.

Pokud odesíláte konečný seznam EventData nebo AmqpAnnotatedMessage a víte, že je v limitu velikosti rámce centra událostí, můžete je poslat s send_batch voláním. V opačném případě použijte create_batch k vytvoření EventDataBatch a přidejte buď EventData nebo AmqpAnnotatedMessage do dávky jeden po druhém až do limitu velikosti, a pak voláním této metody odešlete dávku.

send_event

Odešle data události. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda zařadí událost do místní vyrovnávací paměti a vrátí ji. Producent provede automatické dávkování a odeslání na pozadí.

Pokud je buffered_mode false, on_error zpětné volání je volitelné a chyby se budou zpracovávat takto: * Pokud se během vytváření instance klienta producenta předá zpětné volání on_error ,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem: * Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Zavřete základní připojení a propojení AMQP klienta producenta.

async close(*, flush: bool = True, **kwargs: Any) -> None

Parametry

flush
bool

Pouze režim vyrovnávací paměti. Pokud je nastavená hodnota True, události ve vyrovnávací paměti se odešlou okamžitě. Výchozí hodnota je True.

timeout
float nebo None

Pouze režim vyrovnávací paměti. Vypršení časového limitu pro zavření producenta. Výchozí hodnota je Žádná, což znamená žádný časový limit.

Návratový typ

Výjimky

Pokud došlo k chybě při vyprazdňování vyrovnávací paměti, pokud je vyprázdnění nastaveno na true nebo ukončení podkladových připojení AMQP v režimu vyrovnávací paměti.

Příklady

Zavřete obslužnou rutinu.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Vytvořte objekt EventDataBatch s maximální velikostí veškerého obsahu, který je omezen max_size_in_bytes.

Max_size_in_bytes by neměla být větší než maximální povolená velikost zprávy definovaná službou.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Návratový typ

Výjimky

Pokud došlo k chybě při vyprazdňování vyrovnávací paměti, pokud je vyprázdnění nastaveno na true nebo ukončení podkladových připojení AMQP v režimu vyrovnávací paměti.

Příklady

Vytvoření objektu EventDataBatch v omezené velikosti


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Pouze režim vyrovnávací paměti. Vyprázdnění událostí ve vyrovnávací paměti, které se mají okamžitě odeslat, pokud klient pracuje v režimu vyrovnávací paměti.

async flush(**kwargs: Any) -> None

Parametry

timeout
float nebo None

Vypršení časového limitu pro vyprázdnění událostí ve vyrovnávací paměti, výchozí hodnota je Žádná, což znamená žádný časový limit.

Návratový typ

Výjimky

Pokud se producentovi nepodaří vyprázdnit vyrovnávací paměť v rámci daného časového limitu v režimu vyrovnávací paměti.

from_connection_string

Vytvořte EventHubProducerClient z připojovací řetězec.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Parametry

conn_str
str
Vyžadováno

Připojovací řetězec centra událostí.

eventhub_name
str

Cesta ke konkrétnímu centru událostí, ke kterému se má klient připojit.

buffered_mode
bool

Pokud je true, klient producenta bude shromažďovat události ve vyrovnávací paměti, efektivně dávkově a pak publikovat. Výchozí hodnota je False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Zpětné volání, které se má volat po úspěšném publikování dávky. Zpětné volání má dva parametry:

  • events: Seznam událostí, které byly úspěšně publikovány.

  • partition_id: ID oddílu, do kterého byly publikovány události v seznamu.

Funkce zpětného volání by měla být definována takto: on_success(events, partition_id). Vyžaduje se, pokud má buffered_mode hodnotu True, zatímco volitelná, pokud má buffered_mode hodnotu False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Zpětné volání, které se má volat, jakmile se nepodaří publikovat dávku. Funkce zpětného volání by měla být definována takto: on_error(události, partition_id, chyba), kde:

  • events: Seznam událostí, které se nepodařilo publikovat,

  • partition_id: ID oddílu, do kterého se události v seznamu pokusily publikovat, a

  • error: Výjimka související se selháním odesílání.

Pokud má buffered_mode hodnotu False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se během vytváření instance klienta producenta předává zpětné volání on_error ,

    informace o chybě budou předány zpětnému volání on_error , které se pak bude volat.

  • Pokud se během vytváření instance klienta nepředá zpětné volání on_error,

    pak bude ve výchozím nastavení vyvolána chyba.

Pokud má buffered_mode hodnotu True, vyžaduje se zpětné volání on_error a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se události nepodaří zařadit do fronty v rámci daného časového limitu, dojde přímo k chybě.

  • Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .

max_buffer_length
int

Pouze režim vyrovnávací paměti. Celkový počet událostí na oddíl, které je možné ukládat do vyrovnávací paměti před aktivací vyprázdnění. Výchozí hodnota je 1500 v režimu vyrovnávací paměti.

max_wait_time
Optional[float]

Pouze režim vyrovnávací paměti. Doba čekání na sestavení dávky s událostmi ve vyrovnávací paměti před publikováním. Výchozí hodnota je 1 v režimu vyrovnávací paměti.

logging_enable
bool

Určuje, zda se mají protokoly trasování sítě vypisovat do protokolovacího nástroje. Výchozí hodnota je False.

http_proxy
dict

Nastavení proxy serveru HTTP. Musí se jednat o slovník s následujícími klíči: "proxy_hostname" (hodnota str) a "proxy_port" (hodnota int). Kromě toho mohou být k dispozici také následující klíče: 'username', 'password'.

auth_timeout
float

Doba v sekundách čekání na autorizaci tokenu službou. Výchozí hodnota je 60 sekund. Pokud je nastavená hodnota 0, nebude se z klienta vynucovat žádný časový limit.

user_agent
str

Pokud je zadaný, přidá se před řetězec uživatelského agenta.

retry_total
int

Celkový počet pokusů o opakování neúspěšné operace, když dojde k chybě. Výchozí hodnota je 3.

retry_backoff_factor
float

Faktor zpomalování, který se použije mezi pokusy po druhém pokusu (většina chyb se vyřeší okamžitě druhým pokusem bez zpoždění). V pevném režimu zásady opakování vždy přejdou do režimu spánku pro {backoff factor}. V exponenciálním režimu budou zásady opakování v režimu spánku po dobu: {backoff factor} * (2 ** ({počet celkového počtu opakování} – 1)) sekund. Pokud je backoff_factor 0,1, pak bude opakování mezi opakováními [0,0 s, 0,2 s, 0,4 s, ...] v režimu spánku. Výchozí hodnota je 0,8.

retry_backoff_max
float

Maximální doba návratu. Výchozí hodnota je 120 sekund (2 minuty).

retry_mode
str

Chování zpoždění mezi opakovanými pokusy. Podporované hodnoty jsou "pevné" nebo "exponenciální", kde výchozí hodnota je exponenciální.

idle_timeout
float

Časový limit v sekundách, po jehož uplynutí klient ukončí základní připojení, pokud nedojde k žádné aktivitě. Ve výchozím nastavení je hodnota None, což znamená, že klient se z důvodu nečinnosti nevystaví, dokud ho služba neaktivuje.

transport_type
TransportType

Typ přenosového protokolu, který se použije ke komunikaci se službou Event Hubs. Výchozí hodnota je TransportType.Amqp , v takovém případě se použije port 5671. Pokud je port 5671 v síťovém prostředí nedostupný nebo blokovaný, je možné místo toho použít transportType.AmqpOverWebsocket , který ke komunikaci používá port 443.

custom_endpoint_address
Optional[str]

Vlastní adresa koncového bodu, která se má použít k navázání připojení ke službě Event Hubs, což umožňuje směrování síťových požadavků přes všechny aplikační brány nebo jiné cesty potřebné pro hostitelské prostředí. Výchozí hodnota je Žádná. Formát by vypadal takto: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Pokud není port v custom_endpoint_address zadaný, použije se ve výchozím nastavení port 443.

connection_verify
Optional[str]

Cesta k souboru vlastního CA_BUNDLE certifikátu SSL, který slouží k ověření identity koncového bodu připojení. Výchozí hodnota je None, v takovém případě se použije certifi.where().

uamqp_transport
bool

Určuje, jestli se má jako podkladový přenos použít knihovna uamqp . Výchozí hodnota je False a jako podkladový přenos se použije knihovna AMQP pure Pythonu.

Návratový typ

Výjimky

Pokud došlo k chybě při vyprazdňování vyrovnávací paměti, pokud je vyprázdnění nastaveno na true nebo ukončení podkladových připojení AMQP v režimu vyrovnávací paměti.

Příklady

Z připojovací řetězec vytvořte novou instanci EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Počet událostí, které jsou v vyrovnávací paměti a čekají na publikování pro daný oddíl. Vrátí hodnotu None v režimu bez vyrovnávací paměti. POZNÁMKA: Vyrovnávací paměť událostí se zpracovává v korutině na pozadí, proto počet událostí ve vyrovnávací paměti hlášený tímto rozhraním API by měl být považován pouze za aproximaci a doporučuje se pouze pro použití při ladění. Pro ID oddílu, který nemá žádné události ve vyrovnávací paměti, se vrátí hodnota 0 bez ohledu na to, jestli TOTO ID oddílu v centru událostí skutečně existuje.

get_buffered_event_count(partition_id: str) -> int | None

Parametry

partition_id
str
Vyžadováno

ID cílového oddílu.

Návratový typ

int,

Výjimky

Pokud došlo k chybě při vyprazdňování vyrovnávací paměti, pokud je vyprázdnění nastaveno na true nebo ukončení podkladových připojení AMQP v režimu vyrovnávací paměti.

get_eventhub_properties

Získejte vlastnosti centra událostí.

Mezi klíče ve vráceném slovníku patří:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Návraty

Slovník obsahující informace o centru událostí.

Návratový typ

Výjimky

get_partition_ids

Získejte ID oddílů centra událostí.

async get_partition_ids() -> List[str]

Návraty

Seznam ID oddílů.

Návratový typ

Výjimky

get_partition_properties

Získá vlastnosti zadaného oddílu.

Mezi klíče ve slovníku vlastností patří:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametry

partition_id
str
Vyžadováno

ID cílového oddílu.

Návraty

Dict vlastností oddílu.

Návratový typ

Výjimky

send_batch

Odešle dávku dat událostí. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda zařadí události do místní vyrovnávací paměti a vrátí je. Producent provede automatické odesílání na pozadí.

Pokud je buffered_mode False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se během vytváření instancí klienta producenta předá zpětné volání on_error ,

    pak se informace o chybě předají zpětnému volání on_error , které se pak zavolá.

  • Pokud se během vytváření instancí klienta nepředá zpětné volání on_error,

    pak bude ve výchozím nastavení vyvolána chyba.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  • Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .

V režimu vyrovnávací paměti zůstane odeslání dávky nedotčené a odesláno jako jedna jednotka. Uspořádání dávky nebude přeuspořádané. To může vést k neefektivitě odesílání událostí.

Pokud odesíláte konečný seznam EventData nebo AmqpAnnotatedMessage a víte, že je v limitu velikosti rámce centra událostí, můžete je poslat s send_batch voláním. V opačném případě použijte create_batch k vytvoření EventDataBatch a přidejte buď EventData nebo AmqpAnnotatedMessage do dávky jeden po druhém až do limitu velikosti, a pak voláním této metody odešlete dávku.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parametry

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Vyžadováno

EventDataBatch objekt, který se má odeslat, nebo seznam EventData, který se má odeslat v dávce. Všechny události EventData nebo AmqpAnnotatedMessage v seznamu nebo EventDataBatch přijdou na stejný oddíl.

timeout
float

Maximální doba čekání na odeslání dat události v režimu bez vyrovnávací paměti nebo maximální doba čekání na zařazení dat události do vyrovnávací paměti v režimu vyrovnávací paměti. V režimu bez vyrovnávací paměti se použije výchozí doba čekání zadaná při vytvoření producenta. V režimu vyrovnávací paměti je výchozí doba čekání Žádná.

partition_id
str

ID konkrétního oddílu, do který se má odeslat. Výchozí hodnota je None (Žádný). V takovém případě se služba přiřadí ke všem oddílům pomocí kruhového dotazování. Pokud zadáte partition_id a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná událost EventDataBatch má partition_id.

partition_key
str

S danou partition_key se data událostí odesílají do konkrétního oddílu centra událostí, o které rozhoduje služba. Pokud zadáte partition_key a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná událost EventDataBatch má partition_key. Pokud jsou k dispozici partition_id i partition_key, bude mít přednost partition_id. UPOZORNĚNÍ: Nastavení partition_key neřetězcové hodnoty u odeslaných událostí se nedoporučuje, protože služba centra událostí bude partition_key ignorovat a události budou přiřazeny ke všem oddílům pomocí kruhového dotazování. Kromě toho existují sady SDK pro využívání událostí, které očekávají, že partition_key budou pouze typu řetězec. Nemusí se jim podařit analyzovat neřetězcovou hodnotu.

Návratový typ

Výjimky

Pokud hodnota určená parametrem časového limitu uplynou před událostí může být odeslána v režimu bez vyrovnávací paměti nebo události mohou být zařazení do vyrovnávací paměti v režimu vyrovnávací paměti.

Příklady

Asynchronně odesílá data událostí.


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Odešle data události. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda zařadí událost do místní vyrovnávací paměti a vrátí ji. Producent provede automatické dávkování a odeslání na pozadí.

Pokud je buffered_mode false, on_error zpětné volání je volitelné a chyby se budou zpracovávat takto: * Pokud se během vytváření instance klienta producenta předá zpětné volání on_error ,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem: * Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parametry

event_data
Union[EventData, AmqpAnnotatedMessage]
Vyžadováno

Objekt EventData , který se má odeslat.

timeout
float

Maximální doba čekání na odeslání dat události v režimu bez vyrovnávací paměti nebo maximální doba čekání na zařazení dat události do vyrovnávací paměti v režimu vyrovnávací paměti. V režimu bez vyrovnávací paměti se použije výchozí doba čekání zadaná při vytvoření producenta. Ve vyrovnávacím režimu je výchozí doba čekání Žádná.

partition_id
str

KONKRÉTNÍ ID oddílu, do které se má odeslat. Výchozí hodnota je Žádný. V takovém případě služba přiřadí všechny oddíly pomocí kruhového dotazování. Pokud je zadána partition_id a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná hodnota EventDataBatch má partition_id.

partition_key
str

S daným partition_key se data událostí odesílají do konkrétního oddílu centra událostí, o které rozhodne služba. Pokud je zadána partition_key a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná hodnota EventDataBatch má partition_key. Pokud jsou k dispozici partition_id i partition_key, bude mít přednost partition_id. UPOZORNĚNÍ: Nastavení partition_key neřetězcové hodnoty u odeslaných událostí se nedoporučuje, protože služba centra událostí bude partition_key ignorovat a události budou přiřazeny ke všem oddílům pomocí kruhového dotazování. Kromě toho existují sady SDK pro využívání událostí, které očekávají, že partition_key budou pouze typu řetězec. Nemusí se jim podařit analyzovat neřetězcovou hodnotu.

Návratový typ

Výjimky

Pokud hodnota určená parametrem časového limitu uplynou před odesláním události v režimu bez vyrovnávací paměti nebo události nelze zařadit do vyrovnávací paměti v režimu vyrovnávací paměti.

Atributy

total_buffered_event_count

Celkový počet událostí, které jsou aktuálně ve všech oddílech ve vyrovnávací paměti a čekají na publikování. Vrátí hodnotu None v režimu bez vyrovnávací paměti. POZNÁMKA: Vyrovnávací paměť událostí se zpracovává v korutině na pozadí, proto počet událostí ve vyrovnávací paměti hlášený tímto rozhraním API by měl být považován pouze za aproximaci a doporučuje se pouze pro použití při ladění.

Návratový typ

int,