Bagikan melalui


ServiceBusReceiver Kelas

Kelas ServiceBusReceiver menentukan antarmuka tingkat tinggi untuk menerima pesan dari Antrean Azure Service Bus atau Langganan Topik.

Dua saluran utama untuk tanda terima pesan adalah terima() untuk membuat satu permintaan pesan, dan asinkron untuk pesan di penerima: untuk terus menerima pesan masuk dengan cara yang sedang berlangsung.

Silakan gunakan get_<queue/subscription>_receiver metode ~azure.servicebus.aio.ServiceBusClient untuk membuat instans ServiceBusReceiver.

Warisan
ServiceBusReceiver
azure.servicebus.aio._base_handler_async.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

Konstruktor

ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

Parameter

fully_qualified_namespace
str
Diperlukan

Nama host yang sepenuhnya memenuhi syarat untuk namespace Bus Layanan. Format namespace adalah: .servicebus.windows.net.

credential
AsyncTokenCredential atau AzureSasCredential atau AzureNamedKeyCredential
Diperlukan

Objek kredensial yang digunakan untuk autentikasi yang mengimplementasikan antarmuka tertentu untuk mendapatkan token. Ini menerima objek kredensial yang dihasilkan oleh pustaka identitas azure dan objek yang mengimplementasikan metode *get_token (mandiri, cakupan), atau sebagai alternatif, AzureSasCredential juga dapat disediakan.

queue_name
str

Jalur Antrean Bus Layanan tertentu yang disambungkan klien.

topic_name
str

Jalur Topik Bus Layanan tertentu yang berisi Langganan yang disambungkan klien.

subscription_name
str

Jalur Langganan Bus Layanan tertentu di bawah Topik yang ditentukan yang disambungkan klien.

receive_mode
Union[ServiceBusReceiveMode, str]

Mode tempat pesan akan diambil dari entitas. Dua opsi tersebut PEEK_LOCK dan RECEIVE_AND_DELETE. Pesan yang diterima dengan PEEK_LOCK harus diselesaikan dalam periode kunci tertentu sebelum dihapus dari antrean. Pesan yang diterima dengan RECEIVE_AND_DELETE akan segera dihapus dari antrean, dan kemudian tidak dapat ditinggalkan atau diterima kembali jika klien gagal memproses pesan. Mode default adalah PEEK_LOCK.

max_wait_time
Optional[float]

Batas waktu dalam detik antara pesan yang diterima setelah itu penerima akan secara otomatis berhenti menerima. Nilai defaultnya adalah Tidak Ada, yang berarti tidak ada batas waktu.

logging_enable
bool

Apakah akan mengeluarkan log jejak jaringan ke pencatat. Defaultnya adalah False.

transport_type
TransportType

Jenis protokol transportasi yang akan digunakan untuk berkomunikasi dengan layanan Bus Layanan. Defaultnya adalah TransportType.Amqp.

http_proxy
Dict

Pengaturan proksi HTTP. Ini harus berupa kamus dengan kunci berikut: 'proxy_hostname' (nilai str) dan 'proxy_port' (nilai int). Selain itu, kunci berikut mungkin juga ada: 'nama pengguna', 'kata sandi'.

user_agent
str

Jika ditentukan, ini akan ditambahkan di depan string agen pengguna bawaan.

auto_lock_renewer
Optional[AutoLockRenewer]

~azure.servicebus.aio.AutoLockRenewer dapat disediakan sehingga pesan secara otomatis terdaftar pada tanda terima. Jika penerima adalah penerima sesi, itu akan berlaku untuk sesi sebagai gantinya.

prefetch_count
int

