Bagikan melalui


EventHubProducerClient Kelas

Kelas EventHubProducerClient mendefinisikan antarmuka tingkat tinggi untuk mengirim peristiwa ke layanan Azure Event Hubs.

Warisan
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Konstruktor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Parameter

fully_qualified_namespace
str
Diperlukan

Nama host yang sepenuhnya memenuhi syarat untuk namespace Layanan Pusat Aktivitas. Ini kemungkinan mirip dengan .servicebus.windows.net

eventhub_name
str
Diperlukan

Jalur Event Hub tertentu untuk menyambungkan klien.

credential
AsyncTokenCredential atau AzureSasCredential atau AzureNamedKeyCredential
Diperlukan

Objek kredensial yang digunakan untuk autentikasi yang mengimplementasikan antarmuka tertentu untuk mendapatkan token. Ini menerima EventHubSharedKeyCredentialobjek kredensial , atau yang dihasilkan oleh pustaka identitas azure dan objek yang mengimplementasikan metode *get_token(mandiri, cakupan).

buffered_mode
bool

Jika True, klien produser akan mengumpulkan peristiwa dalam buffer, secara efisien batch, lalu menerbitkan. Defaultnya adalah False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Panggilan balik yang akan dipanggil setelah batch berhasil diterbitkan. Panggilan balik mengambil dua parameter:

  • events: Daftar peristiwa yang telah berhasil diterbitkan

  • partition_id: Id partisi tempat peristiwa dalam daftar telah diterbitkan.

Fungsi panggilan balik harus didefinisikan seperti: on_success(peristiwa, partition_id). Diperlukan saat buffered_mode True sementara opsional jika buffered_mode False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Panggilan balik yang akan dipanggil setelah batch gagal diterbitkan. Diperlukan saat dalam buffered_mode True sementara opsional jika buffered_mode False. Fungsi panggilan balik harus didefinisikan seperti: on_error(peristiwa, partition_id, kesalahan), di mana:

  • peristiwa: Daftar peristiwa yang gagal diterbitkan,

  • partition_id: Id partisi tempat peristiwa dalam daftar telah dicoba diterbitkan ke dan

  • error: Pengecualian yang terkait dengan kegagalan pengiriman.

Jika buffered_mode False, panggilan balik on_error bersifat opsional dan kesalahan akan ditangani sebagai berikut:

  • Jika panggilan balik on_error diteruskan selama instansiasi klien produsen,

    kemudian informasi kesalahan akan diteruskan ke panggilan balik on_error , yang kemudian akan dipanggil.

  • Jika panggilan balik on_error tidak diteruskan selama instansiasi klien,

    maka kesalahan akan dimunculkan secara default.

Jika buffered_mode True, panggilan balik on_error diperlukan dan kesalahan akan ditangani sebagai berikut:

  • Jika peristiwa gagal diantrekan dalam batas waktu yang diberikan, maka kesalahan akan langsung dimunculkan.

  • Jika peristiwa gagal dikirim setelah berhasil mengantre, panggilan balik on_error akan dipanggil.

max_buffer_length
int

Mode buffered saja. Jumlah total peristiwa per partisi yang dapat di-buffer sebelum flush akan dipicu. Nilai defaultnya adalah 1500 dalam mode buffer.

max_wait_time
Optional[float]

Mode buffered saja. Jumlah waktu untuk menunggu batch dibangun dengan peristiwa di buffer sebelum diterbitkan. Nilai defaultnya adalah 1 dalam mode buffer.

logging_enable
bool

Apakah akan mengeluarkan log jejak jaringan ke pencatat. Defaultnya adalah False.

auth_timeout
float

Waktu dalam detik untuk menunggu token diotorisasi oleh layanan. Nilai default-nya adalah 60 detik. Jika diatur ke 0, tidak ada batas waktu yang akan diberlakukan dari klien.

user_agent
str

Jika ditentukan, ini akan ditambahkan di depan string agen pengguna.

retry_total
int

Jumlah total upaya untuk mengulangi operasi yang gagal ketika terjadi kesalahan. Nilai defaultnya adalah 3.

retry_backoff_factor
float

