EventHubConsumerClient Kelas

Kelas EventHubConsumerClient mendefinisikan antarmuka tingkat tinggi untuk menerima peristiwa dari layanan Azure Event Hubs.

Tujuan utama EventHubConsumerClient adalah untuk menerima peristiwa dari semua partisi EventHub dengan penyeimbangan beban dan titik pemeriksaan.

Ketika beberapa instans EventHubConsumerClient berjalan terhadap hub peristiwa yang sama, grup konsumen, dan lokasi titik pemeriksaan, partisi akan didistribusikan secara merata di antara mereka.

Untuk mengaktifkan penyeimbangan beban dan titik pemeriksaan yang bertahan, checkpoint_store harus diatur saat membuat EventHubConsumerClient. Jika penyimpanan titik pemeriksaan tidak disediakan, titik pemeriksaan akan dipertahankan secara internal dalam memori.

EventHubConsumerClient juga dapat menerima dari partisi tertentu saat Anda memanggil metodenya receive() atau receive_batch() dan menentukan partition_id. Penyeimbangan beban tidak akan berfungsi dalam mode partisi tunggal. Tetapi pengguna masih dapat menyimpan titik pemeriksaan jika checkpoint_store diatur.

Warisan
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parameter

fully_qualified_namespace
str
Diperlukan

Nama host yang sepenuhnya memenuhi syarat untuk namespace layanan Azure Event Hubs. Format namespace adalah: .servicebus.windows.net.

eventhub_name
str
Diperlukan

Jalur Pusat Aktivitas tertentu untuk menyambungkan klien.

consumer_group
str
Diperlukan

Terima peristiwa dari pusat aktivitas untuk grup konsumen ini.

credential
AsyncTokenCredential atau AzureSasCredential atau AzureNamedKeyCredential
Diperlukan

Objek kredensial yang digunakan untuk autentikasi yang mengimplementasikan antarmuka tertentu untuk mendapatkan token. Ini menerima EventHubSharedKeyCredential, atau objek kredensial yang dihasilkan oleh pustaka identitas azure dan objek yang mengimplementasikan metode *get_token(mandiri, cakupan).

logging_enable
bool

Apakah akan mengeluarkan log jejak jaringan ke pencatat. Defaultnya adalah False.

auth_timeout
float

Waktu dalam detik untuk menunggu token diotorisasi oleh layanan. Nilai default-nya adalah 60 detik. Jika diatur ke 0, tidak ada batas waktu yang akan diberlakukan dari klien.

user_agent
str

Jika ditentukan, ini akan ditambahkan di depan string agen pengguna.

retry_total
int

Jumlah total upaya untuk mengulangi operasi yang gagal ketika terjadi kesalahan. Nilai defaultnya adalah 3. Konteks retry_total dalam menerima bersifat khusus: Metode terima diimplementasikan oleh metode penerimaan internal panggilan perulangan sementara di setiap perulangan. Dalam kasus penerimaan , retry_total menentukan jumlah coba lagi setelah kesalahan yang dimunculkan oleh metode penerimaan internal dalam perulangan sementara. Jika upaya coba lagi habis, panggilan balik on_error akan dipanggil (jika disediakan) dengan informasi kesalahan. Konsumen partisi internal yang gagal akan ditutup (on_partition_close akan dipanggil jika disediakan) dan konsumen partisi internal baru akan dibuat (on_partition_initialize akan dipanggil jika disediakan) untuk melanjutkan penerimaan.

retry_backoff_factor
float

Faktor backoff untuk diterapkan di antara upaya setelah percobaan kedua (sebagian besar kesalahan segera diselesaikan dengan percobaan kedua tanpa penundaan). Dalam mode tetap, kebijakan coba lagi akan selalu tidur untuk {faktor backoff}. Dalam mode 'eksponensial', kebijakan percobaan kembali akan tidur untuk: {faktor backoff} * (2 ** ({jumlah total percobaan ulang} - 1)) detik. Jika backoff_factor adalah 0,1, maka percobaan ulang akan tidur selama [0,0 detik, 0,2 detik, 0,4 detik, ...] di antara percobaan ulang. Nilai defaultnya adalah 0,8.

retry_backoff_max
float

