EventHubProducerClient Klasa

Klasa EventHubProducerClient definiuje interfejs wysokiego poziomu do wysyłania zdarzeń do usługi Azure Event Hubs.

Dziedziczenie
azure.eventhub._client_base.ClientBase
EventHubProducerClient

Konstruktor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)

Parametry

fully_qualified_namespace
str
Wymagane

W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Event Hubs. Prawdopodobnie będzie to podobne do .servicebus.windows.net

eventhub_name
str
Wymagane

Ścieżka określonego centrum zdarzeń do połączenia klienta.

credential
TokenCredential lub AzureSasCredential lub AzureNamedKeyCredential
Wymagane

Obiekt poświadczeń używany do uwierzytelniania, który implementuje określony interfejs do pobierania tokenów. Akceptuje EventHubSharedKeyCredentialobiekty , lub poświadczenia generowane przez bibliotekę tożsamości platformy Azure i obiekty implementujące metodę *get_token(self, scopes).

buffered_mode
bool

Jeśli wartość True, klient producenta będzie zbierać zdarzenia w buforze, efektywnie wsadowy, a następnie publikować. Wartość domyślna to Fałsz.

buffer_concurrency
<xref:ThreadPoolExecutor> lub int lub None

Element ThreadPoolExecutor, który ma być używany do publikowania zdarzeń lub liczby procesów roboczych dla klasy ThreadPoolExecutor. Wartość domyślna to Brak, a element ThreadPoolExecutor z domyślną liczbą procesów roboczych zostanie utworzony na https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

Wywołanie zwrotne, które ma być wywoływane po pomyślnym opublikowaniu partii. Wywołanie zwrotne przyjmuje dwa parametry:

  • zdarzenia: lista zdarzeń, które zostały pomyślnie opublikowane

  • partition_id: identyfikator partycji, do którego zostały opublikowane zdarzenia na liście.

Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_success(events, partition_id). Jest to wymagane, gdy buffered_mode ma wartość True, jeśli buffered_mode ma wartość False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

Wywołanie zwrotne, które ma być wywoływane po opublikowaniu partii. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_error(events, partition_id, error), gdzie:

  • zdarzenia: lista zdarzeń, których nie można opublikować,

  • partition_id: identyfikator partycji, do którego podjęto próbę opublikowania zdarzeń na liście i

  • błąd: wyjątek związany z niepowodzeniem wysyłania.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:

  • Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,

    następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, które następnie zostaną wywołane.

  • Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,

    następnie domyślnie zostanie zgłoszony błąd.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:

  • Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  • Jeśli zdarzenia nie będą wysyłane po pomyślnym w kolejce, wywołanie zwrotne on_error zostanie wywołane.

max_buffer_length
int

Tylko tryb buforowany. Łączna liczba zdarzeń na partycję, które można buforować przed wyzwoleniem opróżnienia. Wartość domyślna to 1500 w trybie buforowym.

max_wait_time
Optional[float]

Tylko tryb buforowany. Czas oczekiwania na utworzenie partii z zdarzeniami w buforze przed opublikowaniem. Wartość domyślna to 1 w trybie buforowym.

logging_enable
bool

Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.

auth_timeout
float

Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.

user_agent
str

Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.

retry_total
int

Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3.

retry_backoff_factor
float

Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.

retry_backoff_max
float

Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).

retry_mode
str

Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".

idle_timeout
float

Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie istnieje żadne działanie. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.

transport_type
TransportType

Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.

http_proxy
Dict

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".

custom_endpoint_address
Optional[str]

Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.

connection_verify
Optional[str]

Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.

uamqp_transport
bool

Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.

socket_timeout
float

Czas w sekundach, w którym bazowe gniazdo w połączeniu powinno czekać podczas wysyłania i odbierania danych przed przekroczeniem limitu czasu. Wartość domyślna to 0,2 dla elementu TransportType.Amqp i 1 dla elementu TransportType.AmqpOverWebsocket. Jeśli występują błędy EventHubsConnectionError z powodu limitu czasu zapisu, może być konieczne przekazanie większej niż wartość domyślna. Jest to przeznaczone dla zaawansowanych scenariuszy użycia i zwykle wartość domyślna powinna być wystarczająca.

Przykłady

Utwórz nowe wystąpienie klasy EventHubProducerClient.


   import os
   from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   producer = EventHubProducerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
       credential=credential
   )

Metody

close