Jumlah maksimum pesan untuk di-cache dengan setiap permintaan ke layanan. Pengaturan ini hanya untuk penyetelan performa tingkat lanjut. Meningkatkan nilai ini akan meningkatkan performa throughput pesan tetapi meningkatkan kemungkinan pesan akan kedaluwarsa saat di-cache jika tidak diproses dengan cukup cepat. Nilai default adalah 0, yang berarti pesan akan diterima dari layanan dan diproses satu per satu. Dalam kasus prefetch_count menjadi 0, ServiceBusReceiver.receive akan mencoba melakukan cache max_message_count (jika disediakan) dalam permintaannya ke layanan.

client_identifier
str

Pengidentifikasi berbasis string untuk mengidentifikasi instans klien secara unik. Azure Service Bus akan mengaitkannya dengan beberapa pesan kesalahan untuk korelasi kesalahan yang lebih mudah. Jika tidak ditentukan, id unik akan dibuat.

socket_timeout
float

Waktu dalam detik bahwa soket yang mendasar pada koneksi harus menunggu saat mengirim dan menerima data sebelum waktu habis. Nilai defaultnya adalah 0,2 untuk TransportType.Amqp dan 1 untuk TransportType.AmqpOverWebsocket. Jika kesalahan koneksi terjadi karena waktu tulis habis, nilai yang lebih besar dari default mungkin perlu diteruskan.

Variabel

fully_qualified_namespace
str

Nama host yang sepenuhnya memenuhi syarat untuk namespace Bus Layanan. Format namespace adalah: .servicebus.windows.net.

entity_path
str

Jalur entitas yang disambungkan klien.

Metode

abandon_message

Abaikan pesan.

Pesan ini akan dikembalikan ke antrean dan tersedia untuk diterima lagi.

close
complete_message

Selesaikan pesan.

Ini akan menghapus pesan dari antrean.

dead_letter_message

Pindahkan pesan ke antrean Surat Mati.

Antrean Dead Letter adalah sub-antrean yang dapat digunakan untuk menyimpan pesan yang gagal diproses dengan benar, atau memerlukan inspeksi atau pemrosesan lebih lanjut. Antrean juga dapat dikonfigurasi untuk mengirim pesan kedaluwarsa ke antrean Dead Letter.

defer_message

Menangguhkan pesan.

Pesan ini akan tetap dalam antrean tetapi harus diminta secara khusus dengan nomor urutnya agar dapat diterima.

peek_messages

Telusuri pesan yang saat ini tertunda dalam antrean.

Pesan yang diintip tidak dihapus dari antrean, juga tidak dikunci. Mereka tidak dapat diselesaikan, ditangguhkan, atau dihentikan suratnya.

receive_deferred_messages

Menerima pesan yang sebelumnya telah ditangguhkan.

Saat menerima pesan yang ditangguhkan dari entitas yang dipartisi, semua nomor urutan yang disediakan harus berupa pesan dari partisi yang sama.

receive_messages

Menerima batch pesan sekaligus.

Pendekatan ini optimal jika Anda ingin memproses beberapa pesan secara bersamaan, atau melakukan penerimaan ad-hoc sebagai satu panggilan.

Perhatikan bahwa jumlah pesan yang diambil dalam satu batch akan bergantung pada apakah prefetch_count diatur untuk penerima. Jika prefetch_count tidak diatur untuk penerima, penerima akan mencoba menyimpan pesan max_message_count (jika disediakan) dalam permintaan ke layanan.

Panggilan ini akan memprioritaskan pengembalian dengan cepat daripada memenuhi ukuran batch yang ditentukan, sehingga akan kembali segera setelah setidaknya satu pesan diterima dan ada celah dalam pesan masuk terlepas dari ukuran batch yang ditentukan.

renew_message_lock

Perbarui kunci pesan.

Ini akan mempertahankan kunci pada pesan untuk memastikan tidak dikembalikan ke antrean untuk diolah ulang.

Untuk menyelesaikan (atau menyelesaikan) pesan, kunci harus dipertahankan, dan belum dapat kedaluwarsa; kunci kedaluwarsa tidak dapat diperpanjang.

Pesan yang diterima melalui mode RECEIVE_AND_DELETE tidak dikunci, dan oleh karena itu tidak dapat diperbarui. Operasi ini juga hanya tersedia untuk pesan yang tidak sesi.