Waktu back off maksimum. Nilai defaultnya adalah 120 detik (2 menit).

retry_mode
str

Perilaku penundaan antara upaya coba lagi. Nilai yang didukung adalah 'tetap' atau 'eksponensial', di mana defaultnya adalah 'eksponensial'.

idle_timeout
float

Waktu habis, dalam hitungan detik, setelah itu klien ini akan menutup koneksi yang mendasar jika tidak ada aktivitas lebih lanjut. Secara default nilainya adalah Tidak Ada, yang berarti bahwa klien tidak akan dimatikan karena tidak aktif kecuali dimulai oleh layanan.

transport_type
TransportType

Jenis protokol transportasi yang akan digunakan untuk berkomunikasi dengan layanan Azure Event Hubs. Defaultnya adalah TransportType.Amqp dalam hal ini port 5671 digunakan. Jika port 5671 tidak tersedia/diblokir di lingkungan jaringan, TransportType.AmqpOverWebsocket dapat digunakan sebagai gantinya yang menggunakan port 443 untuk komunikasi.

http_proxy

Pengaturan proksi HTTP. Ini harus berupa kamus dengan kunci berikut: 'proxy_hostname' (nilai str) dan 'proxy_port' (nilai int).

checkpoint_store
Optional[CheckpointStore]

Manajer yang menyimpan data penyeimbangan beban dan titik pemeriksaan partisi saat menerima peristiwa. Penyimpanan titik pemeriksaan akan digunakan dalam kedua kasus penerimaan dari semua partisi atau satu partisi. Dalam kasus terakhir, penyeimbangan beban tidak berlaku. Jika penyimpanan titik pemeriksaan tidak disediakan, titik pemeriksaan akan dipertahankan secara internal dalam memori, dan instans EventHubConsumerClient akan menerima peristiwa tanpa penyeimbangan beban.

load_balancing_interval
float

Saat penyeimbangan beban dimulai. Ini adalah interval, dalam hitungan detik, di antara dua evaluasi penyeimbangan beban. Defaultnya adalah 30 detik.

partition_ownership_expiration_interval
float

Kepemilikan partisi akan kedaluwarsa setelah jumlah detik ini. Setiap evaluasi penyeimbangan beban akan secara otomatis memperpanjang waktu kedaluwarsa kepemilikan. Defaultnya adalah 6 * load_balancing_interval, yaitu 180 detik saat menggunakan load_balancing_interval default 30 detik.

load_balancing_strategy
str atau LoadBalancingStrategy

Ketika penyeimbangan beban dimulai, ia akan menggunakan strategi ini untuk mengklaim dan menyeimbangkan kepemilikan partisi. Gunakan "serakah" atau LoadBalancingStrategy.GREEDY untuk strategi serakah, yang, untuk setiap evaluasi penyeimbangan beban, akan mengambil sebanyak mungkin partisi yang tidak diklaim yang diperlukan untuk menyeimbangkan beban. Gunakan "balanced" atau LoadBalancingStrategy.BALANCED untuk strategi seimbang, yang, untuk setiap evaluasi penyeimbangan beban, hanya mengklaim satu partisi yang tidak diklaim oleh EventHubConsumerClient lainnya. Jika semua partisi EventHub diklaim oleh EventHubConsumerClient lainnya dan klien ini telah mengklaim terlalu sedikit partisi, klien ini akan mencuri satu partisi dari klien lain untuk setiap evaluasi penyeimbangan beban terlepas dari strategi penyeimbangan beban. Strategi serakah digunakan secara default.

custom_endpoint_address
Optional[str]

Alamat titik akhir kustom yang digunakan untuk membuat koneksi ke layanan Azure Event Hubs, memungkinkan permintaan jaringan dirutekan melalui gateway aplikasi atau jalur lain yang diperlukan untuk lingkungan host. Defaultnya adalah Tidak Ada. Formatnya akan seperti "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jika port tidak ditentukan dalam custom_endpoint_address, secara default port 443 akan digunakan.

connection_verify
Optional[str]

Jalur ke file CA_BUNDLE kustom sertifikat SSL yang digunakan untuk mengautentikasi identitas titik akhir koneksi. Defaultnya adalah Tidak Ada dalam hal ini certifi.where() akan digunakan.

