Control Azure Service Bus Consumer to start or stop listening from the Topic in spring boot

Kush Patel 0 Reputation points
2023-01-30T06:49:54.0633333+00:00

What I want to Achieve - Control Service Bus Consumer to start / stop receiving messages from queue/topic.

Below is a detailed explanation.

Currently I have integrated Azure Service Bus in my application and we listen message as soon as spring boot application starts. Now I want to modify this logic. On ApplicationReadyEvent event I want to disable ServiceBusConsumer to start listening, then I want to perform some task and after that again I want to enable ServiceBusConsumer to start listening from topic / queue.

So how can I achieve that ?

application.yml

spring:
  cloud:
    azure:
      servicebus:
        namespace: **********
xxx:
  azure:
    servicebus:
      connection: ***********
      queue: **********

AzureConfiguration.java

import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;


@Configuration
public class AzureConfiguration{

    @Value("${xxx.azure.servicebus.connection}")
    private String serviceBusConnection;

    @Value("${xxx.azure.servicebus.queue}")
    private String serviceBusQueue;

    private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
    private static final String SENSOR_DATA_CHANNEL = "zzzzz";
    private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";


    @Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
    public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory) {

        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        containerProperties.setConnectionString(serviceBusConnection);
        containerProperties.setEntityName(serviceBusQueue);
        containerProperties.setAutoComplete(true);
        return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
    }


    @Bean
    public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
            @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
            @Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {

        ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inputChannel);
        adapter.setAutoStartup(false);
        return adapter;
    }


    @Bean(name = SERVICE_BUS_INPUT_CHANNEL)
    public MessageChannel serviceBusInputChannel() {

        return new DirectChannel();
    }


    @Bean(name = SENSOR_DATA_CHANNEL)
    public MessageChannel sensorDataChannel() {

        return new DirectChannel();
    }


    @Bean
    public IntegrationFlow serviceBusMessageFlow() {

        return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
                .<byte[], String>transform(String::new)
                .channel(SENSOR_DATA_CHANNEL)
                .get();
    }
}

AppEventListenerService.java

import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService{

   
    @EventListener(ApplicationReadyEvent.class)
    public void OnApplicationStarted() {
        log.debug("Enter OnApplicationStarted");
        // Disable Azure Bus Message Listener
        // do some task
        // Enable Azure Bus Message Listener
        log.debug("Exit OnApplicationStarted");
    }
}

In above code in AppEventListenerService.java ,

// Disable Azure Bus Message Listener - Here I want to stop ServiceBusConsumer to receive message from topic/queue

// Enable Azure Bus Message Listener - Here I want to start ServiceBusConsumer to receive message from topic/queue.

Azure Service Bus
Azure Service Bus
An Azure service that provides cloud messaging as a service and hybrid integration.
542 questions
Azure Queue Storage
Azure Queue Storage
An Azure service that provides messaging queues in the cloud.
96 questions
Azure Spring Apps
Azure Spring Apps
An Azure platform as a service for running Spring Boot applications at cloud scale. Previously known as Azure Spring Cloud.
109 questions
{count} votes

1 answer

Sort by: Most helpful
  1. Mike Urnun 9,666 Reputation points Microsoft Employee
    2023-02-08T05:33:55.0933333+00:00

    @Kush Patel & others visiting this post with similar use-case,

    This question, specifically with regard to being able to execute business logic ahead of consuming SB messages, has now been taken up as a feature request & added to the SDK backlog. However, there are a couple of workarounds provided, please refer to the GitHub issue opened by @Kush Patel here: https://github.com/Azure/azure-sdk-for-java/issues/33293

    0 comments No comments