.NET - Azure Function - Azure service bus repeat call
I know this may be a trivial question, but I'm not sure what to do. I am using Azure Service Bus together with MassTransit.
I have two services. Service1 is a WebApi application, and Service2 is a set of Azure Functions that respond to Service Bus topics.
In Service1 I have a consumer for the TestMessage topic. When I publish a message to the topic, the consumer receives it fine. But when an exception occurs inside the consumer, the Fault consumer is notified immediately.
**Config:**
```csharp
services.AddMassTransit(o =>
{
    o.AddConsumers(Assembly.GetExecutingAssembly());
    o.UsingAzureServiceBus((context, cfg) =>
    {
        var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();
        cfg.Host(new HostSettings
        {
            ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
            TokenCredential = new DefaultAzureCredential()
        });
        cfg.SubscriptionEndpoint<TestMessage>(formatter.Consumer<TestMessageConsumer>(), e =>
        {
            e.ConfigureConsumer<TestMessageConsumer>(context);
        });
        cfg.SubscriptionEndpoint<Fault<TestMessage>>(formatter.Consumer<TestMessageFaultConsumer>(), e =>
        {
            e.ConfigureConsumer<TestMessageFaultConsumer>(context);
        });
    });
});
```
**Consumer:**
```csharp
public class TestMessageConsumer : IConsumer<TestMessage>
{
    public Task Consume(ConsumeContext<TestMessage> context)
    {
        throw new Exception();
        return Task.CompletedTask;
    }
}

public class TestMessageFaultConsumer : IConsumer<Fault<TestMessage>>
{
    public Task Consume(ConsumeContext<Fault<TestMessage>> context)
    {
        return Task.CompletedTask;
    }
}
```
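To make the fault path more concrete, here is a minimal sketch of what a fault consumer could inspect when it is invoked. The class name `LoggingTestMessageFaultConsumer`, the console logging, and the empty `TestMessage` placeholder are assumptions for illustration only and are not part of my actual code. `FaultedMessageId` and `Exceptions` are members of MassTransit's `Fault<T>` contract.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Placeholder contract (assumption): the real TestMessage is not shown here.
public class TestMessage
{
}

// Hypothetical variant of the fault consumer that prints why the message faulted.
public class LoggingTestMessageFaultConsumer : IConsumer<Fault<TestMessage>>
{
    public Task Consume(ConsumeContext<Fault<TestMessage>> context)
    {
        // Id of the original message whose consumer threw (may be null).
        Console.WriteLine($"Faulted message id: {context.Message.FaultedMessageId}");

        // Each entry describes one exception captured from the failing consumer.
        foreach (var exception in context.Message.Exceptions)
        {
            Console.WriteLine($"{exception.ExceptionType}: {exception.Message}");
        }

        return Task.CompletedTask;
    }
}
```

With something like this in place, it should be easier to see how soon after the exception the fault arrives.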
**Config in service1:**
```csharp
if (!await adminClient.TopicExistsAsync(formatter.MessageName<TestMessage>()))
    await adminClient.CreateTopicAsync(new CreateTopicOptions(formatter.MessageName<TestMessage>()));
if (!await adminClient.SubscriptionExistsAsync(formatter.MessageName<TestMessage>(), formatterCheckFunc.ConsumerFromMessageName<TestMessageConsumer>()))
    await adminClient.CreateSubscriptionAsync(new CreateSubscriptionOptions(formatter.MessageName<TestMessage>(), formatterCheckFunc.ConsumerFromMessageName<TestMessageConsumer>()));

services.AddMassTransitForAzureFunctions(cfg =>
{
    cfg.AddConsumer<TestMessageConsumer>();
}, "ServiceBusConnection", (context, configuration) =>
{
    var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();
    var settings = new HostSettings
    {
        ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
        TokenCredential = new DefaultAzureCredential(),
    };
    configuration.Host(settings);
});

public Function(IMessageReceiver receiver)
{
    _receiver = receiver;
}

[Microsoft.Azure.Functions.Worker.Function(nameof(Function))]
public async Task Run(
    [Microsoft.Azure.Functions.Worker.ServiceBusTrigger(TopicName, SubscriptionName, Connection = Connection)] ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions, CancellationToken cancellationToken)
{
    await _receiver.HandleConsumer<TestMessageConsumer>(TopicName, SubscriptionName, message, cancellationToken);
}
```
Thanks a lot