Faktor backoff untuk diterapkan di antara upaya setelah percobaan kedua (sebagian besar kesalahan segera diselesaikan dengan percobaan kedua tanpa penundaan). Dalam mode tetap, kebijakan coba lagi akan selalu tidur untuk {faktor backoff}. Dalam mode 'eksponensial', kebijakan percobaan kembali akan tidur untuk: {faktor backoff} * (2 ** ({jumlah total percobaan ulang} - 1)) detik. Jika backoff_factor adalah 0,1, maka coba lagi akan tidur selama [0.0s, 0.2s, 0.4s, ...] di antara percobaan ulang. Nilai defaultnya adalah 0,8.

retry_backoff_max
float

Waktu back off maksimum. Nilai defaultnya adalah 120 detik (2 menit).

retry_mode
str

Perilaku penundaan antara upaya coba lagi. Nilai yang didukung adalah 'tetap' atau 'eksponensial', di mana defaultnya adalah 'eksponensial'.

idle_timeout
float

Waktu habis, dalam hitungan detik, setelah itu klien ini akan menutup koneksi yang mendasar jika tidak ada aktivitas. Secara default nilainya adalah Tidak Ada, yang berarti bahwa klien tidak akan dimatikan karena tidak aktif kecuali dimulai oleh layanan.

transport_type
TransportType

Jenis protokol transportasi yang akan digunakan untuk berkomunikasi dengan layanan Azure Event Hubs. Defaultnya adalah TransportType.Amqp dalam hal ini port 5671 digunakan. Jika port 5671 tidak tersedia/diblokir di lingkungan jaringan, TransportType.AmqpOverWebsocket dapat digunakan sebagai gantinya yang menggunakan port 443 untuk komunikasi.

http_proxy
dict

Pengaturan proksi HTTP. Ini harus berupa kamus dengan kunci berikut: 'proxy_hostname' (nilai str) dan 'proxy_port' (nilai int). Selain itu, kunci berikut mungkin juga ada: 'nama pengguna', 'kata sandi'.

custom_endpoint_address
Optional[str]

Alamat titik akhir kustom yang digunakan untuk membuat koneksi ke layanan Azure Event Hubs, memungkinkan permintaan jaringan dirutekan melalui gateway aplikasi atau jalur lain yang diperlukan untuk lingkungan host. Defaultnya adalah Tidak Ada. Formatnya seperti "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jika port tidak ditentukan dalam custom_endpoint_address, secara default port 443 akan digunakan.

connection_verify
Optional[str]

Jalur ke file CA_BUNDLE kustom sertifikat SSL yang digunakan untuk mengautentikasi identitas titik akhir koneksi. Defaultnya adalah Tidak Ada dalam hal ini certifi.where() akan digunakan.

uamqp_transport
bool

Apakah akan menggunakan pustaka uamqp sebagai transportasi yang mendasar. Nilai defaultnya adalah False dan pustaka AMQP Python Murni akan digunakan sebagai transportasi yang mendasar.

socket_timeout
float

Waktu dalam detik bahwa soket yang mendasar pada koneksi harus menunggu saat mengirim dan menerima data sebelum waktu habis. Nilai defaultnya adalah 0,2 untuk TransportType.Amqp dan 1 untuk TransportType.AmqpOverWebsocket. Jika kesalahan EventHubsConnectionError terjadi karena waktu penulisan habis, nilai yang lebih besar dari default mungkin perlu diteruskan. Ini untuk skenario penggunaan tingkat lanjut dan biasanya nilai default harus cukup.

Contoh

Buat instans baru EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metode

close

Tutup koneksi dan tautan AMQP yang mendasar klien Produser.

create_batch

Buat objek EventDataBatch dengan ukuran maksimum semua konten yang dibatasi oleh max_size_in_bytes.

max_size_in_bytes tidak boleh lebih besar dari ukuran pesan maksimum yang diizinkan yang ditentukan oleh layanan.

flush

Mode buffered saja. Siram peristiwa di buffer untuk segera dikirim jika klien bekerja dalam mode buffer.

from_connection_string

Buat EventHubProducerClient dari string koneksi.

get_buffered_event_count

