本文介绍如何排查使用 Spring JMS 时的已知问题和常见错误。 本文还回答了 spring-cloud-azure-starter-servicebus-jms
的一些常见问题。
连接问题
MessageProducer 由于无法恢复的错误而关闭
问题说明
使用 JmsTemplate
发送消息时,JmsTemplate
在 10 到 15 分钟的空闲间隔内变得不可用。 以该间隔发送消息可以获取以下示例输出中显示的异常:
2022-11-06 11:12:05.762 INFO 25944 --- [ scheduling-1] c.e.demo.ServiceBusJMSMessageProducer : Sending message: 2022-11-06T11:12:05.762072 message 1
2022-11-06 11:12:05.772 ERROR 25944 --- [ scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
org.springframework.jms.IllegalStateException: The MessageProducer was closed due to an unrecoverable error.; nested exception is javax.jms.IllegalStateException: The MessageProducer was closed due to an unrecoverable error.
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:274) ~[spring-jms-5.3.23.jar:5.3.23]
...
Caused by: org.apache.qpid.jms.provider.ProviderException: The link 'G0:36906660:qpid-jms:sender:azure:5caf3ef4-9602-413c-964d-cf1292d6e1f5:1:1:1:t4' is force detached. Code: publisher(link376). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00. [condition = amqp:link:detach-forced]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181) ~[qpid-jms-client-0.53.0.jar:na]
...
原因分析
当 AMQP 连接和链接处于活动状态但未调用(例如发送或接收调用)使用链接 10 分钟时,Azure 服务总线 会出现异常。 在本例中,链接已关闭。 如果连接中的所有链接都已关闭,因为没有活动(空闲),并且未在 5 分钟内创建新链接,则连接将关闭。
对于服务总线 JMS 初学者,默认使用 CachingConnectionFactory,该缓存会话、生成者和使用者。 当 JmsProducer
处于空闲状态超过 10 分钟但小于 15 分钟时,缓存的生成者占用的链接已被关闭。 在此时间间隔内无法发送消息。 然后,再空闲 5 分钟后,整个连接将关闭。 因此,15 分钟空闲间隔后的任何发送操作都会导致 CachingConnectionFactory
创建新的连接进行发送。 发送操作在 15 分钟后可用。
解决方法
目前,初学者通过应用 JmsPoolConnectionFactory
(池 Connection
、Session
和 MessageProducer
)并管理共用实例的生命周期,为链接分离问题提供了一种解决方法。 此解决方法可以确保生成者在不可用后被逐出,因此对活动生成者执行所有发送操作。
若要使用此解决方法,请添加以下配置:
spring:
jms:
servicebus:
pool:
enabled: true
max-connections: ${your-expected-max-connection-value}
spring.jms.servicebus.idle-timeout 的使用
空闲超时属性配置 AMQP 连接的 空闲超时。 AMQP 规范提供以下说明:
连接受空闲超时阈值的约束。 当超出阈值后未收到帧时,本地对等方将触发超时。 空闲超时以毫秒为单位,从收到最后一帧的时间开始。 如果超出阈值,则对等方 SHOULD 会尝试使用关闭帧正常关闭连接,并显示解释原因的错误。 如果远程对等方未在阈值内正常响应,则对等方可能会关闭 TCP 套接字。
对于 JMS 客户端,在配置此属性时,可在服务器端控制服务器在未传递消息时发送空帧以保持连接活动状态的时间。 此属性控制远程对等方的行为,每个对等方可以有自己的独立值。
JmsTemplate 问题
计划消息
Azure 服务总线支持延迟的消息处理。 有关详细信息,请参阅 消息排序和时间戳计划消息 部分。 对于 JMS,若要计划消息,请使用消息注释标头 ScheduledEnqueueTimeUtc
设置 x-opt-scheduled-enqueue-time
属性。
JmsListener 问题
即使服务器中没有消息,也会向服务总线发送过多请求
问题说明
使用 @JmsListener
API 时,在某些情况下,可以在 Azure 门户中看到发送到队列或主题的传入请求存在持续值,即使服务器中没有要接收的消息也是如此。
原因分析
@JmsListener
是一个轮询侦听器,是为重复轮询尝试而生成的。
侦听器位于正在进行的轮询循环上。 每个循环调用 JMS MessageConsumer.receive()
方法,以轮询本地使用者以获取要使用的消息。 默认情况下,对于每个轮询操作,本地使用者将拉取请求发送到消息中转站以请求消息,然后阻止一段时间。 具体轮询过程由几个属性决定,包括 receiveTimeout
、prefetchSize
和 receiveLocalOnly
或 receiveNoWaitLocalOnly
。 仅当将 receiveNoWaitLocalOnly
设置为负值时,才使用 receiveTimeout
方法。
如果应用程序遇到此问题,请检查以下配置设置:
确定预提取策略是否为 0,这也是默认选项。 0 预提取意味着拉取使用者,用于为每个轮询将拉取请求发送到服务总线。
如果已配置非零预提取,请确定
receiveLocalOnly
还是receiveNoWaitLocalOnly
属性设置为false
,这是默认选项。 此处的false
值仍会导致向服务器发送拉取请求,因为它不仅会轮询本地使用者。receiveTimeout
配置确定每个拉取请求阻止的时间,因此可能会影响发送到服务器的拉取请求的频率。 默认值为 1 秒。
有关完整分析,请参阅 GitHub 问题中的讨论。
解决 方案
以下部分介绍了两种处理此问题的解决方案
解决方案 1. 更改以仅推送使用者和本地检查
通过将模式更改为 push
,使用者将成为 异步通知 使用者,该使用者不会从中转站拉取消息,但会保留目标数量的链接信用额度。 金额由预提取属性决定。 当服务总线(发送方)推送消息时,发送方的链接信用额度会减少,当发送方的链接信用额度低于阈值时,客户端(接收方)向服务器发送请求,以将发件人的链接信用额度重新提高到所需的目标金额。
若要完成此解决方案,请添加以下配置:
首先,将 prefetch
数字配置为非零,它将使用者配置为非拉取。 下表显示了多个预提取属性,每个属性控制不同的服务总线实体。 设置适用于事例的属性。
财产 | 描述 |
---|---|
spring.jms.servicebus.prefetch.all |
此服务总线命名空间中预提取选项的回退值 |
spring.jms.servicebus.prefetch.queue-prefetch |
队列的预提取编号。 |
spring.jms.servicebus.prefetch.queue-browser-prefetch |
队列浏览器的预提取编号。 |
spring.jms.servicebus.prefetch.topic-prefetch |
主题的预提取编号。 |
spring.jms.servicebus.prefetch.durable-topic-prefetch |
持久主题的预提取编号。 |
其次,通过为工厂定制器添加配置类来配置 non-local-check
,如以下示例所示:
@Configuration(proxyBeanMethods = false)
public class CustomJmsConfiguration {
@Bean
ServiceBusJmsConnectionFactoryCustomizer customizer() {
return factory -> {
factory.setReceiveLocalOnly(true);
// Configure the below ReceiveNoWaitLocalOnly instead if you have specified the property
// spring.jms.listener.receive-timeout with negative value. Otherwise, configure the above `ReceiveLocalOnly`.
//factory.setReceiveNoWaitLocalOnly(true);
};
}
}
预提取值可能会影响将消息调度到使用者的本地缓冲区的速度。 应根据消耗的性能和消息量调整该值。 合适的值可以加速消耗过程,而太大的值可能会导致本地缓冲消息变得过时并再次调度。 对于低消息量,如果每个消息处理的时间较长,请将预提取设置为 1。 此值可确保使用者一次只处理一条消息。
解决方案 2. 增加接收超时以减少拉取频率
接收超时属性确定使用者阻止等待拉取结果的时长的策略。 因此,通过延长超时,可以降低拉取频率,然后在选择拉取模式时减少拉取请求数。 在极端情况下,可以设置在消息到达之前无限期等待的策略,这意味着使用者只有在使用消息后才会拉取。 在这种情况下,如果服务器中没有消息,它将阻止等待。
若要完成此解决方案,请配置 spring.jms.listener.receive-timeout
属性。 此属性的类型为 java.time.Duration
,默认值为 1 秒。 以下列表说明了各种值的效果:
- 将接收超时设置为 0 意味着在调度消息之前,拉取会无限期地阻止。
- 将接收超时设置为正值意味着拉取会阻止超时时间。
- 将接收超时设置为负值意味着拉取是无等待接收,这意味着它会立即返回消息,或者
null
如果没有消息可用。
注意
高超时值可能会带来一些副作用。 例如,高超时值还会延长主线程处于块状态的时间。 此状态意味着容器对 stop()
调用的响应较少,并且只能在 receive()
调用之间停止。
此外,容器只能在传递 receive-timeout
间隔后发送请求。 如果间隔超过 10 分钟,服务总线将关闭链接,并阻止侦听器发送或接收。 有关详细信息,请参阅 Azure 服务总线中
如果需要高接收超时,请确保使用 JmsPoolConnectionFactory。
有关链接关闭问题以及如何使用 JmsPoolConnectionFactory
的详细信息,请参阅 MessageProducer 因无法恢复的错误 部分而关闭。
预提取问题
问题说明
不适合的预提取策略可能会导致以下问题:
- 重复使用相同的消息。
- 消息在
MaxDeliveryCountExceeded
后放入死信队列中,即使处理消息时没有错误或异常也是如此。
原因分析
当 预提取 值高于实际消耗容量时,通常会发生此问题,其影响是将过多消息预提取到等待使用的本地缓冲区。 但是,预提取的消息在服务总线端的 速览锁定 模式下被视为调度。 每个已调度消息都有一个 最大传递计数 和锁定持续时间属性。 在速览锁接收模式下,提取到预提取缓冲区的消息将进入处于锁定状态的缓冲区中,锁计时的超时时钟。 如果预提取缓冲区很大,并且处理需要很长时间,消息锁在保留预提取缓冲区时过期,则消息被视为已放弃,并再次可用于从队列检索。
此问题可能会导致将消息提取到预提取缓冲区并放置在末尾。 如果在消息过期前未处理预提取缓冲区,则会重复预提取消息,但从未有效地以可用(有效锁定)状态传递消息。 然后,当取消排队这些过时的副本时,应用程序会重复使用相同的消息,并且无法完成它们。 在另一种情况下,重复的消息都在缓冲区中过期,然后才被使用。 在这种情况下,在超出最大传递计数后,服务总线中的消息最终将移动到死信队列。
有关详细信息,请参阅 为什么预提取不是默认选项?预提取 Azure 服务总线消息的 部分。
溶液
请注意预提取的配置,以确保它符合使用功能。 必须平衡队列或订阅上配置的最大预提取计数和锁定持续时间,使锁定超时至少超过预提取缓冲区最大大小的累积预期消息处理时间,以及一条消息。 同时,锁定超时不应太长,以至于消息在意外删除时可能超过其生存时间上限,从而要求其锁定在重新传送之前过期。
若要配置默认值为零的预提取属性,请使用以下属性之一:
财产 | 描述 |
---|---|
spring.jms.servicebus.prefetch.all |
此服务总线命名空间中预提取选项的回退值。 |
spring.jms.servicebus.prefetch.queue-prefetch |
队列的预提取编号。 |
spring.jms.servicebus.prefetch.queue-browser-prefetch |
队列浏览器的预提取编号。 |
spring.jms.servicebus.prefetch.topic-prefetch |
主题的预提取编号。 |
spring.jms.servicebus.prefetch.durable-topic-prefetch |
持久主题的预提取编号。 |
如何对服务总线执行 AMQP 处置?
在向消息代理确认消息时,JMS 支持五种 AMQP 处置类型。 支持的值为 ACCEPTED
、REJECTED
、RELEASED
、MODIFIED_FAILED
和 MODIFIED_FAILED_UNDELIVERABLE
。 有关详细信息,请参阅 AMQP 处置和服务总线操作映射 部分 将 Java 消息服务 1.1 与 Azure 服务总线标准和 AMQP 1.0配合使用。
因此,若要使用 JmsListener
手动完成、放弃、死信、延迟或释放消息,请执行以下步骤:
禁用会话事务处理并使用 CLIENT 确认模式。
若要完成此任务,请声明自己的 JmsListenerContainerFactory bean,然后设置属性,或发布在
JmsListenerContainerFactory
中定义的 。 以下示例使用声明另一个豆的方法:@Configuration(proxyBeanMethods = false) public class CustomJmsConfiguration { @Bean public JmsListenerContainerFactory<?> customQueueJmsListenerContainerFactory( DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory(); configurer.configure(jmsListenerContainerFactory, connectionFactory); jmsListenerContainerFactory.setPubSubDomain(Boolean.FALSE); jmsListenerContainerFactory.setSessionTransacted(Boolean.FALSE); jmsListenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return jmsListenerContainerFactory; } }
在消息处理程序中,显式完成或放弃消息。
@JmsListener(destination = "QUEUE_NAME", containerFactory = "customQueueJmsListenerContainerFactory") public void receiveMessage(JmsTextMessage message) throws Exception { String event = message.getBody(String.class); try { logger.info("Received event: {}", event); logger.info("Received message: {}", message); // by default complete the message message.acknowledge(); } catch (Exception e) { logger.error("Exception while processing re-source event: " + event, e); JmsAcknowledgeCallback acknowledgeCallback = message.getAcknowledgeCallback(); // explicitly abandon the message acknowledgeCallback.setAckType(MODIFIED_FAILED); message.setAcknowledgeCallback(acknowledgeCallback); message.acknowledge(); throw e; } }
配置问题
禁用服务总线 JMS 自动配置
问题说明
某些用户导入一些 Spring Cloud Azure Starter 来自动配置 Azure 服务,而不是服务总线 JMS。 它们还使用 Spring JMS 框架,而无需服务总线 JMS。 然后,当应用程序尝试启动时,将引发以下异常:
Caused by: java.lang.IllegalArgumentException: 'spring.jms.servicebus.connection-string' should be provided
at com.azure.spring.cloud.autoconfigure.jms.properties.AzureServiceBusJmsProperties.afterPropertiesSet(AzureServiceBusJmsProperties.java:210)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
... 98 more
原因分析
出现此问题的原因是所有 Spring Cloud Azure 自动配置类都放置在同一模块中,因此任何 Spring Cloud Azure Starter 实际上都会导入所有自动配置,其中包括服务总线 JMS。 然后,当应用程序使用 Spring JMS API 时,它满足 服务总线 JMS 自动配置 并触发它的条件。 然后,对于不打算使用 spring-cloud-azure-starter-servicebus-jms
的用户,不会满足属性条件,因为没有理由为 JMS 配置服务总线。 这种情况会导致引发异常。
溶液
Spring Cloud Azure for Service Bus JMS 提供用于打开或关闭其自动配置的属性。 可以使用以下属性设置根据需要选择禁用此功能:
spring.jms.servicebus.enabled=false
配置消息属性
如何设置出站消息的内容类型?
若要配置内容类型,请自定义消息转换器,以在转换消息时修改内容类型属性。 以下代码以字节消息为例。
首先,自定义在 JmsTemplate
中使用的消息转换器,如以下示例所示:
public class CustomMappingJackson2MessageConverter extends MappingJackson2MessageConverter {
public static final String CONTENT_TYPE = "application/json";
public CustomMappingJackson2MessageConverter() {
this.setTargetType(MessageType.BYTES);
}
@Override
protected BytesMessage mapToBytesMessage(Object object, Session session, ObjectWriter objectWriter)
throws JMSException, IOException {
final BytesMessage message = super.mapToBytesMessage(object, session, objectWriter);
JmsBytesMessage msg = (JmsBytesMessage) message;
AmqpJmsMessageFacade facade = (AmqpJmsMessageFacade) msg.getFacade();
facade.setContentType(Symbol.valueOf(CONTENT_TYPE));
return msg;
}
}
然后,声明自定义的消息转换器 bean,如以下示例所示:
@Configuration(proxyBeanMethods = false)
public class CustomJmsConfiguration {
@Bean
public MessageConverter messageConverter() {
return new CustomMappingJackson2MessageConverter();
}
}
如何为 MappingJackson2MessageConverter 设置类型 ID 属性名称?
type-id-property-name
属性使 MappingJackson2MessageConverter
能够确定用于反序列化消息有效负载的类。 将每个 Java 对象序列化为 Spring Message 有效负载时,转换器会将有效负载类型存储在消息属性中,该属性名称由 type-id-property-name
记录。 然后,反序列化消息时,转换器从消息中读取类型 ID 并执行反序列化。
若要设置 type-id-property-name
,请声明自己的 MappingJackson2MessageConverter
bean 并配置该属性,如以下示例所示:
@Configuration(proxyBeanMethods = false)
public class CustomJmsConfiguration {
@Bean
public MessageConverter jacksonJmsMessageConverter()
{
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTypeIdPropertyName("your-custom-type-id-property-name");
return converter;
}
}
重复检测
Azure 服务总线支持 重复检测,该检测应用 MessageId
属性以唯一标识消息并丢弃发送到服务总线的重复项。
但是,对于 JMS API,不应设置 JMS 消息 ID,该 ID 被视为 JMS 规范中的 非法。 因此,Spring Cloud Azure 服务总线 JMS 初学者目前不支持此功能。
有关此功能的任何其他更新,请参阅 GitHub 问题。
启用 AMQP 传输日志记录
有关详细信息,请参阅 启用 AMQP 传输日志记录 部分,排查服务总线问题。
获取其他帮助
有关联系支持的方法的详细信息,请参阅存储库根目录中 支持。
Spring Cloud Azure 服务总线 JMS 初学者的资源
- 将 Azure 服务总线与 JMS 配合使用
- 在 Spring 中使用 JMS 访问 Azure 服务总线
- 适用于 Spring Cloud Azure 4.0 的
迁移指南 - 示例
提交 GitHub 问题
提交 GitHub 问题时,会请求以下详细信息:
- 服务总线配置/命名空间环境
- 命名空间(标准层或高级层)是什么?
- 正在使用哪种类型的消息传送实体(队列或主题) ? 及其配置。
- 每个邮件的平均大小是多少?
- 流量模式是什么? (也就是说,每分钟的消息数以及客户端是始终繁忙还是流量缓慢。
- 重现代码和步骤
- 这很重要,因为我们经常无法在环境中重现问题。
- 原木