Zamknij klienta producenta bazowego połączenia AMQP i linków.

create_batch

Utwórz obiekt EventDataBatch o maksymalnym rozmiarze całej zawartości ograniczanej przez max_size_in_bytes.

Max_size_in_bytes nie powinna być większa niż maksymalny dozwolony rozmiar komunikatu zdefiniowany przez usługę.

flush

Tylko tryb buforowany. Opróżnij zdarzenia w buforze, które mają być wysyłane natychmiast, jeśli klient działa w trybie buforowym.

from_connection_string

Utwórz element EventHubProducerClient na podstawie parametry połączenia.

get_buffered_event_count

Liczba zdarzeń, które są buforowane i oczekujące na opublikowanie dla danej partycji. Zwraca wartość None w trybie bez buforowania. UWAGA: Bufor zdarzeń jest przetwarzany w wątku w tle, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu. W przypadku identyfikatora partycji, który nie ma zdarzeń buforowanych, 0 zostanie zwrócone niezależnie od tego, czy ten identyfikator partycji rzeczywiście istnieje w centrum zdarzeń.

get_eventhub_properties

Pobieranie właściwości centrum zdarzeń.

Klucze w zwracanym słowniku obejmują:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Pobieranie identyfikatorów partycji centrum zdarzeń.

get_partition_properties

Pobierz właściwości określonej partycji.

Klucze w słowniku właściwości obejmują:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (wartość logiczna)

send_batch

Wysyła partię danych zdarzeń. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli obiekt EventHubProducerClient jest skonfigurowany do uruchamiania w trybie buforowanym, metoda spróbuje w kolejce zdarzeń do buforu w danym czasie, jeśli zostanie określony i zwrócony. Producent wykona automatyczne wysyłanie w tle w trybie buforowym.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:

  • Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,

    następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, które następnie zostaną wywołane.

  • Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,

    następnie domyślnie zostanie zgłoszony błąd.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:

  • Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  • Jeśli zdarzenia nie będą wysyłane po pomyślnym w kolejce, wywołanie zwrotne on_error zostanie wywołane.

W trybie buforowym wysyłanie partii pozostanie nienaruszone i wysłane jako pojedyncza jednostka. Partia nie zostanie ponownie rozmieszona. Może to spowodować nieefektywność wysyłania zdarzeń.

Jeśli wysyłasz skończone listy eventData lub AmqpAnnotatedMessage i wiesz, że znajduje się ona w limicie rozmiaru ramki centrum zdarzeń, możesz wysłać je za pomocą wywołania send_batch . W przeciwnym razie użyj polecenia create_batch , aby utworzyć zdarzenie EventDataBatch i dodać element EventData lub AmqpAnnotatedMessage do partii jeden po drugim do limitu rozmiaru, a następnie wywołać tę metodę, aby wysłać partię.

send_event

Wysyła dane zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli obiekt EventHubProducerClient jest skonfigurowany do uruchamiania w trybie buforowanym, metoda spróbuje w kolejce zdarzeń do buforu w danym czasie, jeśli zostanie określony i zwrócony. Producent wykona automatyczne wysyłanie w tle w trybie buforowym.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób: * Jeśli wywołanie zwrotne on_error zostanie przekazane podczas tworzenia wystąpienia klienta producenta,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób: * Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Zamknij klienta producenta bazowego połączenia AMQP i linków.

close(*, flush: bool = True, **kwargs: Any) -> None

Parametry

flush
bool

Tylko tryb buforowany. W przypadku ustawienia wartości True zdarzenia w buforze będą wysyłane natychmiast. Wartość domyślna to True.

timeout
float lub None

Tylko tryb buforowany. Limit czasu zamknięcia producenta. Wartość domyślna to Brak, co oznacza brak limitu czasu.

Typ zwracany

Wyjątki

Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.

Przykłady

Zamknij klienta.


   import os
   from azure.eventhub import EventHubProducerClient, EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = producer.create_batch()

       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # EventDataBatch object reaches max_size.
               # New EventDataBatch object can be created here to send more data
               break

       producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       producer.close()

create_batch

Utwórz obiekt EventDataBatch o maksymalnym rozmiarze całej zawartości ograniczanej przez max_size_in_bytes.

Max_size_in_bytes nie powinna być większa niż maksymalny dozwolony rozmiar komunikatu zdefiniowany przez usługę.