Jumlah peristiwa yang di-buffer dan menunggu untuk diterbitkan untuk partisi tertentu. Mengembalikan Tidak Ada dalam mode non-buffer. CATATAN: Buffer peristiwa diproses dalam koroutin latar belakang, oleh karena itu jumlah peristiwa dalam buffer yang dilaporkan oleh API ini harus dianggap hanya perkiraan dan hanya direkomendasikan untuk digunakan dalam penelusuran kesalahan. Untuk ID partisi yang tidak memiliki peristiwa yang di-buffer, 0 akan dikembalikan terlepas dari apakah ID partisi tersebut benar-benar ada dalam Event Hub.

get_eventhub_properties

Mendapatkan properti Pusat Aktivitas.

Kunci dalam kamus yang dikembalikan meliputi:

  • eventhub_name (str)

  • created_at (datetime.datetime UTC)

  • partition_ids (list[str])

get_partition_ids

Mendapatkan ID partisi dari Pusat Aktivitas.

get_partition_properties

Mendapatkan properti dari partisi yang ditentukan.

Kunci dalam kamus properti meliputi:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (datetime.datetime UTC)

  • is_empty (bool)

send_batch

Mengirim batch data peristiwa. Secara default, metode akan memblokir hingga pengakuan diterima atau waktu operasi habis. Jika EventHubProducerClient dikonfigurasi untuk berjalan dalam mode buffer, metode akan mengantrekan peristiwa ke buffer lokal dan kembali. Produser akan melakukan pengiriman otomatis di latar belakang.

Jika buffered_mode False, panggilan balik on_error bersifat opsional dan kesalahan akan ditangani sebagai berikut:

  • Jika panggilan balik on_error diteruskan selama instansiasi klien produser,

    kemudian informasi kesalahan akan diteruskan ke panggilan balik on_error , yang kemudian akan dipanggil.

  • Jika panggilan balik on_error tidak diteruskan selama instansiasi klien,

    maka kesalahan akan dimunculkan secara default.

Jika buffered_mode True, panggilan balik on_error diperlukan dan kesalahan akan ditangani sebagai berikut:

  • Jika peristiwa gagal diantrekan dalam batas waktu yang diberikan, maka kesalahan akan langsung dimunculkan.

  • Jika peristiwa gagal dikirim setelah berhasil mengantre, panggilan balik on_error akan dipanggil.

Dalam mode buffer, mengirim batch akan tetap utuh dan dikirim sebagai satu unit. Batch tidak akan diatur ulang. Hal ini dapat mengakibatkan ketidakefisienan pengiriman peristiwa.

Jika Anda mengirim daftar Terbatas EventData atau AmqpAnnotatedMessage dan Anda tahu itu berada dalam batas ukuran bingkai hub peristiwa, Anda dapat mengirimnya dengan panggilan send_batch . Jika tidak, gunakan create_batch untuk membuat EventDataBatch dan tambahkan EventData atau AmqpAnnotatedMessage ke dalam batch satu per satu hingga batas ukuran, lalu panggil metode ini untuk mengirim batch.

send_event

Mengirim data peristiwa. Secara default, metode akan memblokir hingga pengakuan diterima atau waktu operasi habis. Jika EventHubProducerClient dikonfigurasi untuk berjalan dalam mode buffer, metode akan mengantrekan peristiwa ke buffer lokal dan kembali. Produser akan melakukan batching otomatis dan pengiriman di latar belakang.

Jika buffered_mode False, panggilan balik on_error bersifat opsional dan kesalahan akan ditangani sebagai berikut: * Jika panggilan balik on_error diteruskan selama instantiasi klien produser,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Jika buffered_mode True, panggilan balik on_error diperlukan dan kesalahan akan ditangani sebagai berikut: * Jika peristiwa gagal mengantre dalam batas waktu yang diberikan, maka kesalahan akan langsung dimunculkan.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Tutup koneksi dan tautan AMQP yang mendasar klien Produser.

async close(*, flush: bool = True, **kwargs: Any) -> None

Parameter

flush
bool

Mode buffered saja. Jika diatur ke True, peristiwa di buffer akan segera dikirim. Defaultnya adalah True.

timeout
float atau None

Mode buffered saja. Waktu habis untuk menutup produser. Defaultnya adalah Tidak Ada yang berarti tidak ada batas waktu.

Tipe hasil

Pengecualian

