How to bind Azure Function service bus trigger and topic output using Python V2 model?

Zheng, Yu 0 Reputation points
2025-02-27T18:31:48.4233333+00:00

Hi,

I am trying a simple scenario in which an Azure Function consumes a Service Bus message from a session enabled topic subscription, does a basic transformation of the message, and outputs the transformed message to another session enabled topic subscription. The code snippet below shows what I have at the moment.

Though I can output to a non-session topic sub successfully, I am not understanding how I can define the function to output a session topic message. Do I type output to func.Out[func.ServiceBusMessage] or something else? I have tried different variations, and none seemed to have helped.

Appreciate any help.

@app.function_name(name='service_bus_topic')
@app.service_bus_topic_trigger(arg_name="input",
                               topic_name="topic-1",
                               is_sessions_enabled=True, 
                               subscription_name="sub-z", 
                               connection="AzureWebJobsServiceBus")
@app.service_bus_topic_output(arg_name="output",
                              topic_name="topic-output-1",
                              connection="AzureWebJobsServiceBus")
def _service_bus_topic(input: func.ServiceBusMessage, output: func.Out[str]):
    msg = input.get_body().decode('utf-8')
    logging.info(f'topic trigger processed a message: {msg}')

    output_sb_message = ServiceBusMessage(body=f'original msg = {msg} |||| session id ?= {input.session_id}', 
                                        message_id=input.message_id, 
                                        session_id=input.session_id,
                                        application_properties={}, 
                                        user_properties={})

    ####### When output parameter is typed as func.Out[func.ServiceBusMessage] ########
    # logging.info(output_sb_message)
    # output_msg = output_sb_message
    # output.set(output_msg)

    # ERROR: Exception: FunctionLoadError: cannot load the service_bus_topic function: 
    # 'output' binding type "serviceBus" and dataType "None" in function.json do not match the corresponding function parameter's Python type annotation "ServiceBusMessage"

    output_msg = f'msg output:: {msg}'
    output.set(output_msg)
    logging.info(f'topic output message --- the end....')
Azure Functions
Azure Functions
An Azure service that provides an event-driven serverless compute platform.
5,541 questions
{count} votes

1 answer

Sort by: Most helpful
  1. RithwikBojja 950 Reputation points Microsoft External Staff
    2025-03-07T05:29:15.52+00:00

    Hi @ Zheng, Yu,

    In ServiceBus Trigger V2 model of Python Function App, you cannot send session id, customer id's or broker properties as outbound, you can only hust send a message, but as an alternative you can use SDK to send to topic as below:

    
        import azure.functions as func
    
        import logging
    
        from azure.servicebus import ServiceBusClient, ServiceBusMessage
    
        import os
    
        
    
        riapp = func.FunctionApp()
    
        
    
        @riapp.service_bus_topic_trigger(
    
            arg_name="azservicebus",
    
            subscription_name="rithsub1",
    
            topic_name="mysbtopic",
    
            connection="demoser898_SERVICEBUS",
    
            is_sessions_enabled=True 
    
        )
    
        def servicebus_topic_trigger(azservicebus: func.ServiceBusMessage):
    
            msg = azservicebus.get_body().decode("utf-8")
    
            logging.info(f"Hello Rithwik, ServiceBus Topic trigger processed a message: {msg}")
    
            cho_con = os.getenv("demoser898_SERVICEBUS")
    
            sen_topic = "tp1"  
    
            servicebus_client = ServiceBusClient.from_connection_string(cho_con)
    
            cho_sen = servicebus_client.get_topic_sender(sen_topic)
    
            rith_msg = ServiceBusMessage("Rithwik Bojja")
    
            rith_msg.session_id = "008"  
    
            with cho_sen:
    
                cho_sen.send_messages(rith_msg)
    
            logging.info(f"Rithwik, Forwarded message to topic: {sen_topic}.")
    
    

    reuqirements.txt:

    
    azure-functions
    
    azure-servicebus
    
    

    local.settings.json:

    
    {
    
          "IsEncrypted": false,
    
          "Values": {
    
            "AzureWebJobsStorage": "",
    
            "FUNCTIONS_WORKER_RUNTIME": "python",
    
            "demoser898_SERVICEBUS": "Endpoint=sb://demoser898.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=1zrithwikhhk=",
    
            "AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
    
        
    
          }
    
        }
    
    

    Output:

    enter image description here

    enter image description here

    Hope this helps.

    If the answer is helpful, please click Accept Answer and kindly upvote it. If you have any further questions about this answer, please click Comment.

    0 comments No comments

Your answer

Answers can be marked as Accepted Answers by the question author, which helps users to know the answer solved the author's problem.