Бөлісу құралы:


Шаблоны задач репликации событий

В общих сведениях о федерации и функция репликатора приведены обоснования и описаны основные элементы задач репликации, поэтому рекомендуется ознакомиться с ними, прежде чем продолжать работу с этой статьей.

В этой статье подробно рассмотрены рекомендации по реализации некоторых шаблонов, выделенных в разделе "Обзор".

Репликация

Шаблон репликации копирует события из одного концентратора событий в следующий или из концентратора событий в другое назначение, например в очередь служебной шины. События перенаправляются без внесения изменений в полезные данные события.

Реализация этого шаблона для конкретного случая репликации данных из брокера Apache Kafka в Центры событий охватывается выборками репликации событий между концентраторами событий и репликации событий между концентраторами событий и служебной шиной, а также учебником Использование Apache Kafka MirrorMaker с концентраторами событий.

Потоки и сохранение порядка

Репликация с помощью Функции Azure или Azure Stream Analytics не стремится обеспечить создание точных клонов 1:1 исходного концентратора событий в целевом концентраторе событий, но фокусируется на сохранении относительного порядка событий, для которых требуется приложение. Приложение сообщает об этом, группируя связанные события с одним и тем же ключом раздела, а концентраторы событий размещают сообщения с одним и тем же ключом раздела последовательно в одном разделе.

Внимание

Информация о смещении уникальна для каждого концентратора событий, и смещения для одних и тех же событий будут отличаться в разных экземплярах концентратора событий. Чтобы найти позицию в скопированном потоке событий, используйте смещения на основе времени и обратитесь к распространяемым метаданным, назначенным службой.

Смещения на основе времени запускают ваш приемник в определенный момент времени.

  • EventPosition.FromStart() — читает все сохраненные данные с начала.
  • EventPosition.FromEnd() — читает все новые данные с момента подключения.
  • EventPosition.FromEnqueuedTime(dateTime) — читает все данные начиная с определенной даты и времени.

В EventProcessor позиция задается с помощью InitialOffsetProvider в EventProcessorOptions. При использовании других API приемника эта позиция передается через конструктор.

Готовые вспомогательные приложения функции репликации, предоставляемые в качестве примеров, используемых в руководстве на основе Функций Azure, гарантируют, что потоки событий с одним и тем же ключом раздела, извлеченные из исходного раздела, передаются в целевой концентратор событий как пакет в исходном потоке и с тем же ключом раздела.

Если в исходном и целевом концентраторах одинаковое количество разделов, все потоки в целевом объекте будут сопоставляться с теми же разделами, что и в источнике. Если количество разделов разное, что имеет значение в некоторых других шаблонах, описанных ниже, сопоставление будет другим, но потоки всегда будут храниться вместе и по порядку.

Относительный порядок событий, принадлежащих разным потокам, или независимых событий без ключа раздела в целевом разделе всегда может отличаться от исходного раздела.

Метаданные, назначенные службой

Назначаемые службой метаданные события, полученного из исходного концентратора событий, исходное время постановки в очередь, порядковый номер и смещение заменяются новыми значениями, присваиваемыми службой в целевом концентраторе событий. Однако с помощью функций вспомогательного приложения в задачах репликации, которые представлены в наших примерах, исходные значения сохраняются в пользовательских свойствах: repl-enqueue-time (строка ISO8601), repl-sequence, repl-offset.

Эти свойства имеют строковый тип (string) и содержат переведенные в строковый вид значения соответствующих исходных свойств. Если событие пересылается несколько раз, к уже существующим свойствам добавляются метаданные службы из очередного источника, а значения при этом разделяются точкой с запятой.

Отработка отказа

Если вы используете реплика для аварийного восстановления, чтобы защититься от региональных событий доступности в службе Центров событий или от прерываний сети, любой такой сценарий сбоя потребует отработки отказа из одного концентратора событий на следующий, сообщая производителям и/или потребителям использовать вторичную конечную точку.

Для всех сценариев отработки отказа предполагается, что необходимые элементы пространств имен являются структурны идентичными, что означает, что центры событий и группы потребителей идентичны, и правила управления доступом на основе ролей настраиваются одинаково. Вы можете создать (и обновить) дополнительное пространство имен, следуя указаниям по перемещению пространств имен и пропустив шаг очистки.

Чтобы заставить создателей и потребителей выполнить переключение, необходимо предоставить сведения о новом пространстве имен для поиска в расположении, которое легко доступно и которое просто обновлять. Если создатели или потребители сталкиваются с частыми или постоянными ошибками, им следует связаться с этим расположением и изменить свою конфигурацию. Предоставить соответствующую конфигурацию можно различными способами, но мы выделим два: DNS и общие папки.

Конфигурация отработки отказа на основе DNS

Одним из возможных подходов является хранение информации в SRV-записях DNS в управляемой вами DNS с указанием соответствующих конечных точек концентратора событий.

Внимание