create_batch(**kwargs: Any) -> EventDataBatch

Typ zwracany

Wyjątki

Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.

Przykłady

Tworzenie obiektu EventDataBatch w ograniczonym rozmiarze


       event_data_batch = producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Tylko tryb buforowany. Opróżnij zdarzenia w buforze, które mają być wysyłane natychmiast, jeśli klient działa w trybie buforowym.

flush(**kwargs: Any) -> None

Parametry

timeout
Optional[float]

Limit czasu opróżniania buforowanych zdarzeń, wartość domyślna to Brak, co oznacza brak limitu czasu.

Typ zwracany

Wyjątki

Jeśli producent nie opróżni buforu w danym przedziale czasu w trybie buforowym.

from_connection_string

Utwórz element EventHubProducerClient na podstawie parametry połączenia.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) -> EventHubProducerClient

Parametry

conn_str
str
Wymagane

Parametry połączenia centrum zdarzeń.

eventhub_name
str

Ścieżka określonego centrum zdarzeń w celu połączenia klienta z.

buffered_mode
bool

Jeśli wartość True, klient producenta będzie zbierać zdarzenia w buforze, wydajnie wsadowo, a następnie publikować. Wartość domyślna to False.

buffer_concurrency
<xref:ThreadPoolExecutor> lub int lub None

Element ThreadPoolExecutor, który ma być używany do publikowania zdarzeń lub liczby procesów roboczych dla klasy ThreadPoolExecutor. Wartość domyślna to Brak, a element ThreadPoolExecutor z domyślną liczbą procesów roboczych zostanie utworzony na https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

Wywołanie zwrotne, które ma być wywoływane po pomyślnym opublikowaniu partii. Wywołanie zwrotne przyjmuje dwa parametry:

  • zdarzenia: lista zdarzeń, które zostały pomyślnie opublikowane

  • partition_id: identyfikator partycji, do którego opublikowano zdarzenia na liście.

Funkcja wywołania zwrotnego powinna być zdefiniowana na przykład: on_success(events, partition_id). Wymagana, jeśli buffered_mode ma wartość True, a opcjonalne, jeśli buffered_mode ma wartość False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

Wywołanie zwrotne, które ma być wywoływane po opublikowaniu partii. Wymagane, gdy w buffered_mode ma wartość True, a opcjonalne, jeśli buffered_mode ma wartość False. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_error(zdarzenia, partition_id, błąd), gdzie:

  • zdarzenia: lista zdarzeń, których nie można opublikować,

  • partition_id: identyfikator partycji, do którego próbowano opublikować zdarzenia na liście i

  • błąd: wyjątek związany z niepowodzeniem wysyłania.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:

  • Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,

    następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, który następnie zostanie wywołany.

  • Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,

    następnie błąd zostanie zgłoszony domyślnie.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:

  • Jeśli nie można umieścić zdarzeń w kolejce w ramach danego limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  • Jeśli zdarzenia nie będą wysyłane po pomyślnym zapisie w kolejce, wywołanie zwrotne on_error zostanie wywołane.

max_buffer_length
int

Tylko tryb buforowany. Łączna liczba zdarzeń na partycję, które mogą być buforowane przed wyzwoleniem opróżnienia. Wartość domyślna to 1500 w trybie buforowym.

max_wait_time
Optional[float]

Tylko tryb buforowany. Czas oczekiwania na utworzenie partii ze zdarzeniami w buforze przed opublikowaniem. Wartość domyślna to 1 w trybie buforowym.

logging_enable
bool

Określa, czy dane wyjściowe dzienników śledzenia sieci do rejestratora. Wartość domyślna to False.

http_proxy
Dict

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość ciągu) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".

auth_timeout
float

Czas w sekundach oczekiwania na autoryzowanie tokenu przez usługę. Wartość domyślna to 60 sekund. W przypadku ustawienia wartości 0 nie zostanie wymuszony limit czasu od klienta.

user_agent
str

Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.

retry_total
int

Całkowita liczba prób ponownego wykonania operacji zakończonych niepowodzeniem w przypadku wystąpienia błędu. Wartość domyślna to 3.