uamqp_transport
bool

Apakah akan menggunakan pustaka uamqp sebagai transportasi yang mendasar. Nilai defaultnya adalah False dan pustaka AMQP Python Murni akan digunakan sebagai transportasi yang mendasar.

socket_timeout
float

Waktu dalam detik bahwa soket yang mendasar pada koneksi harus menunggu saat mengirim dan menerima data sebelum waktu habis. Nilai defaultnya adalah 0,2 untuk TransportType.Amqp dan 1 untuk TransportType.AmqpOverWebsocket. Jika kesalahan EventHubsConnectionError terjadi karena waktu penulisan habis, nilai yang lebih besar dari default mungkin perlu diteruskan. Ini untuk skenario penggunaan tingkat lanjut dan biasanya nilai default harus cukup.

Contoh

Buat instans baru EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metode

close

Berhenti mengambil peristiwa dari Pusat Aktivitas dan tutup koneksi dan tautan AMQP yang mendasar.

from_connection_string

Buat EventHubConsumerClient dari string koneksi.

get_eventhub_properties

Mendapatkan properti Pusat Aktivitas.

Kunci dalam kamus yang dikembalikan meliputi:

  • eventhub_name (str)

  • created_at (datetime.datetime UTC)

  • partition_ids (list[str])

get_partition_ids

Mendapatkan ID partisi dari Pusat Aktivitas.

get_partition_properties

Mendapatkan properti dari partisi yang ditentukan.

Kunci dalam kamus properti meliputi:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (datetime.datetime UTC)

  • is_empty (bool)

receive

Menerima peristiwa dari partisi, dengan penyeimbangan beban opsional dan titik pemeriksaan.

receive_batch

Terima peristiwa dari partisi dalam batch, dengan penyeimbangan beban dan titik pemeriksaan opsional.

close

Berhenti mengambil peristiwa dari Pusat Aktivitas dan tutup koneksi dan tautan AMQP yang mendasar.

async close() -> None

Tipe hasil

Contoh

Tutup klien.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

Buat EventHubConsumerClient dari string koneksi.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parameter

conn_str
str
Diperlukan

String koneksi Pusat Aktivitas.

consumer_group
str
Diperlukan

Menerima peristiwa dari Pusat Aktivitas untuk grup konsumen ini.

eventhub_name
str

Jalur Pusat Aktivitas tertentu untuk menyambungkan klien.

logging_enable
bool

Apakah akan mengeluarkan log jejak jaringan ke pencatat. Defaultnya adalah False.

http_proxy
dict

Pengaturan proksi HTTP. Ini harus berupa kamus dengan kunci berikut: 'proxy_hostname' (nilai str) dan 'proxy_port' (nilai int). Selain itu, kunci berikut mungkin juga ada: 'nama pengguna', 'kata sandi'.

auth_timeout
float

Waktu dalam detik untuk menunggu token diotorisasi oleh layanan. Nilai default-nya adalah 60 detik. Jika diatur ke 0, tidak ada batas waktu yang akan diberlakukan dari klien.

user_agent
str

Jika ditentukan, ini akan ditambahkan di depan string agen pengguna.

retry_total
int

Jumlah total upaya untuk mengulangi operasi yang gagal ketika terjadi kesalahan. Nilai defaultnya adalah 3. Konteks retry_total dalam menerima bersifat khusus: Metode terima diimplementasikan oleh metode penerimaan internal panggilan perulangan sementara di setiap perulangan. Dalam kasus penerimaan , retry_total menentukan jumlah coba lagi setelah kesalahan yang dimunculkan oleh metode penerimaan internal dalam perulangan sementara. Jika upaya coba lagi habis, panggilan balik on_error akan dipanggil (jika disediakan) dengan informasi kesalahan. Konsumen partisi internal yang gagal akan ditutup (on_partition_close akan dipanggil jika disediakan) dan konsumen partisi internal baru akan dibuat (on_partition_initialize akan dipanggil jika disediakan) untuk melanjutkan penerimaan.

retry_backoff_factor
float

