Поделиться через


Шаблоны задач репликации сообщений

В общих сведениях о федерации и функция репликатора приведены обоснования и описаны основные элементы задач репликации, поэтому рекомендуется ознакомиться с ними, прежде чем продолжать работу с этой статьей.

В этой статье подробно рассмотрены рекомендации по реализации некоторых шаблонов, выделенных в разделе "Обзор".

Репликация

Шаблон репликации копирует сообщения из одной очереди или раздела в следующую или из очереди или раздела в другое место, например в концентратор событий. Сообщения перенаправляются без внесения изменений в полезные данные.

Реализация этого шаблона рассмотрена в примере репликации сообщений с службу "Служебная шина Azure" и из нее.

Сохранение последовательности и порядка

Модель репликации не нацелена на воспроизведение абсолютного порядка сообщений исходной очереди или раздела в целевой очереди или разделе, но при необходимости она сохраняет относительный порядок сообщений, где это требуется приложению. Для этого в приложении реализована поддержка сеансов для исходной сущности и группировка связанных сообщений с одним ключом сеанса.

Встроенные функции репликации с учетом сеансов гарантируют, что последовательности сообщений с одинаковым идентификатором сеанса, извлеченных из исходной сущности, будут направлены в целевую очередь или раздел в виде пакета в исходном порядке с тем же идентификатором сеанса.

Метаданные, назначенные службой

Назначаемые службой метаданные сообщения, полученного из исходной очереди или раздела (исходное время постановки в очередь и порядковый номер), заменяются новыми значениями, присваиваемыми службой в целевой очереди или разделе. Однако для задач репликации по умолчанию, которые представлены в наших примерах, исходные значения сохраняются в пользовательских свойствах: repl-enqueue-time (строка ISO8601) и repl-sequence.

Эти свойства имеют строковый тип (string) и содержат переведенные в строковый вид значения соответствующих исходных свойств. Если сообщение пересылается несколько раз, к уже существующим свойствам добавляются метаданные службы из очередного источника, а значения при этом разделяются точкой с запятой.

Отработка отказа

Если вы применяете репликацию для аварийного восстановления, для защиты от региональных сообщений о доступности в службе "Служебная шина" или от перебоев в сети, любой такой сценарий сбоя потребует отработки отказа из одной очереди или раздела в другие, чтобы создатели и (или) потребители знали, что следует использовать дополнительную конечную точку.

Во всех сценариях отработки отказа предполагается, что необходимые элементы пространств имен структурно идентичны, т. е. очереди и разделы имеют идентичные имена, а правила подписей общего доступа и/или правила управления доступом на основе ролей настроены аналогичным образом. Вы можете создать (и обновить) дополнительное пространство имен, следуя указаниям по перемещению пространств имен и пропустив шаг очистки.

Чтобы заставить создателей и потребителей выполнить переключение, необходимо предоставить сведения о новом пространстве имен для поиска в расположении, которое легко доступно и которое просто обновлять. Если создатели или потребители сталкиваются с частыми или постоянными ошибками, им следует связаться с этим расположением и изменить свою конфигурацию. Предоставить соответствующую конфигурацию можно различными способами, но мы выделим два: DNS и общие папки.

Конфигурация отработки отказа на основе DNS

Одним из возможных подходов является хранение информации в SRV-записях DNS в управляемом вами DNS с указанием соответствующей конечной точки очереди или раздела. Обратите внимание, что концентраторы сообщений не допускают присвоения конечным точкам псевдонимов напрямую с помощью записей CNAME. Это означает, что DNS следует использовать в качестве отказоустойчивого механизма поиска адресов конечных точек, а не для непосредственного разрешения IP-адресов.

Предположим, что вы владеете доменом example.com, а также (для своего приложения) зоной test.example.com. Для двух разных служебных шин вы создаете две вложенные зоны с записью SRV в каждой из них.

Запись SRV, по общему соглашению, имеет префикс _azure_servicebus._amqp и содержит две записи конечных точек: одну для AMQP через TLS на порту 5671, вторую — для AMQP через WebSocket на порту 443. Обе эти записи указывают на конечную точку служебной шины пространства имен, соответствующего зоне.

Зона Запись SRV
sb1.test.example.com _azure_servicebus._amqp.sb1.test.example.com
1 1 5671 sb1-test-example-com.servicebus.windows.net
2 2 443 sb1-test-example-com.servicebus.windows.net
sb2.test.example.com _azure_servicebus._amqp.sb1.test.example.com
1 1 5671 sb2-test-example-com.servicebus.windows.net
2 2 443 sb2-test-example-com.servicebus.windows.net

В зоне приложения вы затем создаете запись CNAME, которая указывает на подчиненную зону, соответствующую основной очереди или разделу:

Запись CNAME Псевдоним
servicebus.test.example.com sb1.test.example.com

С помощью DNS-клиента, который позволяет явно запрашивать записи CNAME и SRV (встроенные клиенты Java и .NET разрешают только простое разрешение имен в IP-адреса), вы можете разрешить нужную конечную точку. Например, с помощью DnsClient.NET функция поиска (определения адреса) имеет следующий вид:

static string GetServiceBusName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_servicebus._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

Функция возвращает имя целевого узла, зарегистрированное для порта 5671 зоны, псевдонимом которой в данный момент является CNAME, как показано выше.

Для отработки отказа необходимо изменить запись CNAME и перенаправить ее на альтернативную зону.

Преимущество использования DNS и, в частности Azure DNS, заключается в том, что информация Azure DNS глобально реплицируется и, следовательно, устойчива к сбоям в отдельных регионах.

Эта процедура аналогична работе географического аварийного восстановления Служебной шины, однако полностью контролируется вами и работает со сценариями "активный/активный".