retry_backoff_factor
float

Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnień). W trybie stałym zasady ponawiania zawsze będą w stanie uśpienia dla składnika {backoff}. W trybie wykładniczym zasady ponawiania zostaną uśpine dla: {współczynnik wycofywania} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie zostanie spane dla wartości [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.

retry_backoff_max
float

Maksymalny czas wycofywania. Wartość domyślna to 120 sekund (2 minuty).

retry_mode
str

Zachowanie opóźnienia między ponownymi próbami. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".

idle_timeout
float

Limit czasu (w sekundach), po którym klient zamknie połączenie bazowe, jeśli nie ma żadnej aktywności. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.

transport_type
TransportType

Typ protokołu transportowego, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.

http_proxy

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość ciągu) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".

custom_endpoint_address
Optional[str]

Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiający kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie wyglądać następująco: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie będzie używany port 443.

connection_verify
Optional[str]

Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to Brak, w którym przypadku zostanie użyty element certifi.where().

uamqp_transport
bool

Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.

Typ zwracany

Wyjątki

Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.

Przykłady

Utwórz nowe wystąpienie klasy EventHubProducerClient z parametry połączenia.


   import os
   from azure.eventhub import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Liczba zdarzeń, które są buforowane i oczekujące na opublikowanie dla danej partycji. Zwraca wartość None w trybie bez buforowania. UWAGA: Bufor zdarzeń jest przetwarzany w wątku w tle, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu. W przypadku identyfikatora partycji, który nie ma zdarzeń buforowanych, 0 zostanie zwrócone niezależnie od tego, czy ten identyfikator partycji rzeczywiście istnieje w centrum zdarzeń.

get_buffered_event_count(partition_id: str) -> int | None

Parametry

partition_id
str
Wymagane

Identyfikator partycji docelowej.

Typ zwracany

int,

Wyjątki

Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.

get_eventhub_properties

Pobieranie właściwości centrum zdarzeń.

Klucze w zwracanym słowniku obejmują:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Zwraca

Słownik zawierający właściwości usługi EventHub.

Typ zwracany

Wyjątki

get_partition_ids

Pobieranie identyfikatorów partycji centrum zdarzeń.

get_partition_ids() -> List[str]

Zwraca

Lista identyfikatorów partycji.

Typ zwracany

Wyjątki

get_partition_properties

Pobierz właściwości określonej partycji.

Klucze w słowniku właściwości obejmują:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (wartość logiczna)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametry

partition_id
str
Wymagane

Identyfikator partycji docelowej.

Zwraca

Słownik właściwości partycji.

Typ zwracany

Wyjątki

send_batch

Wysyła partię danych zdarzeń. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli obiekt EventHubProducerClient jest skonfigurowany do uruchamiania w trybie buforowanym, metoda spróbuje w kolejce zdarzeń do buforu w danym czasie, jeśli zostanie określony i zwrócony. Producent wykona automatyczne wysyłanie w tle w trybie buforowym.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:

  • Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,

    następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, które następnie zostaną wywołane.

  • Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,

    następnie domyślnie zostanie zgłoszony błąd.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:

  • Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  • Jeśli zdarzenia nie będą wysyłane po pomyślnym w kolejce, wywołanie zwrotne on_error zostanie wywołane.

W trybie buforowym wysyłanie partii pozostanie nienaruszone i wysłane jako pojedyncza jednostka. Partia nie zostanie ponownie rozmieszona. Może to spowodować nieefektywność wysyłania zdarzeń.

Jeśli wysyłasz skończone listy eventData lub AmqpAnnotatedMessage i wiesz, że znajduje się ona w limicie rozmiaru ramki centrum zdarzeń, możesz wysłać je za pomocą wywołania send_batch . W przeciwnym razie użyj polecenia create_batch , aby utworzyć zdarzenie EventDataBatch i dodać element EventData lub AmqpAnnotatedMessage do partii jeden po drugim do limitu rozmiaru, a następnie wywołać tę metodę, aby wysłać partię.

send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parametry

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Wymagane

Obiekt EventDataBatch do wysłania lub lista zdarzeń do wysłania w partii. Wszystkie zdarzenia EventData lub AmqpAnnotatedMessage na liście lub EventDataBatch trafią na tę samą partycję.

timeout
float

Maksymalny czas oczekiwania na wysłanie danych zdarzenia w trybie bez buforowania lub maksymalny czas oczekiwania w kolejce danych zdarzenia do buforu w trybie buforowania. W trybie bez buforowania zostanie użyty domyślny czas oczekiwania określony podczas tworzenia producenta. W trybie buforowym domyślny czas oczekiwania to Brak.