Faktor backoff untuk diterapkan di antara upaya setelah percobaan kedua (sebagian besar kesalahan segera diselesaikan dengan percobaan kedua tanpa penundaan). Dalam mode tetap, kebijakan coba lagi akan selalu tidur untuk {faktor backoff}. Dalam mode 'eksponensial', kebijakan percobaan kembali akan tidur untuk: {faktor backoff} * (2 ** ({jumlah total percobaan ulang} - 1)) detik. Jika backoff_factor adalah 0,1, maka percobaan ulang akan tidur selama [0,0 detik, 0,2 detik, 0,4 detik, ...] di antara percobaan ulang. Nilai defaultnya adalah 0,8.

retry_backoff_max
float

Waktu back off maksimum. Nilai defaultnya adalah 120 detik (2 menit).

retry_mode
str

Perilaku penundaan antara upaya coba lagi. Nilai yang didukung adalah 'tetap' atau 'eksponensial', di mana defaultnya adalah 'eksponensial'.

idle_timeout
float

Waktu habis, dalam hitungan detik, setelah itu klien ini akan menutup koneksi yang mendasar jika tidak ada aktivitas lebih lanjut. Secara default nilainya adalah Tidak Ada, yang berarti bahwa klien tidak akan dimatikan karena tidak aktif kecuali dimulai oleh layanan.

transport_type
TransportType

Jenis protokol transportasi yang akan digunakan untuk berkomunikasi dengan layanan Azure Event Hubs. Defaultnya adalah TransportType.Amqp dalam hal ini port 5671 digunakan. Jika port 5671 tidak tersedia/diblokir di lingkungan jaringan, TransportType.AmqpOverWebsocket dapat digunakan sebagai gantinya yang menggunakan port 443 untuk komunikasi.

checkpoint_store
Optional[CheckpointStore]

Manajer yang menyimpan data penyeimbangan beban dan titik pemeriksaan partisi saat menerima peristiwa. Penyimpanan titik pemeriksaan akan digunakan dalam kedua kasus penerimaan dari semua partisi atau satu partisi. Dalam kasus terakhir, penyeimbangan beban tidak berlaku. Jika penyimpanan titik pemeriksaan tidak disediakan, titik pemeriksaan akan dipertahankan secara internal dalam memori, dan instans EventHubConsumerClient akan menerima peristiwa tanpa penyeimbangan beban.

load_balancing_interval
float

Saat penyeimbangan beban dimulai. Ini adalah interval, dalam hitungan detik, di antara dua evaluasi penyeimbangan beban. Defaultnya adalah 30 detik.

partition_ownership_expiration_interval
float

Kepemilikan partisi akan kedaluwarsa setelah jumlah detik ini. Setiap evaluasi penyeimbangan beban akan secara otomatis memperpanjang waktu kedaluwarsa kepemilikan. Defaultnya adalah 6 * load_balancing_interval, yaitu 180 detik saat menggunakan load_balancing_interval default 30 detik.

load_balancing_strategy
str atau LoadBalancingStrategy

Ketika penyeimbangan beban dimulai, ia akan menggunakan strategi ini untuk mengklaim dan menyeimbangkan kepemilikan partisi. Gunakan "serakah" atau LoadBalancingStrategy.GREEDY untuk strategi serakah, yang, untuk setiap evaluasi penyeimbangan beban, akan mengambil sebanyak mungkin partisi yang tidak diklaim yang diperlukan untuk menyeimbangkan beban. Gunakan "balanced" atau LoadBalancingStrategy.BALANCED untuk strategi seimbang, yang, untuk setiap evaluasi penyeimbangan beban, hanya mengklaim satu partisi yang tidak diklaim oleh EventHubConsumerClient lainnya. Jika semua partisi EventHub diklaim oleh EventHubConsumerClient lainnya dan klien ini telah mengklaim terlalu sedikit partisi, klien ini akan mencuri satu partisi dari klien lain untuk setiap evaluasi penyeimbangan beban terlepas dari strategi penyeimbangan beban. Strategi serakah digunakan secara default.

custom_endpoint_address
Optional[str]