Конфигурация отработки отказа на основе общих папок

Самая простая альтернатива использованию DNS для предоставления сведений о конечной точке — сохранение имени основной конечной точки в обычном текстовом файле и его предоставление из инфраструктуры, устойчивой по отношению к сбоям, но при этом позволяющей выполнять обновления.

Если у вас уже развернута инфраструктура веб-сайтов высокой надежности с глобальной репликацией доступности и содержимого, добавьте такой файл и при необходимости (если требуется переключение) опубликуйте его повторно.

Слияние

Шаблон объединения содержит одну или несколько задач репликации, указывающих на одну цель и, возможно, одновременно с обычными создателями отправляющих сообщения в один и тот же целевой объект.

Варианты этого шаблона:

  • Две или несколько функций репликации одновременно получают сообщения из разных источников и отправляют их в один и тот же целевой объект.
  • Еще одна функция репликации получает сообщения из источника, в то время как целевой объект также используется непосредственно создателями.
  • Аналогично первому варианту, но при этом сообщения зеркально отражаются в двух или нескольких разделах, в результате чего эти разделы содержат одни и те же сообщения независимо от того, где сообщения создаются.

Первые два варианта шаблона являются тривиальными и не отличаются от обычных задач репликации.

Последний сценарий требует повторной репликации уже реплицированных сообщений. Этот метод демонстрируется и описывается в примере "активный/активный".

Редактор

Шаблон редактора основан на схеме репликации, но сообщения изменяются до их пересылки. Ниже приведены примеры таких изменений.

  • Перекодирование: если содержимое сообщения (также называемое телом или полезными данными) поступает из источника, использующего формат кодирования Apache Avro или какой-либо собственный формат сериализации, система, владеющая целевым объектом, ожидает, кодирования в формате JSON, задача репликации с перекодированием сначала десериализует полезные данные из Apache Avro в граф объектов в памяти, а затем сериализует этот граф в формат JSON для перенаправляемого сообщения. Перекодирование также включает задачи сжатия и распаковки содержимого.
  • Преобразование: сообщения, содержащие структурированные данные, могут потребовать переформатирования этих данных для упрощения их использования последующими потребителями. Это может включать такие задачи, как, например, сведение вложенных структур, удаление лишних элементов данных или изменение формата полезной нагрузки согласно определенной схеме.
  • Пакетная обработка: сообщения могут поступать из источника пакетами (несколько сообщений в одной передаче), но должны перенаправляться в целевой объект по одному, или наоборот. Таким образом, задача может пересылать несколько сообщений из одного входящего пакета или объединять набор сообщений, которые затем передаются вместе.
  • Проверка: данные сообщений из внешних источников часто требуется проверять на соответствие набору правил, прежде чем они могут быть перенаправлены. Правила могут выражаться с помощью схем или кода. Сообщения, которые не соответствуют условиям, могут удаляться (с регистрацией этого события в журналах) или перенаправляться в специальный целевой объект для дальнейшей обработки.
  • Обогащение: для сообщений, поступающих из некоторых источников, может потребоваться обогащение в другом контексте, прежде чем их можно будет использовать в целевых системах. Это может включать поиск базовых данных и их внедрение в сообщение либо добавление сведений об источнике, известном задаче репликации, но не указанном в сообщения.
  • Фильтрация: некоторые сообщения, поступающие из источника, могут не перенаправляться в целевой объект на основе какого-либо правила. Фильтр проверяет сообщение на соответствие этому правилу и удаляет в случае несоответствия. Фильтрация повторяющихся сообщений посредством отслеживания определенных условий и удаления последующих сообщений с теми же значениями является формой фильтрации.
  • Маршрутизация и секционирование: некоторые задачи репликации могут включать две или несколько альтернативных целевых объектов, а также содержать правила, согласно которым целевой объект репликации выбирается для конкретного сообщения на основе его метаданных или содержимого. Особая форма маршрутизации — секционирование, где задача явным образом назначает секции в рамках одного целевого объекта репликации на основе правил.
  • Шифрование: задача репликации может расшифровывать содержимое, поступающие из источника, или зашифровывать содержимое, перенаправляемое в целевой объект, и (или) проверять целостность содержимого и метаданных на соответствие подписи, переданной в сообщении, а также добавлять такую подпись.
  • Аттестация: задача репликации может прикреплять к сообщению метаданные (возможно, защищенные цифровой подписью), которые подтверждают, что это сообщение было получено через определенный канал или в определенное время.
  • Связывание: задача репликации может применять к последовательностям сообщений подписи, чтобы защитить целостность такой последовательности, а также для возможности обнаружения отсутствующих сообщений.

Все такие схемы можно реализовать с помощью функций Azure, используя триггер концентраторов сообщений для получения сообщений и выходную привязку очереди или раздела для их доставки.

Маршрутизация

Шаблон маршрутизации основан на схеме репликации, но вместо одного источника и одного целевого объекта задача репликации имеет несколько целевых объектов, как показано здесь в коде на C#:

[FunctionName("SBRouter")]
public static async Task Run(
    [ServiceBusTrigger("source", Connection = "serviceBusConnectionAppSetting")] ServiceBusReceivedMessage[] messages,
    [ServiceBusOutput("dest1", Connection = "serviceBusConnectionAppSetting")] IAsyncCollector<dynamic> output1,
    [ServiceBusOutput("dest2", Connection = "serviceBusConnectionAppSetting")] IAsyncCollector<dynamic> output2,
    ILogger log)
{
    foreach (Message messageData in messages)
    {
        // send to output1 or output2 based on criteria 
    }
}

Функция маршрутизации будет учитывать метаданные и (или) полезные данные сообщения, а затем выбирать одно из доступных мест назначения для отправки.

Следующие шаги