abandon_message

Abaikan pesan.

Pesan ini akan dikembalikan ke antrean dan tersedia untuk diterima lagi.

async abandon_message(message: ServiceBusReceivedMessage) -> None

Parameter

message
ServiceBusReceivedMessage
Diperlukan

Pesan yang diterima untuk ditinggalkan.

Tipe hasil

Pengecualian

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Contoh

Abaikan pesan yang diterima.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.abandon_message(message)

close

async close() -> None

Pengecualian

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

Selesaikan pesan.

Ini akan menghapus pesan dari antrean.

async complete_message(message: ServiceBusReceivedMessage) -> None

Parameter

message
ServiceBusReceivedMessage
Diperlukan

Pesan yang diterima untuk diselesaikan.

Tipe hasil

Pengecualian

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Contoh

Selesaikan pesan yang diterima.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.complete_message(message)

dead_letter_message

Pindahkan pesan ke antrean Surat Mati.

Antrean Dead Letter adalah sub-antrean yang dapat digunakan untuk menyimpan pesan yang gagal diproses dengan benar, atau memerlukan inspeksi atau pemrosesan lebih lanjut. Antrean juga dapat dikonfigurasi untuk mengirim pesan kedaluwarsa ke antrean Dead Letter.

async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Parameter

message
ServiceBusReceivedMessage
Diperlukan

Pesan yang diterima akan di-dead-letter.

reason
Optional[str]
nilai default: None

Alasan untuk surat gagal pesan.

error_description
Optional[str]
nilai default: None

Deskripsi kesalahan terperinci untuk dead-lettering pesan.

Tipe hasil

Pengecualian

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Contoh

Surat mati pesan yang diterima.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.dead_letter_message(message)

defer_message

Menangguhkan pesan.

Pesan ini akan tetap dalam antrean tetapi harus diminta secara khusus dengan nomor urutnya agar dapat diterima.

async defer_message(message: ServiceBusReceivedMessage) -> None

Parameter

message
ServiceBusReceivedMessage
Diperlukan

Pesan yang diterima untuk ditangguhkan.

Tipe hasil

Pengecualian

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Contoh

Tangguhkan pesan yang diterima.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.defer_message(message)

peek_messages

Telusuri pesan yang saat ini tertunda dalam antrean.

Pesan yang diintip tidak dihapus dari antrean, juga tidak dikunci. Mereka tidak dapat diselesaikan, ditangguhkan, atau dihentikan suratnya.

async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parameter

max_message_count
int
nilai default: 1

Jumlah maksimum pesan yang akan dicoba dan diintip. Nilai default adalah 1.

sequence_number
int

Nomor urutan pesan untuk mulai menelusuri pesan.

timeout
Optional[float]

Total batas waktu operasi dalam detik termasuk semua percobaan ulang. Nilai harus lebih besar dari 0 jika ditentukan. Nilai defaultnya adalah Tidak Ada, yang berarti tidak ada batas waktu.

Mengembalikan

Daftar objek ~azure.servicebus.ServiceBusReceivedMessage.

Tipe hasil

Pengecualian

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Contoh

Mengintip pesan dalam antrean.


   async with servicebus_receiver:
       messages = await servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

Menerima pesan yang sebelumnya telah ditangguhkan.

Saat menerima pesan yang ditangguhkan dari entitas yang dipartisi, semua nomor urutan yang disediakan harus berupa pesan dari partisi yang sama.

async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parameter

sequence_numbers
Union[int, list[int]]
Diperlukan

Daftar nomor urut pesan yang telah ditangguhkan.

timeout
Optional[float]

Total batas waktu operasi dalam detik termasuk semua percobaan ulang. Nilai harus lebih besar dari 0 jika ditentukan. Nilai defaultnya adalah Tidak Ada, yang berarti tidak ada batas waktu.

Mengembalikan