Alamat titik akhir kustom yang digunakan untuk membuat koneksi ke layanan Azure Event Hubs, memungkinkan permintaan jaringan dirutekan melalui gateway aplikasi atau jalur lain yang diperlukan untuk lingkungan host. Defaultnya adalah Tidak Ada. Formatnya akan seperti "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jika port tidak ditentukan dalam custom_endpoint_address, secara default port 443 akan digunakan.

connection_verify
Optional[str]

Jalur ke file CA_BUNDLE kustom sertifikat SSL yang digunakan untuk mengautentikasi identitas titik akhir koneksi. Defaultnya adalah Tidak Ada dalam hal ini certifi.where() akan digunakan.

uamqp_transport
bool

Apakah akan menggunakan pustaka uamqp sebagai transportasi yang mendasar. Nilai defaultnya adalah False dan pustaka AMQP Python Murni akan digunakan sebagai transportasi yang mendasar.

Tipe hasil

Contoh

Buat instans baru EventHubConsumerClient dari string koneksi.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Mendapatkan properti Pusat Aktivitas.

Kunci dalam kamus yang dikembalikan meliputi:

  • eventhub_name (str)

  • created_at (datetime.datetime UTC)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Mengembalikan

Kamus yang berisi informasi tentang Pusat Aktivitas.

Tipe hasil

Pengecualian

get_partition_ids

Mendapatkan ID partisi dari Pusat Aktivitas.

async get_partition_ids() -> List[str]

Mengembalikan

Daftar ID partisi.

Tipe hasil

Pengecualian

get_partition_properties

Mendapatkan properti dari partisi yang ditentukan.

Kunci dalam kamus properti meliputi:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (datetime.datetime UTC)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parameter

partition_id
str
Diperlukan

ID partisi target.

Mengembalikan

Kamus yang berisi properti partisi.

Tipe hasil

Pengecualian

receive

Menerima peristiwa dari partisi, dengan penyeimbangan beban opsional dan titik pemeriksaan.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parameter

on_event
Callable[PartitionContext, Optional[EventData]]
Diperlukan

Fungsi panggilan balik untuk menangani peristiwa yang diterima. Panggilan balik mengambil dua parameter: partition_context yang berisi konteks partisi dan peristiwa yang merupakan peristiwa yang diterima. Fungsi panggilan balik harus didefinisikan seperti: on_event(partition_context, peristiwa). Untuk informasi konteks partisi terperinci, silakan merujuk ke PartitionContext.

max_wait_time
float

Interval maksimum dalam detik yang akan ditunda oleh prosesor peristiwa sebelum memanggil panggilan balik. Jika tidak ada peristiwa yang diterima dalam interval ini, panggilan balik on_event akan dipanggil dengan Tidak Ada. Jika nilai ini diatur ke Tidak Ada atau 0 (default), panggilan balik tidak akan dipanggil hingga peristiwa diterima.

partition_id
str

Jika ditentukan, klien hanya akan menerima dari partisi ini. Jika tidak, klien akan menerima dari semua partisi.

owner_level
int

Prioritas untuk konsumen eksklusif. Konsumen eksklusif akan dibuat jika owner_level ditetapkan. Konsumen dengan owner_level yang lebih tinggi memiliki prioritas eksklusif yang lebih tinggi. Tingkat pemilik juga dikenal sebagai 'nilai masa' konsumen.

prefetch
int

Jumlah peristiwa yang akan diambil sebelumnya dari layanan untuk diproses. Defaultnya adalah 300.

track_last_enqueued_event_properties
bool

Menunjukkan apakah konsumen harus meminta informasi tentang peristiwa antrean terakhir pada partisi terkait, dan melacak informasi tersebut saat peristiwa diterima. Ketika informasi tentang peristiwa antrean terakhir partisi sedang dilacak, setiap peristiwa yang diterima dari layanan Azure Event Hubs akan membawa metadata tentang partisi. Ini menghasilkan sejumlah kecil konsumsi bandwidth jaringan tambahan yang umumnya merupakan trade-off yang menguntungkan ketika dipertimbangkan untuk secara berkala membuat permintaan untuk properti partisi menggunakan klien Event Hub. Ini diatur ke False secara default.

starting_position
str, int, datetime atau dict[str,any]

