Spring Cloud Stream dengan Azure Bus Layanan

Artikel ini menunjukkan cara menggunakan Spring Cloud Stream Binder untuk mengirim pesan ke dan menerima pesan dari Bus Layanan queues dan topics.

Azure menyediakan platform perpesanan asinkron yang disebut Azure Bus Layanan ("Bus Layanan") yang didasarkan pada standar Advanced Message Queueing Protocol 1.0 ("AMQP 1.0"). Azure Service Bus dapat digunakan di berbagai platform Azure yang didukung.

Prasyarat

  • Langganan Azure - membuat secara gratis.

  • Java Development Kit (JDK) versi 8 atau yang lebih tinggi.

  • Apache Maven, versi 3.2 atau lebih tinggi.

  • cURL atau utilitas HTTP serupa untuk menguji fungsionalitas.

  • Antrean atau topik untuk Azure Bus Layanan. Jika Anda tidak memilikinya, buat antrean Bus Layanan atau buat topik Bus Layanan.

  • Aplikasi Spring Boot. Jika Anda tidak memilikinya, buat proyek Maven dengan Spring Initializr. Pastikan untuk memilih Proyek Maven dan, di bawah Dependensi, tambahkan dependensi Spring Web dan Dukungan Azure, lalu pilih Java versi 8 atau yang lebih tinggi.

Catatan

Untuk memberikan akses akun ke sumber daya Azure Bus Layanan Anda, tetapkan Azure Service Bus Data Sender peran dan Azure Service Bus Data Receiver ke akun Microsoft Entra yang saat ini Anda gunakan. Untuk informasi selengkapnya tentang memberikan peran akses, lihat Menetapkan peran Azure menggunakan portal Azure dan Mengautentikasi dan mengotorisasi aplikasi dengan ID Microsoft Entra untuk mengakses entitas Azure Bus Layanan.

Penting

Spring Boot versi 2.5 atau yang lebih tinggi diperlukan untuk menyelesaikan langkah-langkah dalam artikel ini.

Mengirim dan menerima pesan dari Azure Bus Layanan

Dengan antrean atau topik untuk Azure Bus Layanan, Anda dapat mengirim dan menerima pesan menggunakan Bus Layanan Spring Cloud Azure Stream Binder.

Untuk menginstal modul Bus Layanan Spring Cloud Azure Stream Binder, tambahkan dependensi berikut ke file pom.xml Anda:

  • Spring Cloud Azure Bill of Materials (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.12.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Catatan

    Jika Anda menggunakan Spring Boot 2.x, pastikan untuk mengatur versi ke spring-cloud-azure-dependencies4.18.0. Bill of Material (BOM) ini harus dikonfigurasi di bagian <dependencyManagement> file pom.xml Anda. Ini memastikan bahwa semua dependensi Spring Cloud Azure menggunakan versi yang sama. Untuk informasi selengkapnya tentang versi yang digunakan untuk BOM ini, lihat Versi Spring Cloud Azure mana yang Harus Saya Gunakan.

  • Artefak Bus Layanan Spring Cloud Azure Stream Binder:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Kodekan aplikasi

Gunakan langkah-langkah berikut untuk mengonfigurasi aplikasi Anda untuk menggunakan antrean atau topik Bus Layanan untuk mengirim dan menerima pesan.

  1. Konfigurasikan kredensial Bus Layanan dalam file application.propertieskonfigurasi .

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    Tabel berikut ini menjelaskan bidang dalam konfigurasi:

    Bidang Deskripsi
    spring.cloud.azure.servicebus.namespace Tentukan namespace yang Anda peroleh di Bus Layanan Anda dari portal Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Tentukan Bus Layanan antrian atau topik Bus Layanan yang Anda gunakan dalam tutorial ini.
    spring.cloud.stream.bindings.supply-out-0.destination Tentukan nilai yang sama yang digunakan untuk tujuan input.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Tentukan apakah akan menyelesaikan pesan secara otomatis. Jika diatur sebagai false, header Checkpointer pesan akan ditambahkan untuk memungkinkan pengembang menyelesaikan pesan secara manual.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Tentukan jenis entitas untuk pengikatan output, bisa atau queuetopic.
    spring.cloud.function.definition Tentukan bean fungsional mana untuk diikatkan ke tujuan eksternal yang diekspos oleh pengikatan.
    spring.cloud.stream.poller.fixed-delay Tentukan penundaan tetap untuk poller default dalam milidetik. Nilai defaultnya adalah 1000 L. Nilai yang disarankan adalah 60000.
    spring.cloud.stream.poller.initial-delay Tentukan penundaan awal untuk pemicu berkala. Nilai default adalah 0.
  2. Edit file kelas startup untuk menampilkan konten berikut.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Tip

    Dalam tutorial ini, tidak ada operasi autentikasi dalam konfigurasi atau kode. Namun, menyambungkan ke layanan Azure memerlukan autentikasi. Untuk menyelesaikan autentikasi, Anda perlu menggunakan Azure Identity. Spring Cloud Azure menggunakan DefaultAzureCredential, yang disediakan pustaka Azure Identity untuk membantu Anda mendapatkan kredensial tanpa perubahan kode apa pun.

    DefaultAzureCredential mendukung beberapa metode autentikasi dan menentukan metode mana yang akan digunakan saat runtime. Pendekatan ini memungkinkan aplikasi Anda menggunakan metode autentikasi yang berbeda di lingkungan yang berbeda (seperti lingkungan lokal dan produksi) tanpa menerapkan kode khusus lingkungan. Untuk informasi selengkapnya, lihat DefaultAzureCredential.

    Untuk menyelesaikan autentikasi di lingkungan pengembangan lokal, Anda dapat menggunakan Azure CLI, Visual Studio Code, PowerShell, atau metode lainnya. Untuk informasi selengkapnya, lihat Autentikasi Azure di lingkungan pengembangan Java. Untuk menyelesaikan autentikasi di lingkungan hosting Azure, sebaiknya gunakan identitas terkelola yang ditetapkan pengguna. Untuk informasi selengkapnya, lihat Apa yang dimaksud dengan identitas terkelola untuk sumber daya Azure?

  3. Mulai aplikasi. Pesan seperti contoh berikut akan diposting di log aplikasi Anda:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Langkah berikutnya