Jika terjadi kesalahan saat membersihkan buffer jika flush diatur ke True atau menutup koneksi AMQP yang mendasar dalam mode buffer.

Contoh

Tutup handler.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Buat objek EventDataBatch dengan ukuran maksimum semua konten yang dibatasi oleh max_size_in_bytes.

max_size_in_bytes tidak boleh lebih besar dari ukuran pesan maksimum yang diizinkan yang ditentukan oleh layanan.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Tipe hasil

Pengecualian

Jika terjadi kesalahan saat membersihkan buffer jika flush diatur ke True atau menutup koneksi AMQP yang mendasar dalam mode buffer.

Contoh

Membuat objek EventDataBatch dalam ukuran terbatas


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Mode buffered saja. Siram peristiwa di buffer untuk segera dikirim jika klien bekerja dalam mode buffer.

async flush(**kwargs: Any) -> None

Parameter

timeout
float atau None

Waktu habis untuk membersihkan peristiwa yang di-buffer, defaultnya adalah Tidak Ada yang berarti tidak ada waktu habis.

Tipe hasil

Pengecualian

Jika produsen gagal membersihkan buffer dalam batas waktu yang diberikan dalam mode buffer.

from_connection_string

Buat EventHubProducerClient dari string koneksi.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Parameter

conn_str
str
Diperlukan

String koneksi Pusat Aktivitas.

eventhub_name
str

Jalur Pusat Aktivitas tertentu untuk menyambungkan klien.

buffered_mode
bool

Jika True, klien produsen akan mengumpulkan peristiwa dalam buffer, secara efisien batch, lalu menerbitkan. Defaultnya adalah False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Panggilan balik yang akan dipanggil setelah batch berhasil diterbitkan. Panggilan balik mengambil dua parameter:

  • events: Daftar peristiwa yang telah berhasil diterbitkan

  • partition_id: Id partisi tempat peristiwa dalam daftar telah diterbitkan.

Fungsi panggilan balik harus didefinisikan seperti: on_success(peristiwa, partition_id). Ini diperlukan ketika buffered_mode True sementara opsional jika buffered_mode False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Panggilan balik yang akan dipanggil setelah batch gagal diterbitkan. Fungsi panggilan balik harus didefinisikan seperti: on_error(peristiwa, partition_id, kesalahan), di mana:

  • peristiwa: Daftar peristiwa yang gagal dipublikasikan,

  • partition_id: Id partisi tempat peristiwa dalam daftar telah dicoba diterbitkan ke dan

  • error: Pengecualian yang terkait dengan kegagalan pengiriman.

Jika buffered_mode False, panggilan balik on_error bersifat opsional dan kesalahan akan ditangani sebagai berikut:

  • Jika panggilan balik on_error diteruskan selama instansiasi klien produser,

    kemudian informasi kesalahan akan diteruskan ke panggilan balik on_error , yang kemudian akan dipanggil.

  • Jika panggilan balik on_error tidak diteruskan selama instansiasi klien,

    maka kesalahan akan dimunculkan secara default.

Jika buffered_mode True, panggilan balik on_error diperlukan dan kesalahan akan ditangani sebagai berikut:

  • Jika peristiwa gagal diantrekan dalam batas waktu yang diberikan, maka kesalahan akan langsung dimunculkan.

  • Jika peristiwa gagal dikirim setelah berhasil mengantre, panggilan balik on_error akan dipanggil.

max_buffer_length
int

Mode buffer saja. Jumlah total peristiwa per partisi yang dapat di-buffer sebelum flush akan dipicu. Nilai defaultnya adalah 1500 dalam mode buffer.

max_wait_time
Optional[float]

Mode buffer saja. Jumlah waktu untuk menunggu batch dibangun dengan peristiwa di buffer sebelum diterbitkan. Nilai defaultnya adalah 1 dalam mode buffer.

logging_enable
bool

Apakah akan mengeluarkan log jejak jaringan ke pencatat. Defaultnya adalah False.

http_proxy
dict

Pengaturan proksi HTTP. Ini harus berupa kamus dengan kunci berikut: 'proxy_hostname' (nilai str) dan 'proxy_port' (nilai int). Selain itu, kunci berikut mungkin juga ada: 'nama pengguna', 'kata sandi'.

auth_timeout
float

Waktu dalam detik untuk menunggu token diotorisasi oleh layanan. Nilai default-nya adalah 60 detik. Jika diatur ke 0, tidak ada batas waktu yang akan diberlakukan dari klien.

user_agent
str

Jika ditentukan, ini akan ditambahkan di depan string agen pengguna.

retry_total
int

Jumlah total upaya untuk mengulangi operasi yang gagal ketika terjadi kesalahan. Nilai defaultnya adalah 3.

retry_backoff_factor
float

Faktor backoff untuk diterapkan di antara upaya setelah percobaan kedua (sebagian besar kesalahan segera diselesaikan dengan percobaan kedua tanpa penundaan). Dalam mode tetap, kebijakan coba lagi akan selalu tidur untuk {faktor backoff}. Dalam mode 'eksponensial', kebijakan percobaan kembali akan tidur untuk: {faktor backoff} * (2 ** ({jumlah total percobaan ulang} - 1)) detik. Jika backoff_factor adalah 0,1, maka percobaan ulang akan tidur selama [0,0 detik, 0,2 detik, 0,4 detik, ...] di antara percobaan ulang. Nilai defaultnya adalah 0,8.

retry_backoff_max
float

Waktu back off maksimum. Nilai defaultnya adalah 120 detik (2 menit).

retry_mode
str

Perilaku penundaan antara upaya coba lagi. Nilai yang didukung adalah 'tetap' atau 'eksponensial', di mana defaultnya adalah 'eksponensial'.

idle_timeout
float

Waktu habis, dalam hitungan detik, setelah itu klien ini akan menutup koneksi yang mendasar jika tidak ada aktivitas. Secara default nilainya adalah Tidak Ada, yang berarti bahwa klien tidak akan dimatikan karena tidak aktif kecuali dimulai oleh layanan.

transport_type
TransportType

Jenis protokol transportasi yang akan digunakan untuk berkomunikasi dengan layanan Azure Event Hubs. Defaultnya adalah TransportType.Amqp dalam hal ini port 5671 digunakan. Jika port 5671 tidak tersedia/diblokir di lingkungan jaringan, TransportType.AmqpOverWebsocket dapat digunakan sebagai gantinya yang menggunakan port 443 untuk komunikasi.

custom_endpoint_address
Optional[str]

Alamat titik akhir kustom yang digunakan untuk membuat koneksi ke layanan Azure Event Hubs, memungkinkan permintaan jaringan dirutekan melalui gateway aplikasi atau jalur lain yang diperlukan untuk lingkungan host. Defaultnya adalah Tidak Ada. Formatnya akan seperti "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jika port tidak ditentukan dalam custom_endpoint_address, secara default port 443 akan digunakan.

connection_verify
Optional[str]

Jalur ke file CA_BUNDLE kustom sertifikat SSL yang digunakan untuk mengautentikasi identitas titik akhir koneksi. Defaultnya adalah Tidak Ada dalam hal ini certifi.where() akan digunakan.

uamqp_transport
bool

Apakah akan menggunakan pustaka uamqp sebagai transportasi yang mendasar. Nilai defaultnya adalah False dan pustaka AMQP Python Murni akan digunakan sebagai transportasi yang mendasar.

Tipe hasil

Pengecualian

Jika terjadi kesalahan saat membersihkan buffer jika flush diatur ke True atau menutup koneksi AMQP yang mendasar dalam mode buffer.

Contoh

Buat instans baru EventHubProducerClient dari string koneksi.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Jumlah peristiwa yang di-buffer dan menunggu untuk diterbitkan untuk partisi tertentu. Mengembalikan Tidak Ada dalam mode non-buffer. CATATAN: Buffer peristiwa diproses dalam koroutin latar belakang, oleh karena itu jumlah peristiwa dalam buffer yang dilaporkan oleh API ini harus dianggap hanya perkiraan dan hanya direkomendasikan untuk digunakan dalam penelusuran kesalahan. Untuk ID partisi yang tidak memiliki peristiwa yang di-buffer, 0 akan dikembalikan terlepas dari apakah ID partisi tersebut benar-benar ada dalam Event Hub.

get_buffered_event_count(partition_id: str) -> int | None

Parameter

partition_id
str
Diperlukan

ID partisi target.

Tipe hasil

int,

Pengecualian

Jika terjadi kesalahan saat membersihkan buffer jika flush diatur ke True atau menutup koneksi AMQP yang mendasar dalam mode buffer.

get_eventhub_properties

Mendapatkan properti Pusat Aktivitas.

Kunci dalam kamus yang dikembalikan meliputi:

  • eventhub_name (str)

  • created_at (datetime.datetime UTC)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Mengembalikan

Kamus yang berisi informasi tentang Pusat Aktivitas.

Tipe hasil

Pengecualian

get_partition_ids

Mendapatkan ID partisi dari Pusat Aktivitas.

async get_partition_ids() -> List[str]

Mengembalikan

Daftar ID partisi.

Tipe hasil

Pengecualian

get_partition_properties

Mendapatkan properti dari partisi yang ditentukan.

Kunci dalam kamus properti meliputi:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (datetime.datetime UTC)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parameter

partition_id
str
Diperlukan

ID partisi target.

Mengembalikan

Dict properti partisi.

Tipe hasil

Pengecualian

send_batch

Mengirim batch data peristiwa. Secara default, metode akan memblokir hingga pengakuan diterima atau waktu operasi habis. Jika EventHubProducerClient dikonfigurasi untuk berjalan dalam mode buffer, metode akan mengantrekan peristiwa ke buffer lokal dan kembali. Produser akan melakukan pengiriman otomatis di latar belakang.

Jika buffered_mode False, panggilan balik on_error bersifat opsional dan kesalahan akan ditangani sebagai berikut:

  • Jika panggilan balik on_error diteruskan selama instansiasi klien produser,

    kemudian informasi kesalahan akan diteruskan ke panggilan balik on_error , yang kemudian akan dipanggil.

  • Jika panggilan balik on_error tidak diteruskan selama instansiasi klien,

    maka kesalahan akan dimunculkan secara default.

Jika buffered_mode True, panggilan balik on_error diperlukan dan kesalahan akan ditangani sebagai berikut:

  • Jika peristiwa gagal diantrekan dalam batas waktu yang diberikan, maka kesalahan akan langsung dimunculkan.

  • Jika peristiwa gagal dikirim setelah berhasil mengantre, panggilan balik on_error akan dipanggil.

Dalam mode buffer, mengirim batch akan tetap utuh dan dikirim sebagai satu unit. Batch tidak akan diatur ulang. Hal ini dapat mengakibatkan ketidakefisienan pengiriman peristiwa.

Jika Anda mengirim daftar Terbatas EventData atau AmqpAnnotatedMessage dan Anda tahu itu berada dalam batas ukuran bingkai hub peristiwa, Anda dapat mengirimnya dengan panggilan send_batch . Jika tidak, gunakan create_batch untuk membuat EventDataBatch dan tambahkan EventData atau AmqpAnnotatedMessage ke dalam batch satu per satu hingga batas ukuran, lalu panggil metode ini untuk mengirim batch.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parameter

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Diperlukan

Objek EventDataBatch yang akan dikirim atau daftar EventData yang akan dikirim dalam batch. Semua EventData atau AmqpAnnotatedMessage dalam daftar atau EventDataBatch akan mendarat di partisi yang sama.

timeout
float

Waktu tunggu maksimum untuk mengirim data peristiwa dalam mode non-buffer atau waktu tunggu maksimum untuk mengantrekan data peristiwa ke dalam buffer dalam mode buffer. Dalam mode non-buffer, waktu tunggu default yang ditentukan ketika produsen dibuat akan digunakan. Dalam mode buffer, waktu tunggu default adalah Tidak Ada.

partition_id
str

ID partisi tertentu untuk dikirim. Defaultnya adalah Tidak Ada, dalam hal ini layanan akan menetapkan ke semua partisi menggunakan round-robin. TypeError akan dinaikkan jika partition_id ditentukan dan event_data_batch adalah EventDataBatch karena EventDataBatch itu sendiri memiliki partition_id.

partition_key
str

Dengan partition_key yang diberikan, data peristiwa akan dikirim ke partisi tertentu dari Pusat Aktivitas yang diputuskan oleh layanan. TypeError akan dinaikkan jika partition_key ditentukan dan event_data_batch adalah EventDataBatch karena EventDataBatch itu sendiri memiliki partition_key. Jika partition_id dan partition_key disediakan, partition_id akan diutamakan. PERINGATAN: Mengatur partition_key nilai non-string pada peristiwa yang akan dikirim tidak disarankan karena partition_key akan diabaikan oleh layanan Event Hub dan peristiwa akan ditetapkan ke semua partisi menggunakan round-robin. Selain itu, ada SDK untuk mengonsumsi peristiwa yang mengharapkan partition_key hanya menjadi jenis string, mereka mungkin gagal mengurai nilai non-string.

Tipe hasil

Pengecualian

Jika nilai yang ditentukan oleh parameter batas waktu berlalu sebelum peristiwa dapat dikirim dalam mode non-buffer atau peristiwa dapat diantrekan ke dalam buffer dalam mode buffer.

Contoh

Mengirim data peristiwa secara asinkron


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Mengirim data peristiwa. Secara default, metode akan memblokir hingga pengakuan diterima atau waktu operasi habis. Jika EventHubProducerClient dikonfigurasi untuk berjalan dalam mode buffer, metode akan mengantrekan peristiwa ke buffer lokal dan kembali. Produser akan melakukan batching otomatis dan pengiriman di latar belakang.

Jika buffered_mode False, panggilan balik on_error bersifat opsional dan kesalahan akan ditangani sebagai berikut: * Jika panggilan balik on_error diteruskan selama instantiasi klien produser,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Jika buffered_mode True, panggilan balik on_error diperlukan dan kesalahan akan ditangani sebagai berikut: * Jika peristiwa gagal mengantre dalam batas waktu yang diberikan, maka kesalahan akan langsung dimunculkan.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parameter

event_data
Union[EventData, AmqpAnnotatedMessage]
Diperlukan

Objek EventData yang akan dikirim.

timeout
float

Waktu tunggu maksimum untuk mengirim data peristiwa dalam mode non-buffer atau waktu tunggu maksimum untuk mengantrekan data peristiwa ke dalam buffer dalam mode buffer. Dalam mode non-buffer, waktu tunggu default yang ditentukan ketika produsen dibuat akan digunakan. Dalam mode buffer, waktu tunggu default adalah Tidak Ada.

partition_id
str

ID partisi tertentu untuk dikirim. Defaultnya adalah Tidak Ada, dalam hal ini layanan akan menetapkan ke semua partisi menggunakan round-robin. TypeError akan dinaikkan jika partition_id ditentukan dan event_data_batch adalah EventDataBatch karena EventDataBatch itu sendiri memiliki partition_id.

partition_key
str

Dengan partition_key yang diberikan, data peristiwa akan dikirim ke partisi tertentu dari Pusat Aktivitas yang diputuskan oleh layanan. TypeError akan dinaikkan jika partition_key ditentukan dan event_data_batch adalah EventDataBatch karena EventDataBatch itu sendiri memiliki partition_key. Jika partition_id dan partition_key disediakan, partition_id akan diutamakan. PERINGATAN: Mengatur partition_key nilai non-string pada peristiwa yang akan dikirim tidak disarankan karena partition_key akan diabaikan oleh layanan Event Hub dan peristiwa akan ditetapkan ke semua partisi menggunakan round-robin. Selain itu, ada SDK untuk mengonsumsi peristiwa yang mengharapkan partition_key hanya menjadi jenis string, mereka mungkin gagal mengurai nilai non-string.

Tipe hasil

Pengecualian

Jika nilai yang ditentukan oleh parameter batas waktu berlalu sebelum peristiwa dapat dikirim dalam mode non-buffer atau peristiwa tidak dapat diantrekan ke dalam buffer dalam mode buffer.

Atribut

total_buffered_event_count

Jumlah total peristiwa yang saat ini di-buffer dan menunggu untuk diterbitkan, di semua partisi. Mengembalikan Tidak Ada dalam mode non-buffer. CATATAN: Buffer peristiwa diproses dalam koroutin latar belakang, oleh karena itu jumlah peristiwa dalam buffer yang dilaporkan oleh API ini harus dianggap hanya perkiraan dan hanya direkomendasikan untuk digunakan dalam penelusuran kesalahan.

Tipe hasil

int,