Mulai menerima dari posisi peristiwa ini jika tidak ada data titik pemeriksaan untuk partisi. Data titik pemeriksaan akan digunakan jika tersedia. Ini bisa menjadi dict dengan ID partisi sebagai kunci dan posisi sebagai nilai untuk partisi individual, atau nilai tunggal untuk semua partisi. Jenis nilai dapat berupa str, int, atau datetime.datetime. Juga didukung adalah nilai "-1" untuk menerima dari awal aliran, dan "@latest" untuk hanya menerima peristiwa baru.

starting_position_inclusive
bool atau dict[str,bool]

Tentukan apakah starting_position yang diberikan inklusif(>=) atau tidak (>). Berlaku untuk inklusif dan False untuk eksklusif. Ini bisa menjadi dict dengan ID partisi sebagai kunci dan bool sebagai nilai yang menunjukkan apakah starting_position untuk partisi tertentu inklusif atau tidak. Ini juga bisa menjadi nilai bool tunggal untuk semua starting_position. Nilai defaultnya adalah False.

on_error
Callable[[PartitionContext, Exception]]

Fungsi panggilan balik yang akan dipanggil ketika kesalahan dinaikkan selama penerimaan setelah upaya coba lagi habis, atau selama proses penyeimbangan beban. Panggilan balik mengambil dua parameter: partition_context yang berisi informasi partisi dan kesalahan menjadi pengecualian. partition_context bisa tidak ada jika kesalahan dimunculkan selama proses penyeimbangan beban. Panggilan balik harus didefinisikan seperti: on_error(partition_context, kesalahan). Panggilan balik on_error juga akan dipanggil jika pengecualian yang tidak tertangani dinaikkan selama panggilan balik on_event .

on_partition_initialize
Callable[[PartitionContext]]

Fungsi panggilan balik yang akan dipanggil setelah konsumen untuk partisi tertentu menyelesaikan inisialisasi. Ini juga akan dipanggil ketika konsumen partisi internal baru dibuat untuk mengambil alih proses penerimaan untuk konsumen partisi internal yang gagal dan tertutup. Panggilan balik mengambil satu parameter: partition_context yang berisi informasi partisi. Panggilan balik harus didefinisikan seperti: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Fungsi panggilan balik yang akan dipanggil setelah konsumen untuk partisi tertentu ditutup. Ini juga akan dipanggil ketika kesalahan dimunculkan selama penerimaan setelah upaya coba lagi habis. Panggilan balik mengambil dua parameter: partition_context yang berisi informasi partisi dan alasan penutupan. Panggilan balik harus didefinisikan seperti: on_partition_close(partition_context, alasan). Silakan merujuk ke CloseReason untuk berbagai alasan penutupan.

Tipe hasil

Contoh

Menerima peristiwa dari EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Terima peristiwa dari partisi dalam batch, dengan penyeimbangan beban dan titik pemeriksaan opsional.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parameter

on_event_batch
Callable[PartitionContext, List[EventData]]
Diperlukan

Fungsi panggilan balik untuk menangani batch peristiwa yang diterima. Panggilan balik mengambil dua parameter: partition_context yang berisi konteks partisi dan event_batch, yang merupakan peristiwa yang diterima. Fungsi panggilan balik harus didefinisikan seperti: on_event_batch(partition_context, event_batch). event_batch bisa menjadi daftar kosong jika max_wait_time bukan Tidak Ada atau 0 dan tidak ada peristiwa yang diterima setelah max_wait_time. Untuk informasi konteks partisi terperinci, silakan lihat PartitionContext.

max_batch_size
int

Jumlah maksimum peristiwa dalam batch yang diteruskan ke panggilan balik on_event_batch. Jika jumlah peristiwa yang diterima aktual lebih besar dari max_batch_size, peristiwa yang diterima dibagi menjadi batch dan memanggil panggilan balik untuk setiap batch hingga max_batch_size peristiwa.

max_wait_time
float

Interval maksimum dalam detik yang akan ditunda oleh prosesor peristiwa sebelum memanggil panggilan balik. Jika tidak ada peristiwa yang diterima dalam interval ini, panggilan balik on_event_batch akan dipanggil dengan daftar kosong. Jika nilai ini diatur ke Tidak Ada atau 0 (default), panggilan balik tidak akan dipanggil sampai peristiwa diterima.