Помните, что Центры событий не позволяют напрямую псевдонимировать конечные точки с записями CNAME, что означает, что DNS будет использоваться как устойчивый механизм поиска для адресов конечных точек, а не для прямого разрешения сведений об IP-адресах.

Предположим, что вы владеете доменом example.com, а также (для своего приложения) зоной test.example.com. Для двух альтернативных центров событий теперь вы создадите две дополнительные вложенные зоны и запись SRV в каждом из них.

Запись SRV, по общему соглашению, имеет префикс _azure_eventhubs._amqp и содержит две записи конечных точек: одну для AMQP через TLS на порту 5671, вторую — для AMQP через WebSocket на порту 443. Обе эти записи указывают на конечную точку концентратора событий пространства имен, соответствующего зоне.

Зона Запись SRV
eh1.test.example.com _azure_servicebus._amqp.eh1.test.example.com
1 1 5671 eh1-test-example-com.servicebus.windows.net
2 2 443 eh1-test-example-com.servicebus.windows.net
eh2.test.example.com _azure_servicebus._amqp.eh2.test.example.com
1 1 5671 eh2-test-example-com.servicebus.windows.net
2 2 443 eh2-test-example-com.servicebus.windows.net

Затем в зоне приложения вам нужно создать запись CNAME, которая указывает на подчиненную зону, соответствующую основному концентратору событий.

Запись CNAME Псевдоним
eventhub.test.example.com eh1.test.example.com

С помощью DNS-клиента, который позволяет явно запрашивать записи CNAME и SRV (встроенные клиенты Java и .NET разрешают только простое разрешение имен в IP-адреса), вы можете разрешить нужную конечную точку. Например, с помощью DnsClient.NET функция поиска (определения адреса) имеет следующий вид:

static string GetEventHubName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_eventhub._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

Функция возвращает имя целевого узла, зарегистрированное для порта 5671 зоны, псевдонимом которой в данный момент является CNAME, как показано выше.

Для отработки отказа необходимо изменить запись CNAME и перенаправить ее на альтернативную зону.

Преимущество использования DNS и, в частности Azure DNS, заключается в том, что информация Azure DNS глобально реплицируется и, следовательно, устойчива к сбоям в отдельных регионах.

Эта процедура аналогична работе географического аварийного восстановления Центров событий, однако полностью контролируется вами и также может работать со сценариями "активный/активный".

Конфигурация отработки отказа на основе общих папок

Самая простая альтернатива использованию DNS для предоставления сведений о конечной точке — сохранение имени основной конечной точки в обычном текстовом файле и его предоставление из инфраструктуры, устойчивой по отношению к сбоям, но при этом позволяющей выполнять обновления.

Если у вас уже развернута инфраструктура веб-сайтов высокой надежности с глобальной репликацией доступности и содержимого, добавьте такой файл и при необходимости (если требуется переключение) опубликуйте его повторно.

Внимание

Вы должны публиковать имя конечной точки только таким образом, не указывая полную строку подключения, включая секреты.

Дополнительные рекомендации для потребителей отработки отказа

Для потребителей концентратора событий дальнейшие рекомендации по стратегии отработки отказа зависят от потребностей обработчика событий.

Если происходит авария, которая требует перестройки системы, в том числе баз данных, из данных резервной копии, а базы данных передаются напрямую или посредством промежуточной обработки из событий, хранящихся в концентраторе событий, необходимо восстановить резервную копию, а затем запустить воспроизведение событий в системе с момента создания резервной копии базы данных, а не с момента разрушения исходной системы.

Если сбой затрагивает только сегмент системы или только один концентратор событий, который стал недоступен, вы, вероятно, захотите продолжить обработку событий примерно с того же места, где обработка была прервана.

Чтобы реализовать любой сценарий и использовать обработчик событий соответствующего пакета Azure SDK, необходимо создать новое хранилище контрольных точек и предоставить начальную позицию раздела на основе метки времени, с которой нужно возобновить обработку.

Если у вас по-прежнему есть доступ к хранилищу контрольных точек концентратора событий, из которого вы переключаетесь, распространенные метаданные, описанные выше, помогут вам пропустить события, которые уже были обработаны, и возобновить обработку именно с того места, где вы в последний раз остановились.

Слияние

В шаблоне объединения имеется одна задача репликации или несколько, указывающих на один целевой объект, возможно, одновременно с обычными производителями, также отправляющими события в тот же целевой объект.

Существуют следующие варианты этих шаблонов.

  • Две функции репликации или более одновременно получают события из разных источников и отправляют их в один и тот же целевой объект.
  • Еще одна функция репликации получает события из источника, в то время как целевой объект также используется непосредственно создателями.
  • Предыдущий шаблон, но зеркально отраженный между двумя концентраторами событий или более, в результате чего эти концентраторы событий содержат одни и те же потоки, независимо от того, где создаются события.

Первые два варианта шаблона являются тривиальными и не отличаются от простых задач реплика tion.

В последнем сценарии требуется повторная репликация уже реплицированных событий. Этот метод демонстрируется и объясняется в примере EventHubToEventHubMerge.

Редактор