partition_id
str

Określony identyfikator partycji do wysłania. Wartość domyślna to Brak, w którym przypadku usługa zostanie przypisana do wszystkich partycji przy użyciu działania okrężnego. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_id, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ samo zdarzenie EventDataBatch ma partition_id.

partition_key
str

W przypadku danego partition_key dane zdarzeń zostaną wysłane do określonej partycji centrum zdarzeń określonego przez usługę. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_key, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ sama funkcja EventDataBatch ma partition_key. Jeśli zostaną podane zarówno partition_id, jak i partition_key, partition_id będą miały pierwszeństwo. OSTRZEŻENIE: Ustawienie partition_key wartości innej niż ciąg dla zdarzeń, które mają być wysyłane, jest odradzane, ponieważ partition_key zostaną zignorowane przez usługę Centrum zdarzeń, a zdarzenia zostaną przypisane do wszystkich partycji przy użyciu działania okrężnego. Ponadto istnieją zestawy SDK do używania zdarzeń, które oczekują, że partition_key mają być tylko typem ciągu, mogą nie przeanalizować wartości innej niż ciąg.

Typ zwracany

Wyjątki

Jeśli wartość określona przez parametr limitu czasu upłynie przed wysłaniem zdarzenia w trybie bez buforowania lub zdarzenia nie mogą być umieszczone w kolejce do buforowanego w trybie buforowania.

Przykłady

Wysyła dane zdarzenia


       with producer:
           event_data_batch = producer.create_batch()

           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # EventDataBatch object reaches max_size.
                   # New EventDataBatch object can be created here to send more data
                   break

           producer.send_batch(event_data_batch)

send_event

Wysyła dane zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli obiekt EventHubProducerClient jest skonfigurowany do uruchamiania w trybie buforowanym, metoda spróbuje w kolejce zdarzeń do buforu w danym czasie, jeśli zostanie określony i zwrócony. Producent wykona automatyczne wysyłanie w tle w trybie buforowym.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób: * Jeśli wywołanie zwrotne on_error zostanie przekazane podczas tworzenia wystąpienia klienta producenta,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób: * Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parametry

event_data
Union[EventData, AmqpAnnotatedMessage]
Wymagane

Obiekt EventData , który ma zostać wysłany.

timeout
float

Maksymalny czas oczekiwania na wysłanie danych zdarzenia w trybie niebuforowany lub maksymalny czas oczekiwania w kolejce danych zdarzenia do buforu w trybie buforowym. W trybie bez buforowania domyślny czas oczekiwania określony podczas tworzenia producenta zostanie użyty. W trybie buforowym domyślny czas oczekiwania to Brak.

partition_id
str

Określony identyfikator partycji do wysłania. Wartość domyślna to Brak, w którym przypadku usługa zostanie przypisana do wszystkich partycji przy użyciu działania okrężnego. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_id, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ samo zdarzenie EventDataBatch ma partition_id.

partition_key
str

W przypadku danego partition_key dane zdarzeń zostaną wysłane do określonej partycji centrum zdarzeń określonego przez usługę. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_key, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ sama funkcja EventDataBatch ma partition_key. Jeśli zostaną podane zarówno partition_id, jak i partition_key, partition_id będą miały pierwszeństwo. OSTRZEŻENIE: Ustawienie partition_key wartości innej niż ciąg dla zdarzeń, które mają być wysyłane, jest odradzane, ponieważ partition_key zostaną zignorowane przez usługę Centrum zdarzeń, a zdarzenia zostaną przypisane do wszystkich partycji przy użyciu działania okrężnego. Ponadto istnieją zestawy SDK do używania zdarzeń, które oczekują, że partition_key mają być tylko typem ciągu, mogą nie przeanalizować wartości innej niż ciąg.

Typ zwracany

Wyjątki

Jeśli wartość określona przez parametr limitu czasu upłynął przed wysłaniem zdarzenia w trybie niebuforowym lub zdarzenia mogą zostać umieszczone w kolejce do buforowanego w trybie buforowym.

Atrybuty

total_buffered_event_count

Łączna liczba zdarzeń, które są obecnie buforowane i oczekujące na opublikowanie, we wszystkich partycjach. Zwraca wartość None w trybie bez buforowania. UWAGA: bufor zdarzeń jest przetwarzany w wątku w tle, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu.

Typ zwracany

int,