EventHubConsumerClient Kelas
Kelas EventHubConsumerClient mendefinisikan antarmuka tingkat tinggi untuk menerima peristiwa dari layanan Azure Event Hubs.
Tujuan utama EventHubConsumerClient adalah untuk menerima peristiwa dari semua partisi EventHub dengan penyeimbangan beban dan titik pemeriksaan.
Ketika beberapa instans EventHubConsumerClient berjalan terhadap hub peristiwa yang sama, grup konsumen, dan lokasi titik pemeriksaan, partisi akan didistribusikan secara merata di antara mereka.
Untuk mengaktifkan penyeimbangan beban dan titik pemeriksaan yang bertahan, checkpoint_store harus diatur saat membuat EventHubConsumerClient. Jika penyimpanan titik pemeriksaan tidak disediakan, titik pemeriksaan akan dipertahankan secara internal dalam memori.
EventHubConsumerClient juga dapat menerima dari partisi tertentu saat Anda memanggil metodenya receive() atau receive_batch() dan menentukan partition_id. Penyeimbangan beban tidak akan berfungsi dalam mode partisi tunggal. Tetapi pengguna masih dapat menyimpan titik pemeriksaan jika checkpoint_store diatur.
- Warisan
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Konstruktor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parameter
- fully_qualified_namespace
- str
Nama host yang sepenuhnya memenuhi syarat untuk namespace Layanan Pusat Aktivitas. Format namespace adalah: .servicebus.windows.net.
- credential
- TokenCredential atau AzureSasCredential atau AzureNamedKeyCredential
Objek kredensial yang digunakan untuk autentikasi yang mengimplementasikan antarmuka tertentu untuk mendapatkan token. Ini menerima EventHubSharedKeyCredentialobjek kredensial , atau yang dihasilkan oleh pustaka identitas azure dan objek yang mengimplementasikan metode *get_token(mandiri, cakupan).
- logging_enable
- bool
Apakah akan mengeluarkan log jejak jaringan ke pencatat. Defaultnya adalah False.
- auth_timeout
- float
Waktu dalam detik untuk menunggu token diotorisasi oleh layanan. Nilai default-nya adalah 60 detik. Jika diatur ke 0, tidak ada batas waktu yang akan diberlakukan dari klien.
- user_agent
- str
Jika ditentukan, ini akan ditambahkan di depan string agen pengguna.
- retry_total
- int
Jumlah total upaya untuk mengulangi operasi yang gagal ketika terjadi kesalahan. Nilai defaultnya adalah 3. Konteks retry_total dalam menerima adalah khusus: Metode penerimaan diimplementasikan oleh metode penerimaan internal panggilan sementara dalam setiap iterasi. Dalam kasus penerimaan , retry_total menentukan jumlah coba lagi setelah kesalahan yang dimunculkan oleh metode penerimaan internal dalam perulangan sementara. Jika upaya coba lagi habis, panggilan balik on_error akan dipanggil (jika disediakan) dengan informasi kesalahan. Konsumen partisi internal yang gagal akan ditutup (on_partition_close akan dipanggil jika disediakan) dan konsumen partisi internal baru akan dibuat (on_partition_initialize akan dipanggil jika disediakan) untuk melanjutkan penerimaan.
- retry_backoff_factor
- float
Faktor backoff untuk diterapkan di antara upaya setelah percobaan kedua (sebagian besar kesalahan segera diselesaikan dengan percobaan kedua tanpa penundaan). Dalam mode tetap, kebijakan coba lagi akan selalu tidur untuk {faktor backoff}. Dalam mode 'eksponensial', kebijakan percobaan kembali akan tidur untuk: {faktor backoff} * (2 ** ({jumlah total percobaan ulang} - 1)) detik. Jika backoff_factor adalah 0,1, maka coba lagi akan tidur selama [0.0s, 0.2s, 0.4s, ...] di antara percobaan ulang. Nilai defaultnya adalah 0,8.
- retry_backoff_max
- float
Waktu back off maksimum. Nilai defaultnya adalah 120 detik (2 menit).
- retry_mode
- str
Perilaku penundaan antara upaya coba lagi. Nilai yang didukung adalah 'tetap' atau 'eksponensial', di mana defaultnya adalah 'eksponensial'.
- idle_timeout
- float
Waktu habis, dalam hitungan detik, setelah itu klien ini akan menutup koneksi yang mendasar jika tidak ada aktivitas lebih lanjut. Secara default nilainya adalah Tidak Ada, yang berarti bahwa klien tidak akan dimatikan karena tidak aktif kecuali dimulai oleh layanan.
- transport_type
- TransportType
Jenis protokol transportasi yang akan digunakan untuk berkomunikasi dengan layanan Azure Event Hubs. Defaultnya adalah TransportType.Amqp dalam hal ini port 5671 digunakan. Jika port 5671 tidak tersedia/diblokir di lingkungan jaringan, TransportType.AmqpOverWebsocket dapat digunakan sebagai gantinya yang menggunakan port 443 untuk komunikasi.
Pengaturan proksi HTTP. Ini harus berupa kamus dengan kunci berikut: 'proxy_hostname' (nilai str) dan 'proxy_port' (nilai int). Selain itu, kunci berikut mungkin juga ada: 'nama pengguna', 'kata sandi'.
- checkpoint_store
- CheckpointStore atau None
Manajer yang menyimpan data penyeimbangan beban dan titik pemeriksaan partisi saat menerima peristiwa. Penyimpanan titik pemeriksaan akan digunakan dalam kedua kasus penerimaan dari semua partisi atau satu partisi. Dalam kasus terakhir penyeimbangan beban tidak berlaku. Jika penyimpanan titik pemeriksaan tidak disediakan, titik pemeriksaan akan dipertahankan secara internal dalam memori, dan instans EventHubConsumerClient akan menerima peristiwa tanpa penyeimbangan beban.
- load_balancing_interval
- float
Ketika penyeimbangan beban menendang masuk Ini adalah interval, dalam hitungan detik, di antara dua evaluasi penyeimbangan beban. Defaultnya adalah 30 detik.
- partition_ownership_expiration_interval
- float
Kepemilikan partisi akan kedaluwarsa setelah jumlah detik ini. Setiap evaluasi penyeimbangan beban akan secara otomatis memperpanjang waktu kedaluwarsa kepemilikan. Defaultnya adalah 6 * load_balancing_interval, yaitu 180 detik saat menggunakan load_balancing_interval default 30 detik.
- load_balancing_strategy
- str atau LoadBalancingStrategy
Ketika penyeimbangan beban dimulai, ia akan menggunakan strategi ini untuk mengklaim dan menyeimbangkan kepemilikan partisi. Gunakan "greedy" atau LoadBalancingStrategy.GREEDY untuk strategi serakah, yang, untuk setiap evaluasi penyeimbangan beban, akan mengambil partisi yang tidak diklaim sebanyak yang diperlukan untuk menyeimbangkan beban. Gunakan "seimbang" atau LoadBalancingStrategy.BALANCED untuk strategi seimbang, yang, untuk setiap evaluasi penyeimbangan beban, hanya mengklaim satu partisi yang tidak diklaim oleh EventHubConsumerClient lainnya. Jika semua partisi EventHub diklaim oleh EventHubConsumerClient lainnya dan klien ini telah mengklaim terlalu sedikit partisi, klien ini akan mencuri satu partisi dari klien lain untuk setiap evaluasi penyeimbangan beban terlepas dari strategi penyeimbangan beban. Strategi serakah digunakan secara default.
Alamat titik akhir kustom yang digunakan untuk membuat koneksi ke layanan Azure Event Hubs, memungkinkan permintaan jaringan dirutekan melalui gateway aplikasi atau jalur lain yang diperlukan untuk lingkungan host. Defaultnya adalah Tidak Ada. Formatnya seperti "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jika port tidak ditentukan dalam custom_endpoint_address, secara default port 443 akan digunakan.
Jalur ke file CA_BUNDLE kustom sertifikat SSL yang digunakan untuk mengautentikasi identitas titik akhir koneksi. Defaultnya adalah Tidak Ada dalam hal ini certifi.where() akan digunakan.
- uamqp_transport
- bool
Apakah akan menggunakan pustaka uamqp sebagai transportasi yang mendasar. Nilai defaultnya adalah False dan pustaka AMQP Python Murni akan digunakan sebagai transportasi yang mendasar.
- socket_timeout
- float
Waktu dalam detik bahwa soket yang mendasar pada koneksi harus menunggu saat mengirim dan menerima data sebelum waktu habis. Nilai defaultnya adalah 0,2 untuk TransportType.Amqp dan 1 untuk TransportType.AmqpOverWebsocket. Jika kesalahan EventHubsConnectionError terjadi karena waktu penulisan habis, nilai yang lebih besar dari default mungkin perlu diteruskan. Ini untuk skenario penggunaan tingkat lanjut dan biasanya nilai default harus cukup.
Contoh
Buat instans baru EventHubConsumerClient.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Metode
close |
Berhenti mengambil peristiwa dari Pusat Aktivitas dan tutup koneksi dan tautan AMQP yang mendasar. |
from_connection_string |
Buat EventHubConsumerClient dari string koneksi. |
get_eventhub_properties |
Mendapatkan properti Pusat Aktivitas. Kunci dalam kamus yang dikembalikan meliputi:
|
get_partition_ids |
Mendapatkan ID partisi dari Pusat Aktivitas. |
get_partition_properties |
Mendapatkan properti dari partisi yang ditentukan. Kunci dalam kamus properti meliputi:
|
receive |
Menerima peristiwa dari partisi, dengan penyeimbangan beban opsional dan titik pemeriksaan. |
receive_batch |
Menerima peristiwa dari partisi, dengan penyeimbangan beban opsional dan titik pemeriksaan. |
close
Berhenti mengambil peristiwa dari Pusat Aktivitas dan tutup koneksi dan tautan AMQP yang mendasar.
close() -> None
Tipe hasil
Contoh
Tutup klien.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
Buat EventHubConsumerClient dari string koneksi.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Parameter
- eventhub_name
- str
Jalur Event Hub tertentu untuk menyambungkan klien.
- logging_enable
- bool
Apakah akan mengeluarkan log jejak jaringan ke pencatat. Defaultnya adalah False.
- auth_timeout
- float
Waktu dalam detik untuk menunggu token diotorisasi oleh layanan. Nilai default-nya adalah 60 detik. Jika diatur ke 0, tidak ada batas waktu yang akan diberlakukan dari klien.
- user_agent
- str
Jika ditentukan, ini akan ditambahkan di depan string agen pengguna.
- retry_total
- int
Jumlah total upaya untuk mengulangi operasi yang gagal ketika terjadi kesalahan. Nilai defaultnya adalah 3. Konteks retry_total dalam menerima adalah khusus: Metode penerimaan diimplementasikan oleh metode penerimaan internal panggilan sementara dalam setiap iterasi. Dalam kasus penerimaan , retry_total menentukan jumlah coba lagi setelah kesalahan yang dimunculkan oleh metode penerimaan internal dalam perulangan sementara. Jika upaya coba lagi habis, panggilan balik on_error akan dipanggil (jika disediakan) dengan informasi kesalahan. Konsumen partisi internal yang gagal akan ditutup (on_partition_close akan dipanggil jika disediakan) dan konsumen partisi internal baru akan dibuat (on_partition_initialize akan dipanggil jika disediakan) untuk melanjutkan penerimaan.
- retry_backoff_factor
- float
Faktor backoff untuk diterapkan di antara upaya setelah percobaan kedua (sebagian besar kesalahan segera diselesaikan dengan percobaan kedua tanpa penundaan). Dalam mode tetap, kebijakan coba lagi akan selalu tidur untuk {faktor backoff}. Dalam mode 'eksponensial', kebijakan percobaan kembali akan tidur untuk: {faktor backoff} * (2 ** ({jumlah total percobaan ulang} - 1)) detik. Jika backoff_factor adalah 0,1, maka percobaan ulang akan tidur selama [0,0 detik, 0,2 detik, 0,4 detik, ...] di antara percobaan ulang. Nilai defaultnya adalah 0,8.
- retry_backoff_max
- float
Waktu back off maksimum. Nilai defaultnya adalah 120 detik (2 menit).
- retry_mode
- str
Perilaku penundaan antara upaya coba lagi. Nilai yang didukung adalah 'tetap' atau 'eksponensial', di mana defaultnya adalah 'eksponensial'.
- idle_timeout
- float
Waktu habis, dalam hitungan detik, setelah itu klien ini akan menutup koneksi yang mendasar jika tidak ada aktivitas berbulu. Secara default nilainya adalah Tidak Ada, yang berarti bahwa klien tidak akan dimatikan karena tidak aktif kecuali dimulai oleh layanan.
- transport_type
- TransportType
Jenis protokol transportasi yang akan digunakan untuk berkomunikasi dengan layanan Azure Event Hubs. Defaultnya adalah TransportType.Amqp dalam hal ini port 5671 digunakan. Jika port 5671 tidak tersedia/diblokir di lingkungan jaringan, TransportType.AmqpOverWebsocket dapat digunakan sebagai gantinya yang menggunakan port 443 untuk komunikasi.
- http_proxy
- dict
Pengaturan proksi HTTP. Ini harus berupa kamus dengan kunci berikut: 'proxy_hostname' (nilai str) dan 'proxy_port' (nilai int). Selain itu, kunci berikut mungkin juga ada: 'nama pengguna', 'kata sandi'.
- checkpoint_store
- CheckpointStore atau None
Manajer yang menyimpan data penyeimbangan beban dan titik pemeriksaan partisi saat menerima peristiwa. Penyimpanan titik pemeriksaan akan digunakan dalam kedua kasus penerimaan dari semua partisi atau satu partisi. Dalam kasus terakhir, penyeimbangan beban tidak berlaku. Jika penyimpanan titik pemeriksaan tidak disediakan, titik pemeriksaan akan dipertahankan secara internal dalam memori, dan instans EventHubConsumerClient akan menerima peristiwa tanpa penyeimbangan beban.
- load_balancing_interval
- float
Saat penyeimbangan beban dimulai. Ini adalah interval, dalam hitungan detik, di antara dua evaluasi penyeimbangan beban. Defaultnya adalah 10 detik.
- partition_ownership_expiration_interval
- float
Kepemilikan partisi akan kedaluwarsa setelah jumlah detik ini. Setiap evaluasi penyeimbangan beban akan secara otomatis memperpanjang waktu kedaluwarsa kepemilikan. Defaultnya adalah 6 * load_balancing_interval, yaitu 60 detik saat menggunakan load_balancing_interval default 30 detik.
- load_balancing_strategy
- str atau LoadBalancingStrategy
Ketika penyeimbangan beban dimulai, ia akan menggunakan strategi ini untuk mengklaim dan menyeimbangkan kepemilikan partisi. Gunakan "serakah" atau LoadBalancingStrategy.GREEDY untuk strategi serakah, yang, untuk setiap evaluasi penyeimbangan beban, akan mengambil sebanyak mungkin partisi yang tidak diklaim yang diperlukan untuk menyeimbangkan beban. Gunakan "balanced" atau LoadBalancingStrategy.BALANCED untuk strategi seimbang, yang, untuk setiap evaluasi penyeimbangan beban, hanya mengklaim satu partisi yang tidak diklaim oleh EventHubConsumerClient lainnya. Jika semua partisi EventHub diklaim oleh EventHubConsumerClient lainnya dan klien ini telah mengklaim terlalu sedikit partisi, klien ini akan mencuri satu partisi dari klien lain untuk setiap evaluasi penyeimbangan beban terlepas dari strategi penyeimbangan beban. Strategi serakah digunakan secara default.
Alamat titik akhir kustom yang digunakan untuk membuat koneksi ke layanan Azure Event Hubs, memungkinkan permintaan jaringan dirutekan melalui gateway aplikasi atau jalur lain yang diperlukan untuk lingkungan host. Defaultnya adalah Tidak Ada. Formatnya akan seperti "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jika port tidak ditentukan dalam custom_endpoint_address, secara default port 443 akan digunakan.
Jalur ke file CA_BUNDLE kustom sertifikat SSL yang digunakan untuk mengautentikasi identitas titik akhir koneksi. Defaultnya adalah Tidak Ada dalam hal ini certifi.where() akan digunakan.
- uamqp_transport
- bool
Apakah akan menggunakan pustaka uamqp sebagai transportasi yang mendasar. Nilai defaultnya adalah False dan pustaka AMQP Python Murni akan digunakan sebagai transportasi yang mendasar.
Tipe hasil
Contoh
Buat instans baru EventHubConsumerClient dari string koneksi.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Mendapatkan properti Pusat Aktivitas.
Kunci dalam kamus yang dikembalikan meliputi:
eventhub_name (str)
created_at (datetime.datetime UTC)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Mengembalikan
Kamus yang berisi informasi tentang Pusat Aktivitas.
Tipe hasil
Pengecualian
get_partition_ids
Mendapatkan ID partisi dari Pusat Aktivitas.
get_partition_ids() -> List[str]
Mengembalikan
Daftar ID partisi.
Tipe hasil
Pengecualian
get_partition_properties
Mendapatkan properti dari partisi yang ditentukan.
Kunci dalam kamus properti meliputi:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (datetime.datetime UTC)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Parameter
Mengembalikan
Kamus yang berisi properti partisi.
Tipe hasil
Pengecualian
receive
Menerima peristiwa dari partisi, dengan penyeimbangan beban opsional dan titik pemeriksaan.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Parameter
- on_event
- callable[PartitionContext, EventData atau None]
Fungsi panggilan balik untuk menangani peristiwa yang diterima. Panggilan balik mengambil dua parameter: partition_context yang berisi konteks partisi dan peristiwa yang merupakan peristiwa yang diterima. Fungsi panggilan balik harus didefinisikan seperti: on_event(partition_context, peristiwa). Untuk informasi konteks partisi terperinci, silakan merujuk ke PartitionContext.
- max_wait_time
- float
Interval maksimum dalam detik yang akan ditunda oleh prosesor peristiwa sebelum memanggil panggilan balik. Jika tidak ada peristiwa yang diterima dalam interval ini, panggilan balik on_event akan dipanggil dengan Tidak Ada. Jika nilai ini diatur ke Tidak Ada atau 0 (default), panggilan balik tidak akan dipanggil hingga peristiwa diterima.
- partition_id
- str
Jika ditentukan, klien hanya akan menerima dari partisi ini. Jika tidak, klien akan menerima dari semua partisi.
- owner_level
- int
Prioritas untuk konsumen eksklusif. Konsumen eksklusif akan dibuat jika owner_level ditetapkan. Konsumen dengan owner_level yang lebih tinggi memiliki prioritas eksklusif yang lebih tinggi. Tingkat pemilik juga dikenal sebagai 'nilai masa' konsumen.
- prefetch
- int
Jumlah peristiwa yang akan diambil sebelumnya dari layanan untuk diproses. Defaultnya adalah 300.
- track_last_enqueued_event_properties
- bool
Menunjukkan apakah konsumen harus meminta informasi tentang peristiwa antrean terakhir pada partisi terkait, dan melacak informasi tersebut saat peristiwa diterima. Ketika informasi tentang peristiwa antrean terakhir partisi sedang dilacak, setiap peristiwa yang diterima dari layanan Azure Event Hubs akan membawa metadata tentang partisi. Ini menghasilkan sejumlah kecil konsumsi bandwidth jaringan tambahan yang umumnya merupakan trade-off yang menguntungkan ketika dipertimbangkan untuk secara berkala membuat permintaan untuk properti partisi menggunakan klien Event Hub. Ini diatur ke False secara default.
Mulai menerima dari posisi peristiwa ini jika tidak ada data titik pemeriksaan untuk partisi. Data titik pemeriksaan akan digunakan jika tersedia. Ini bisa menjadi dict dengan ID partisi sebagai kunci dan posisi sebagai nilai untuk partisi individual, atau nilai tunggal untuk semua partisi. Jenis nilai dapat berupa str, int, atau datetime.datetime. Juga didukung adalah nilai "-1" untuk menerima dari awal aliran, dan "@latest" untuk hanya menerima peristiwa baru. Nilai defaultnya adalah "@latest".
Tentukan apakah starting_position yang diberikan inklusif(>=) atau tidak (>). Berlaku untuk inklusif dan False untuk eksklusif. Ini bisa menjadi dict dengan ID partisi sebagai kunci dan bool sebagai nilai yang menunjukkan apakah starting_position untuk partisi tertentu inklusif atau tidak. Ini juga bisa menjadi nilai bool tunggal untuk semua starting_position. Nilai defaultnya adalah False.
- on_error
- callable[[PartitionContext, Exception]]
Fungsi panggilan balik yang akan dipanggil ketika kesalahan dinaikkan selama penerimaan setelah upaya coba lagi habis, atau selama proses penyeimbangan beban. Panggilan balik mengambil dua parameter: partition_context yang berisi informasi partisi dan kesalahan menjadi pengecualian. partition_context bisa tidak ada jika kesalahan dimunculkan selama proses penyeimbangan beban. Panggilan balik harus didefinisikan seperti: on_error(partition_context, kesalahan). Panggilan balik on_error juga akan dipanggil jika pengecualian yang tidak tertangani dinaikkan selama panggilan balik on_event .
- on_partition_initialize
- callable[[PartitionContext]]
Fungsi panggilan balik yang akan dipanggil setelah konsumen untuk partisi tertentu menyelesaikan inisialisasi. Ini juga akan dipanggil ketika konsumen partisi internal baru dibuat untuk mengambil alih proses penerimaan untuk konsumen partisi internal yang gagal dan tertutup. Panggilan balik mengambil satu parameter: partition_context yang berisi informasi partisi. Panggilan balik harus didefinisikan seperti: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Fungsi panggilan balik yang akan dipanggil setelah konsumen untuk partisi tertentu ditutup. Ini juga akan dipanggil ketika kesalahan dimunculkan selama penerimaan setelah upaya coba lagi habis. Panggilan balik mengambil dua parameter: partition_context yang berisi informasi partisi dan alasan penutupan. Panggilan balik harus didefinisikan seperti: on_partition_close(partition_context, alasan). Silakan merujuk ke CloseReason untuk berbagai alasan penutupan.
Tipe hasil
Contoh
Menerima peristiwa dari EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
Menerima peristiwa dari partisi, dengan penyeimbangan beban opsional dan titik pemeriksaan.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Parameter
- on_event_batch
- callable[PartitionContext, list[EventData]]
Fungsi panggilan balik untuk menangani batch peristiwa yang diterima. Panggilan balik mengambil dua parameter: partition_context yang berisi konteks partisi dan event_batch, yang merupakan peristiwa yang diterima. Fungsi panggilan balik harus didefinisikan seperti: on_event_batch(partition_context, event_batch). event_batch bisa menjadi daftar kosong jika max_wait_time bukan Tidak Ada atau 0 dan tidak ada peristiwa yang diterima setelah max_wait_time. Untuk informasi konteks partisi terperinci, silakan lihat PartitionContext.
- max_batch_size
- int
Jumlah maksimum peristiwa dalam batch yang diteruskan ke panggilan balik on_event_batch. Jika jumlah peristiwa yang diterima aktual lebih besar dari max_batch_size, peristiwa yang diterima dibagi menjadi batch dan memanggil panggilan balik untuk setiap batch hingga max_batch_size peristiwa.
- max_wait_time
- float
Interval maksimum dalam detik yang akan ditunda oleh prosesor peristiwa sebelum memanggil panggilan balik. Jika tidak ada peristiwa yang diterima dalam interval ini, panggilan balik on_event_batch akan dipanggil dengan daftar kosong.
- partition_id
- str
Jika ditentukan, klien hanya akan menerima dari partisi ini. Jika tidak, klien akan menerima dari semua partisi.
- owner_level
- int
Prioritas untuk konsumen eksklusif. Konsumen eksklusif akan dibuat jika owner_level ditetapkan. Konsumen dengan owner_level yang lebih tinggi memiliki prioritas eksklusif yang lebih tinggi. Tingkat pemilik juga dikenal sebagai 'nilai epoch' konsumen.
- prefetch
- int
Jumlah peristiwa yang diambil sebelumnya dari layanan untuk diproses. Defaultnya adalah 300.
- track_last_enqueued_event_properties
- bool
Menunjukkan apakah konsumen harus meminta informasi tentang peristiwa antrean terakhir pada partisi terkait, dan melacak informasi tersebut saat peristiwa diterima. Ketika informasi tentang peristiwa antrean terakhir partisi sedang dilacak, setiap peristiwa yang diterima dari layanan Azure Event Hubs akan membawa metadata tentang partisi. Ini menghasilkan sejumlah kecil konsumsi bandwidth jaringan tambahan yang umumnya merupakan trade-off yang menguntungkan ketika dipertimbangkan untuk secara berkala membuat permintaan untuk properti partisi menggunakan klien Event Hub. Ini diatur ke False secara default.
Mulai menerima dari posisi kejadian ini jika tidak ada data titik pemeriksaan untuk partisi. Data titik pemeriksaan akan digunakan jika tersedia. Ini bisa menjadi dict dengan ID partisi sebagai kunci dan posisi sebagai nilai untuk partisi individual, atau nilai tunggal untuk semua partisi. Jenis nilai dapat berupa str, int, atau datetime.datetime. Juga didukung adalah nilai "-1" untuk menerima dari awal aliran, dan "@latest" untuk hanya menerima peristiwa baru. Nilai defaultnya adalah "@latest".
Tentukan apakah starting_position yang diberikan inklusif(>=) atau tidak (>). Berlaku untuk inklusif dan False untuk eksklusif. Ini dapat menjadi dict dengan ID partisi sebagai kunci dan bool sebagai nilai yang menunjukkan apakah starting_position untuk partisi tertentu inklusif atau tidak. Ini juga bisa menjadi nilai bool tunggal untuk semua starting_position. Nilai defaultnya adalah False.
- on_error
- callable[[PartitionContext, Exception]]
Fungsi panggilan balik yang akan dipanggil ketika kesalahan dinaikkan selama penerimaan setelah upaya coba lagi habis, atau selama proses penyeimbangan beban. Panggilan balik mengambil dua parameter: partition_context yang berisi informasi partisi dan kesalahan menjadi pengecualian. partition_context bisa tidak ada jika kesalahan dimunculkan selama proses penyeimbangan beban. Panggilan balik harus didefinisikan seperti: on_error(partition_context, kesalahan). Panggilan balik on_error juga akan dipanggil jika pengecualian yang tidak tertangani dinaikkan selama panggilan balik on_event .
- on_partition_initialize
- callable[[PartitionContext]]
Fungsi panggilan balik yang akan dipanggil setelah konsumen untuk partisi tertentu menyelesaikan inisialisasi. Ini juga akan dipanggil ketika konsumen partisi internal baru dibuat untuk mengambil alih proses penerimaan untuk konsumen partisi internal yang gagal dan tertutup. Panggilan balik mengambil satu parameter: partition_context yang berisi informasi partisi. Panggilan balik harus didefinisikan seperti: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Fungsi panggilan balik yang akan dipanggil setelah konsumen untuk partisi tertentu ditutup. Ini juga akan dipanggil ketika kesalahan dimunculkan selama penerimaan setelah upaya coba lagi habis. Panggilan balik mengambil dua parameter: partition_context yang berisi informasi partisi dan alasan penutupan. Panggilan balik harus didefinisikan seperti: on_partition_close(partition_context, alasan). Silakan merujuk ke CloseReason untuk berbagai alasan penutupan.
Tipe hasil
Contoh
Menerima peristiwa dalam batch dari EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python