Шаблон редактора основан на схеме репликации, но сообщения изменяются до их пересылки.

Ниже приведены примеры таких изменений.

  • Перекодирование — если содержимое события (также называемое телом или полезными данными) поступает из источника, использующего формат кодирования Apache Avro или какой-либо собственный формат сериализации, но система, владеющая целевым объектом, ожидает содержимое в кодировке JSON, задача репликации с перекодированием сначала десериализует полезные данные из Apache Avro в граф объектов в памяти, а затем сериализует этот граф в формат JSON для перенаправляемого сообщения. Перекодирование также включает задачи сжатия и распаковки содержимого.
  • Преобразование — события, содержащие структурированные данные, могут требовать переформатирования этих данных для упрощения их использования последующими потребителями. Это может включать такие задачи, как, например, сведение вложенных структур, удаление лишних элементов данных или изменение формата полезной нагрузки согласно определенной схеме.
  • Пакетная обработка — события могут поступать из источника пакетами (несколько событий в одной передаче), но должны перенаправляться в целевой объект по одному (или наоборот). Таким образом, задача может пересылать несколько событий из одного входящего пакета или объединять набор событий, которые затем передаются вместе.
  • Проверка — данные событий из внешних источников часто должны быть проверка для того, соответствуют ли они набору правил, прежде чем они могут быть переадресованы. Правила могут выражаться с помощью схем или кода. События, которые не соответствуют условиям, могут удаляться (с регистрацией этого события в журналах) или перенаправляться в специальный целевой объект для дальнейшей обработки.
  • Обогащение — для событий, поступающих из некоторых источников, может потребоваться обогащение в другом контексте, прежде чем их можно будет использовать в целевых системах. Это может включать поиск базовых данных и их внедрение в событие либо добавление сведений об источнике, известном задаче репликации, но не указанном в событии.
  • Фильтрация — некоторые события, поступающие из источника, могут не перенаправляться в целевой объект на основе какого-либо правила. Фильтр проверяет событие по правилу и удаляет событие, если событие не соответствует правилу. Фильтрация повторяющихся событий посредством отслеживания определенных условий и удаления последующих событий с теми же значениями является формой фильтрации.
  • Шифрование — задача репликации может расшифровывать содержимое, поступающее из источника, и (или) зашифровывать содержимое, перенаправляемое в целевой объект, и (или) проверять целостность содержимого и метаданных на соответствие сигнатуре, переданной в событии, а также добавлять такую сигнатуру.
  • Аттестация — задача репликации может прикреплять к событию метаданные (возможно, защищенные цифровой сигнатурой), которые подтверждают, что это событие было получено через определенный канал или в определенное время.
  • Связывание — задача репликации может применять к потокам событий сигнатуры, чтобы защитить целостность такого потока, а также для возможности обнаружения отсутствующих событий.

Шаблоны преобразования, пакетной обработки и обогащения обычно удобнее всего реализовывать с помощью заданий Azure Stream Analytics.

Все такие шаблоны можно реализовать с помощью Функций Azure, используя триггер концентратора событий для получения сообщений и выходную привязку концентратора событий для их доставки.

Маршрутизация

Шаблон маршрутизации основан на схеме репликации, но вместо одного источника и одного целевого объекта задача репликации имеет несколько целевых объектов, как показано здесь в коде на C#:

[FunctionName("EH2EH")]
public static async Task Run(
    [EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
    [EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
    [EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
    ILogger log)
{
    foreach (EventData eventData in events)
    {
        // send to output1 and/or output2 based on criteria
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
        });
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
        });
    }
}

Функция маршрутизации будет учитывать метаданные и (или) полезные данные сообщения, а затем выбирать одно из доступных мест назначения для отправки.

В Azure Stream Analytics можно добиться того же результата, определив несколько выводов, а затем выполнив запрос на вывод.

select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2

Проекция журнала

Шаблон проекции журнала выполняет сведение потока событий в индексированную базу данных, при этом события становятся записями в базе данных. Как правило, события добавляются в ту же коллекцию или таблицу, а ключ секции Концентратора событий становится частью первичного ключа, который ищет уникальность записи.

Проекция журнала может создавать хронологию временных рядов ваших данных событий или сжатое представление, в котором для каждого ключа раздела сохраняется только последнее событие. Форма целевой базы данных в конечном итоге зависит от ваших требований и требований вашего приложения. Этот шаблон также называется "источник событий".

Совет

Вы можете легко создавать проекции журналов в База данных SQL Azure и Azure Cosmos DB в Azure Stream Analytics, и вы должны выбрать этот вариант.

Следующие функции Azure проектируют содержимое концентратора событий, сжатого в коллекцию Azure Cosmos DB.

[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
    [EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
    [CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
    ILogger log)
{
    foreach (var ev in input)
    {
        if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
        {
            var record = new
            {
                id = ev.SystemProperties.PartitionKey,
                data = JsonDocument.Parse(ev.Body),
                properties = ev.Properties
            };
            await output.AddAsync(record);
        }
    }
}

Следующие шаги