Daftar pesan yang diterima.

Tipe hasil

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Pengecualian

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Contoh

Menerima pesan yang ditangguhkan dari ServiceBus.


   async with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           await servicebus_receiver.defer_message(message)

       received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for message in received_deferred_msg:
           await servicebus_receiver.complete_message(message)

receive_messages

Menerima batch pesan sekaligus.

Pendekatan ini optimal jika Anda ingin memproses beberapa pesan secara bersamaan, atau melakukan penerimaan ad-hoc sebagai satu panggilan.

Perhatikan bahwa jumlah pesan yang diambil dalam satu batch akan bergantung pada apakah prefetch_count diatur untuk penerima. Jika prefetch_count tidak diatur untuk penerima, penerima akan mencoba menyimpan pesan max_message_count (jika disediakan) dalam permintaan ke layanan.

Panggilan ini akan memprioritaskan pengembalian dengan cepat daripada memenuhi ukuran batch yang ditentukan, sehingga akan kembali segera setelah setidaknya satu pesan diterima dan ada celah dalam pesan masuk terlepas dari ukuran batch yang ditentukan.

async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

Parameter

max_message_count
Optional[int]
nilai default: 1

Jumlah maksimum pesan dalam batch. Jumlah aktual yang dikembalikan akan bergantung pada ukuran prefetch_count dan laju aliran masuk. Pengaturan ke Tidak Ada akan sepenuhnya bergantung pada konfigurasi prefetch. Nilai defaultnya adalah 1.

max_wait_time
Optional[float]
nilai default: None

Waktu maksimum untuk menunggu dalam detik agar pesan pertama tiba. Jika tidak ada pesan yang tiba, dan tidak ada batas waktu yang ditentukan, panggilan ini tidak akan kembali sampai koneksi ditutup. Jika ditentukan, dan tidak ada pesan yang tiba dalam periode waktu habis, daftar kosong akan dikembalikan.

Mengembalikan

Daftar pesan yang diterima. Jika tidak ada pesan yang tersedia, ini akan menjadi daftar kosong.

Tipe hasil

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Pengecualian

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Contoh

Menerima pesan dari ServiceBus.


   async with servicebus_receiver:
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           await servicebus_receiver.complete_message(message)

renew_message_lock

Perbarui kunci pesan.

Ini akan mempertahankan kunci pada pesan untuk memastikan tidak dikembalikan ke antrean untuk diolah ulang.

Untuk menyelesaikan (atau menyelesaikan) pesan, kunci harus dipertahankan, dan belum dapat kedaluwarsa; kunci kedaluwarsa tidak dapat diperpanjang.

Pesan yang diterima melalui mode RECEIVE_AND_DELETE tidak dikunci, dan oleh karena itu tidak dapat diperbarui. Operasi ini juga hanya tersedia untuk pesan yang tidak sesi.

async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

Parameter

message
ServiceBusReceivedMessage
Diperlukan

Pesan untuk memperbarui kunci.

timeout
Optional[float]

Total batas waktu operasi dalam detik termasuk semua percobaan ulang. Nilai harus lebih besar dari 0 jika ditentukan. Nilai defaultnya adalah Tidak Ada, yang berarti tidak ada batas waktu.

Mengembalikan

Tanggalwaktu utc kunci diatur untuk kedaluwarsa pada.

Tipe hasil

Pengecualian

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

Contoh

Perbarui kunci pada pesan yang diterima.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.renew_message_lock(message)

Atribut

client_identifier

Dapatkan pengidentifikasi klien ServiceBusReceiver yang terkait dengan instans penerima.

Tipe hasil

str

session

Dapatkan objek ServiceBusSession yang ditautkan dengan penerima. Sesi hanya tersedia untuk entitas yang mendukung sesi, sesi akan mengembalikan Tidak Ada jika dipanggil pada penerima non-sesi.

Tipe hasil

Contoh

Mendapatkan sesi dari penerima


       async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session