Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье описывается устранение известных проблем и распространенных ошибок при использовании Spring JMS. В статье также приведены ответы на некоторые часто задаваемые вопросы о spring-cloud-azure-starter-servicebus-jms
.
Проблемы с подключением
СообщениеProducer было закрыто из-за неустранимой ошибки
Описание проблемы
При использовании JmsTemplate
для отправки сообщений JmsTemplate
становится недоступным в течение интервала простоя от 10 до 15 минут. Отправка сообщений в этом интервале может получить исключения, показанные в следующем примере выходных данных:
2022-11-06 11:12:05.762 INFO 25944 --- [ scheduling-1] c.e.demo.ServiceBusJMSMessageProducer : Sending message: 2022-11-06T11:12:05.762072 message 1
2022-11-06 11:12:05.772 ERROR 25944 --- [ scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
org.springframework.jms.IllegalStateException: The MessageProducer was closed due to an unrecoverable error.; nested exception is javax.jms.IllegalStateException: The MessageProducer was closed due to an unrecoverable error.
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:274) ~[spring-jms-5.3.23.jar:5.3.23]
...
Caused by: org.apache.qpid.jms.provider.ProviderException: The link 'G0:36906660:qpid-jms:sender:azure:5caf3ef4-9602-413c-964d-cf1292d6e1f5:1:1:1:t4' is force detached. Code: publisher(link376). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00. [condition = amqp:link:detach-forced]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181) ~[qpid-jms-client-0.53.0.jar:na]
...
Анализ причин
Исключения происходят для служебной шины Azure, когда подключение и ссылка AMQP активны, но не вызываются ( например, отправка или прием вызовов) выполняются с помощью ссылки в течение 10 минут. В этом случае ссылка закрыта. И когда все ссылки в соединении были закрыты, так как не было действия (бездействия) и новая ссылка не была создана в течение 5 минут, подключение закрывается.
Для начального модуля JMS служебной шины CachingConnectionFactory используется по умолчанию, который кэширует сеанс, производитель и потребитель. Если JmsProducer
неактивен более 10 минут, но менее 15, ссылка, занятая кэшируемым производителем, была закрыта. Сообщения не могут отправляться в течение этого интервала. Затем, через еще 5 минут бездействия, все подключение закрывается. Таким образом, любая операция отправки после интервала простоя в 15 минут приводит к тому, что CachingConnectionFactory
создать новое подключение для отправки. Операция отправки становится доступной через 15 минут.
Обходное решение
В настоящее время начальный элемент предоставляет обходной путь для проблемы отсоединения ссылок, применяя JmsPoolConnectionFactory
, которые пулы Connection
, Session
и MessageProducer
, а также управляют жизненным циклом пуловых экземпляров. Это обходное решение может гарантировать, что производитель вытесняется после недоступности и, следовательно, все операции отправки выполняются для активных производителей.
Чтобы использовать это решение, добавьте следующую конфигурацию:
spring:
jms:
servicebus:
pool:
enabled: true
max-connections: ${your-expected-max-connection-value}
Использование spring.jms.servicebus.idle-timeout
Свойства времени ожидания простоя настраивают время ожидания простоя подключения AMQP. Спецификация AMQP содержит следующее описание:
Подключения подвергаются порогу времени ожидания простоя. Время ожидания активируется локальным одноранговым элементом, если после превышения порогового значения кадры не получены. Время ожидания простоя измеряется в миллисекундах и начинается с момента получения последнего кадра. Если пороговое значение превышено, то одноранговый узел должен попытаться корректно закрыть соединение с помощью близкого кадра с ошибкой, объясняющей причину. Если удаленный одноранговый узел не реагирует на это пороговое значение, то одноранговый узел МОЖЕТ закрыть сокет TCP.
Для клиента JMS при настройке этого свойства вы управляете на стороне сервера, сколько времени ожидается, что сервер отправляет пустой кадр, чтобы сохранить подключение в живых, если сообщения не доставляются. Это свойство управляет поведением удаленного однорангового узла, и каждый одноранговый узел может иметь собственное, изолированное значение.
Проблемы С JmsTemplate
Запланированные сообщения
Служебная шина Azure поддерживает отложенную обработку сообщений. Дополнительные сведения см. в разделе запланированных сообщений ScheduledEnqueueTimeUtc
с помощью заголовка заметки сообщения x-opt-scheduled-enqueue-time
.
Проблемы JmsListener
Слишком много запросов отправляются в служебную шину, несмотря на отсутствие сообщений на сервере
Описание проблемы
При использовании API @JmsListener
в некоторых случаях можно увидеть на портале Azure текущие значения входящих запросов, отправленных в очередь или разделы, даже если на сервере нет сообщений.
Анализ причин
@JmsListener
— это прослушиватель опроса, созданный для повторных попыток опроса.
Прослушиватель сидит на постоянном цикле опроса. Каждый цикл вызывает метод JMS MessageConsumer.receive()
для опроса локального потребителя сообщений. По умолчанию для каждой операции опроса локальный потребитель отправляет запросы на вытягивание брокеру сообщений, чтобы запросить сообщения, а затем блокировать в течение определенного периода времени. Конкретный процесс опроса определяется несколькими свойствами, включая receiveTimeout
, prefetchSize
, а также receiveLocalOnly
или receiveNoWaitLocalOnly
. Метод receiveNoWaitLocalOnly
используется только в том случае, если receiveTimeout
отрицательное значение.
При возникновении этой проблемы с приложением проверьте следующие параметры конфигурации:
Определите, является ли политика предварительной выборки 0, которая также является параметром по умолчанию. 0-предварительная выборка означает, что потребитель по запросу на вытягивание отправляет запросы на вытягивание в служебную шину для каждого опроса.
Если вы настроили предварительную выборку, отличной от нуля, определите, задано ли свойство
receiveLocalOnly
илиreceiveNoWaitLocalOnly
значениеfalse
, что является параметром по умолчанию. Значениеfalse
здесь по-прежнему приводит к отправке запросов на вытягивание на сервер, так как он не только опрашивает локального потребителя.Конфигурация
receiveTimeout
определяет, сколько времени он блокирует для каждого запроса на вытягивание, поэтому он может повлиять на частоту отправки запросов на вытягивание на сервер. Значение по умолчанию — 1 секунда.
Полный анализ см. в обсуждении проблемы GitHub.
Решения
В следующих разделах описаны два решения для решения этой проблемы.
Решение 1. Изменение только отправки потребителей и локальной проверки
Изменив режим на push
, потребитель становится асинхронного уведомления потребителя, который не извлекает сообщения из брокера, но поддерживает целевой объем кредита на связь. Сумма определяется свойством предварительной выборки. По мере того как служебная шина (отправитель) отправляет сообщения, кредит на связь отправителя уменьшается, и когда кредит на связь отправителя снижается ниже порогового значения, клиент (получатель) отправляет запрос серверу, чтобы увеличить кредит ссылки отправителя обратно на нужную целевую сумму.
Чтобы выполнить это решение, добавьте следующую конфигурацию:
Сначала настройте prefetch
число как ненулевое, которое настраивает потребителя как не вытягивающее. В следующей таблице показаны несколько свойств предварительной выборки, каждый из которых управляет различными сущностями служебной шины. Задайте свойства, которые применяются к вашему делу.
Свойство | Описание |
---|---|
spring.jms.servicebus.prefetch.all |
Резервное значение для параметра предварительной выборки в этом пространстве имен служебной шины |
spring.jms.servicebus.prefetch.queue-prefetch |
Номер предварительной выборки для очереди. |
spring.jms.servicebus.prefetch.queue-browser-prefetch |
Номер предварительной выборки для браузера очередей. |
spring.jms.servicebus.prefetch.topic-prefetch |
Номер предварительной выборки для раздела. |
spring.jms.servicebus.prefetch.durable-topic-prefetch |
Номер предварительной выборки для устойчивого раздела. |
Во-вторых, настройте non-local-check
, добавив класс конфигурации для настройщика фабрики, как показано в следующем примере:
@Configuration(proxyBeanMethods = false)
public class CustomJmsConfiguration {
@Bean
ServiceBusJmsConnectionFactoryCustomizer customizer() {
return factory -> {
factory.setReceiveLocalOnly(true);
// Configure the below ReceiveNoWaitLocalOnly instead if you have specified the property
// spring.jms.listener.receive-timeout with negative value. Otherwise, configure the above `ReceiveLocalOnly`.
//factory.setReceiveNoWaitLocalOnly(true);
};
}
}
Значение предварительной выборки может повлиять на то, как быстро сообщения отправляются в локальный буфер потребителя. Необходимо настроить значение в соответствии с производительностью и томами сообщений. Подходящее значение может ускорить процесс потребления, а слишком большое значение может привести к тому, что локально буферированные сообщения становятся устаревшими и перенаправляются снова. Для небольших томов сообщений, где каждое сообщение занимает много времени для обработки, задайте для предварительной выборки значение 1. Это значение гарантирует, что потребитель обрабатывает только одно сообщение одновременно.
Решение 2. Увеличьте время ожидания получения, чтобы уменьшить частоту вытягивания
Свойство времени ожидания получения определяет стратегию того, как долго потребитель блокирует ожидание результата извлечения. Таким образом, расширив время ожидания, можно уменьшить частоту вытягивания, а затем уменьшить количество запросов на вытягивание при выборе режима вытягивания. В крайних случаях можно задать стратегию ожидания на неопределенный срок до тех пор, пока сообщение не появится, что означает, что потребитель только извлекает сообщение после использования сообщения. В этом случае, если на сервере нет сообщений, он блокирует ожидание.
Чтобы выполнить это решение, настройте свойство spring.jms.listener.receive-timeout
. Это свойство имеет тип java.time.Duration
и имеет значение по умолчанию в 1 секунду. В следующем списке объясняется влияние различных значений:
- Установка времени ожидания получения в значение 0 означает, что блоки вытягивания неопределенные до отправки сообщения.
- Установка времени ожидания получения на положительное значение означает, что блоки вытягивания до времени ожидания.
- При задании времени ожидания получения отрицательное значение означает, что вытягивание не ожидает получения, что означает, что сообщение возвращается немедленно или
null
, если сообщения недоступны.
Заметка
Большое значение времени ожидания может привести к некоторым побочным эффектам. Например, значение высокого времени ожидания также расширяет время, когда основной поток находится в состоянии блока. Это состояние означает, что контейнер будет менее реагировать на вызовы stop()
и может останавливаться только между вызовами receive()
.
Кроме того, контейнер может отправлять запросы только после прохождения интервала receive-timeout
. Если интервал превышает 10 минут, служебная шина закроет ссылку и запретит прослушивателю отправлять или получать. Дополнительные сведения см. в разделе Ссылка закрыта раздела об ошибках AMQP в служебной шине Azure. По умолчанию прослушиватель использует CachingConnectionFactory.
Если требуется высокое время ожидания получения, обязательно используйте JmsPoolConnectionFactory.
Дополнительные сведения о проблеме закрытия ссылок и использовании JmsPoolConnectionFactory
см. в разделе СообщениеProducer было закрыто из-за неустранимой ошибки раздела.
Проблема с предварительной выборкой
Описание проблемы
Неподходяемая политика предварительной выборки может вызвать следующие проблемы:
- Те же сообщения многократно используются.
- Сообщения помещаются в очередь недоставленных писем после
MaxDeliveryCountExceeded
, даже если сообщения обрабатываются без ошибок или исключений.
Анализ причин
Эта проблема обычно возникает, когда значении выше фактической емкости, при этом слишком много сообщений предварительно извлекаются в локальный буфер, ожидающий использования. Однако предварительно подготовленные сообщения отображаются как отправленные в режиме проверки блокировки со стороны служебной шины. Каждое отправленное сообщение содержит атрибуты максимального количества доставки и длительности блокировки. В режиме получения с блокировкой сообщения, полученные в буфер предварительной выборки, получаются в буфер в заблокированном состоянии с часовым временем ожидания блокировки. Если буфер предварительной выборки большой, и обработка занимает так долго, что срок действия блокировки сообщений истекает во время пребывания в буфере предварительной выборки, сообщение обрабатывается как заброшенное и снова становится доступным для получения из очереди.
Эта проблема может привести к тому, что сообщение будет получено в буфер предварительной выборки и помещено в конец. Если буфер предварительной выборки не обрабатывается до истечения срока действия сообщения, сообщения повторяются, но никогда не предоставляются в допустимом состоянии (допустимо заблокировано). Затем, когда устаревшие копии удаляются, приложение затем использует одно и то же сообщение многократно и не может завершить их. В другом случае повторяющиеся сообщения истекают в буфере до их использования. В этом случае сообщение служебной шины в конечном итоге будет перемещено в очередь недоставленных писем после превышения максимального количества доставки.
Дополнительные сведения см. в разделе Почему предварительная выборка не используется по умолчанию? разделе Предварительное получение сообщений служебной шины Azure.
Решение
Будьте осторожны с конфигурацией предварительной выборки, чтобы убедиться, что она соответствует возможности использования. Необходимо сбалансировать максимальное количество предварительных выборок и длительность блокировки, настроенную в очереди или подписке, так что время ожидания блокировки по крайней мере превышает совокупное ожидаемое время обработки сообщений для максимального размера буфера предварительной выборки, а также одно сообщение. В то же время время время ожидания блокировки не должно быть так долго, чтобы сообщения могли превышать максимальное время жизни, когда они случайно удалены, тем самым требуя истечения срока действия блокировки до повторного создания.
Чтобы настроить атрибут предварительной выборки, имеющий значение по умолчанию нулю, используйте одно из следующих свойств:
Свойство | Описание |
---|---|
spring.jms.servicebus.prefetch.all |
Резервное значение для параметра предварительной выборки в этом пространстве имен служебной шины. |
spring.jms.servicebus.prefetch.queue-prefetch |
Номер предварительной выборки для очереди. |
spring.jms.servicebus.prefetch.queue-browser-prefetch |
Номер предварительной выборки для браузера очередей. |
spring.jms.servicebus.prefetch.topic-prefetch |
Номер предварительной выборки для раздела. |
spring.jms.servicebus.prefetch.durable-topic-prefetch |
Номер предварительной выборки для устойчивого раздела. |
Как выполнить ликвидацию AMQP в служебной шине?
JMS поддерживает пять типов ликвидации AMQP при подтверждении сообщений брокеру обмена сообщениями. Поддерживаемые значения: ACCEPTED
, REJECTED
, RELEASED
, MODIFIED_FAILED
и MODIFIED_FAILED_UNDELIVERABLE
. Дополнительные сведения см. в разделе сопоставление операций amQP и сопоставления операций служебной шиныиспользовать службу сообщений Java 1.1 со стандартом служебной шины Azure иAMQP 1.0.
Таким образом, чтобы вручную завершить, отказаться, отложить или освободить сообщение с помощью JmsListener
, выполните следующие действия:
Отключите транзакцию сеансов и используйте режимack CLIENT.
Чтобы выполнить эту задачу, объявите собственные JmsListenerContainerFactory beany, а затем задайте свойства или выполните процедуру
JmsListenerContainerFactory
, определенную в начальной . В следующем примере используется подход объявления другой бобов:@Configuration(proxyBeanMethods = false) public class CustomJmsConfiguration { @Bean public JmsListenerContainerFactory<?> customQueueJmsListenerContainerFactory( DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory(); configurer.configure(jmsListenerContainerFactory, connectionFactory); jmsListenerContainerFactory.setPubSubDomain(Boolean.FALSE); jmsListenerContainerFactory.setSessionTransacted(Boolean.FALSE); jmsListenerContainerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return jmsListenerContainerFactory; } }
В обработчике сообщений явно завершены или отказались от сообщений.
@JmsListener(destination = "QUEUE_NAME", containerFactory = "customQueueJmsListenerContainerFactory") public void receiveMessage(JmsTextMessage message) throws Exception { String event = message.getBody(String.class); try { logger.info("Received event: {}", event); logger.info("Received message: {}", message); // by default complete the message message.acknowledge(); } catch (Exception e) { logger.error("Exception while processing re-source event: " + event, e); JmsAcknowledgeCallback acknowledgeCallback = message.getAcknowledgeCallback(); // explicitly abandon the message acknowledgeCallback.setAckType(MODIFIED_FAILED); message.setAcknowledgeCallback(acknowledgeCallback); message.acknowledge(); throw e; } }
Проблемы с конфигурацией
Отключение автоматической настройки JMS служебной шины
Описание проблемы
Некоторые пользователи импортируют некоторые начальные параметры Azure Spring Cloud для автоматической настройки службы Azure, отличной от JMS служебной шины. Они также используют платформу Spring JMS без необходимости JMS служебной шины. Затем при попытке запуска приложения возникают следующие исключения:
Caused by: java.lang.IllegalArgumentException: 'spring.jms.servicebus.connection-string' should be provided
at com.azure.spring.cloud.autoconfigure.jms.properties.AzureServiceBusJmsProperties.afterPropertiesSet(AzureServiceBusJmsProperties.java:210)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
... 98 more
Анализ причин
Эта проблема возникает из-за того, что все классы автонастройки Azure Spring Cloud помещаются в один модуль, поэтому любой начальный модуль Azure Spring Cloud фактически импортирует все эти автонастройки, включая JMS служебной шины. Затем, когда приложение использует API Spring JMS, оно соответствует условию автоматической настройки JMS служебной шины spring-cloud-azure-starter-servicebus-jms
, условия свойства не выполняются, так как для них нет оснований настраивать служебную шину для JMS. Эта ситуация приводит к возникновению исключений.
Решение
Spring Cloud Azure для JMS служебной шины предоставляет свойство для включения или отключения автоматической настройки. Вы можете отключить эту функцию по мере необходимости с помощью следующего параметра свойства:
spring.jms.servicebus.enabled=false
Настройка атрибутов сообщения
Как задать тип контента исходящих сообщений?
Чтобы настроить тип контента, настройте конвертер сообщений для изменения атрибута типа контента при преобразовании сообщений. Следующий код принимает сообщения байтов в качестве примера.
Сначала настройте преобразователь сообщений для использования в JmsTemplate
, как показано в следующем примере:
public class CustomMappingJackson2MessageConverter extends MappingJackson2MessageConverter {
public static final String CONTENT_TYPE = "application/json";
public CustomMappingJackson2MessageConverter() {
this.setTargetType(MessageType.BYTES);
}
@Override
protected BytesMessage mapToBytesMessage(Object object, Session session, ObjectWriter objectWriter)
throws JMSException, IOException {
final BytesMessage message = super.mapToBytesMessage(object, session, objectWriter);
JmsBytesMessage msg = (JmsBytesMessage) message;
AmqpJmsMessageFacade facade = (AmqpJmsMessageFacade) msg.getFacade();
facade.setContentType(Symbol.valueOf(CONTENT_TYPE));
return msg;
}
}
Затем объявите настроенный преобразователь сообщений, как показано в следующем примере:
@Configuration(proxyBeanMethods = false)
public class CustomJmsConfiguration {
@Bean
public MessageConverter messageConverter() {
return new CustomMappingJackson2MessageConverter();
}
}
Как задать имя свойства идентификатора типа для MappingJackson2MessageConverter?
Атрибут type-id-property-name
позволяет MappingJackson2MessageConverter
определить, какой класс следует использовать для десериализации полезных данных сообщения. При сериализации каждого объекта Java в полезные данные Spring Message преобразователь сохраняет тип полезных данных в свойстве сообщения с именем свойства, записанным type-id-property-name
. Затем при десериализации сообщения преобразователь считывает идентификатор типа из сообщения и выполняет десериализацию.
Чтобы задать type-id-property-name
, объявите собственные MappingJackson2MessageConverter
bean и настройте это свойство, как показано в следующем примере:
@Configuration(proxyBeanMethods = false)
public class CustomJmsConfiguration {
@Bean
public MessageConverter jacksonJmsMessageConverter()
{
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTypeIdPropertyName("your-custom-type-id-property-name");
return converter;
}
}
Обнаружение повторяющихся данных
Служебная шина Azure поддерживает обнаружения повторяющихся данных, которая применяет свойство MessageId
для уникальной идентификации сообщений и отмены повторяющихся сообщений, отправленных в служебную шину.
Однако для API JMS не следует задавать идентификатор сообщения JMS, который считается незаконным в спецификациях JMS. Таким образом, эта функция в настоящее время не поддерживается для Начального модуля JMS служебной шины Azure Spring Cloud.
Дополнительные обновления этой функции см. впроблемы с GitHub
Включение ведения журнала транспорта AMQP
Дополнительные сведения см. в разделе включение ведения журнала транспорта AMQP раздела Устранение неполадок служебной шины.
Получение дополнительной справки
Дополнительные сведения о способах получения поддержки см. в разделе Поддержка в корневом каталоге репозитория.
Ресурсы для начального модуля JMS служебной шины Azure Spring Cloud
- использование служебной шины Azure с JMS
- использовать JMS в Spring для доступа к служебной шине Azure
- Руководство по миграции для Spring Cloud Azure 4.0
- пример
Проблемы с отправкой GitHub
При отправке проблем с GitHub запрашиваются следующие сведения:
- Конфигурация служебной шины / среда пространства имен
- Какой уровень — пространство имен (стандартная или премиум)?
- Какой тип сущности обмена сообщениями используется (очередь или раздел)? и ее конфигурация.
- Какой средний размер каждого сообщения?
- Каков шаблон трафика? (то есть число сообщений в минуту и всегда ли клиент занят или имеет медленные периоды трафика.)
- Код и шаги повторной подготовки
- Это важно, так как мы часто не можем воспроизвести проблему в нашей среде.
- Журналы