partition_id
str

Jika ditentukan, klien hanya akan menerima dari partisi ini. Jika tidak, klien akan menerima dari semua partisi.

owner_level
int

Prioritas untuk konsumen eksklusif. Konsumen eksklusif akan dibuat jika owner_level ditetapkan. Konsumen dengan owner_level yang lebih tinggi memiliki prioritas eksklusif yang lebih tinggi. Tingkat pemilik juga dikenal sebagai 'nilai epoch' konsumen.

prefetch
int

Jumlah peristiwa yang diambil sebelumnya dari layanan untuk diproses. Defaultnya adalah 300.

track_last_enqueued_event_properties
bool

Menunjukkan apakah konsumen harus meminta informasi tentang peristiwa antrean terakhir pada partisi terkait, dan melacak informasi tersebut saat peristiwa diterima. Ketika informasi tentang peristiwa antrean terakhir partisi sedang dilacak, setiap peristiwa yang diterima dari layanan Azure Event Hubs akan membawa metadata tentang partisi. Ini menghasilkan sejumlah kecil konsumsi bandwidth jaringan tambahan yang umumnya merupakan trade-off yang menguntungkan ketika dipertimbangkan untuk secara berkala membuat permintaan untuk properti partisi menggunakan klien Event Hub. Ini diatur ke False secara default.

starting_position
str, int, datetime atau dict[str,any]

Mulai menerima dari posisi kejadian ini jika tidak ada data titik pemeriksaan untuk partisi. Data titik pemeriksaan akan digunakan jika tersedia. Ini bisa menjadi dict dengan ID partisi sebagai kunci dan posisi sebagai nilai untuk partisi individual, atau nilai tunggal untuk semua partisi. Jenis nilai dapat berupa str, int, atau datetime.datetime. Juga didukung adalah nilai "-1" untuk menerima dari awal aliran, dan "@latest" untuk hanya menerima peristiwa baru.

starting_position_inclusive
bool atau dict[str,bool]

Tentukan apakah starting_position yang diberikan inklusif(>=) atau tidak (>). Berlaku untuk inklusif dan False untuk eksklusif. Ini dapat menjadi dict dengan ID partisi sebagai kunci dan bool sebagai nilai yang menunjukkan apakah starting_position untuk partisi tertentu inklusif atau tidak. Ini juga bisa menjadi nilai bool tunggal untuk semua starting_position. Nilai defaultnya adalah False.

on_error
Callable[[PartitionContext, Exception]]

Fungsi panggilan balik yang akan dipanggil ketika kesalahan dinaikkan selama penerimaan setelah upaya coba lagi habis, atau selama proses penyeimbangan beban. Panggilan balik mengambil dua parameter: partition_context yang berisi informasi partisi dan kesalahan menjadi pengecualian. partition_context bisa tidak ada jika kesalahan dimunculkan selama proses penyeimbangan beban. Panggilan balik harus didefinisikan seperti: on_error(partition_context, kesalahan). Panggilan balik on_error juga akan dipanggil jika pengecualian yang tidak tertangani dinaikkan selama panggilan balik on_event .

on_partition_initialize
Callable[[PartitionContext]]

Fungsi panggilan balik yang akan dipanggil setelah konsumen untuk partisi tertentu menyelesaikan inisialisasi. Ini juga akan dipanggil ketika konsumen partisi internal baru dibuat untuk mengambil alih proses penerimaan untuk konsumen partisi internal yang gagal dan tertutup. Panggilan balik mengambil satu parameter: partition_context yang berisi informasi partisi. Panggilan balik harus didefinisikan seperti: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Fungsi panggilan balik yang akan dipanggil setelah konsumen untuk partisi tertentu ditutup. Ini juga akan dipanggil ketika kesalahan dimunculkan selama penerimaan setelah upaya coba lagi habis. Panggilan balik mengambil dua parameter: partition_context yang berisi informasi partisi dan alasan penutupan. Panggilan balik harus didefinisikan seperti: on_partition_close(partition_context, alasan). Silakan merujuk ke CloseReason untuk berbagai alasan penutupan.

Tipe hasil

Contoh

Menerima peristiwa dalam batch dari EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )