Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом разделе представлен общий обзор реализации потока. В нем описываются понятия и сведения, которые не отображаются на уровне приложения. Если вы планируете использовать только потоки, вам не нужно читать этот раздел.
Терминология:
Мы используем слово "queue", чтобы обозначить любую технологию долговременного хранилища, которая способна принимать события потока и позволяет либо извлекать события, либо использовать механизмы, основанные на push-уведомлениях, для их обработки. Как правило, для обеспечения масштабируемости эти технологии предоставляют сегментированные или секционированные очереди. Например, очереди Azure позволяют создавать несколько очередей, а центры событий имеют несколько концентраторов.
Постоянные потоки
Все Orleans поставщики постоянных потоков совместно используют общую реализацию PersistentStreamProvider. Эти универсальные поставщики потоков должны быть настроены с учетом IQueueAdapterFactoryтехнологии.
Например, для тестирования у нас есть адаптеры очередей, которые создают тестовые данные, а не считывают данные из очереди. В приведенном ниже коде показано, как настроить поставщик постоянного потока для использования пользовательского адаптера очереди (генератора). Это делается путем настройки поставщика постоянных потоков с помощью фабричной функции, используемой для создания адаптера.
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
Когда производитель потоков создает новый элемент потока и выполняет вызов stream.OnNext(), среда выполнения потоковой передачи Orleans вызывает соответствующий метод у указанного IQueueAdapter поставщика потоков, который размещает элемент непосредственно в соответствующую очередь.
Привлечение агентов
В основе поставщика постоянного потока находятся агенты извлечения. Агенты извлечения извлекают события из набора устойчивых очередей и доставляют их в код приложения, реализованный в зернах, которые их потребляют. Можно считать агентов, осуществляющих вытягивание, распределенными «микрослужбами» — это секционированный, высокодоступный и эластичный распределенный компонент. Агенты извлечения выполняются внутри одного и того же силоса, где размещаются зерна приложений и полностью управляются Orleans средой выполнения потоковой передачи.
StreamQueueMapper и StreamQueueBalancer.
Агенты извлечения параметризуются с помощью IStreamQueueMapper и IStreamQueueBalancer. Компонент IStreamQueueMapper предоставляет список всех очередей и также отвечает за сопоставление потоков с очередями. Таким образом, производитель поставщика постоянного потока знает, в какую очередь следует поместить сообщение.
IStreamQueueBalancer демонстрирует способ балансировки очередей между Orleans силосами и агентами. Цель состоит в том, чтобы назначать очереди агентам в сбалансированном порядке, чтобы предотвратить узкие места и поддерживать эластичность. При добавлении нового силоса в кластер Orleans, очереди автоматически перебалансируются между старыми и новыми силосами.
StreamQueueBalancer позволяет настроить этот процесс.
Orleans имеет несколько встроенных модулей StreamQueueBalancers для поддержки различных сценариев балансировки (большое и небольшое количество очередей) и различных сред (Azure, on-prem, static).
Используя пример генератора тестов, приведенный выше, в приведенном ниже коде показано, как настроить средство сопоставления очередей и балансировщик очередей.
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
Приведенный выше код настраивает GeneratorAdapterFactory, чтобы использовать средство сопоставления с восемью очередями и балансирует распределение этих очередей по кластерам с помощью DynamicClusterConfigDeploymentBalancer.
Протокол извлечения данных
Каждый silo запускает набор агентов извлечения данных, и каждый агент извлекает из одной очереди. Сами агенты извлечения реализуются внутренним компонентом среды выполнения, который называется SystemTarget. SystemTarget по сути представляют собой зерна выполнения, подвержены однопоточной обработке, могут использовать стандартную передачу сообщений зерен и так же легки, как и зерна. В отличие от зерен, SystemTargets не являются виртуальными: они явно создаются средой выполнения и не обладают прозрачностью местоположения. Реализуя агенты извлечения как SystemTargets, Orleans среда выполнения потоковой передачи может полагаться на встроенные Orleans функции и может масштабироваться до очень большого количества очередей, так как создание нового агента извлечения является как дешевым, как создание нового зерна.
Каждый агент извлечения запускает периодический таймер, который извлекает из очереди, вызывая метод IQueueAdapterReceiver.GetQueueMessagesAsync. Возвращенные сообщения помещаются во внутреннюю структуру IQueueCacheданных для каждого агента. Каждое сообщение проверяется, чтобы узнать его целевой поток. Агент использует Pub-Sub, чтобы узнать список подписчиков этого потока. После получения списка потребителей агент сохраняет его локально (в его кэше pub-sub), поэтому не нужно обращаться к Pub-Sub по каждому сообщению. Агент также подписывается на pub-sub, чтобы получать уведомления о новых потребителях контента, которые подписываются на этот поток. Это рукопожатие между агентом и pub-sub гарантирует сильные семантики подписки на потоковую передачу: после того как потребитель подписался на поток, он увидит все события, созданные после его подписки. Кроме того, использование StreamSequenceToken позволяет подписываться в прошлом.
Кэш очередей
IQueueCache — это внутренняя структура данных для каждого агента, которая позволяет отделять удаление новых событий из очереди и их доставку к потребителям. Кроме того, это позволяет разделить доставку для разных потоков и различных потребителей.
Представьте себе ситуацию, когда один поток имеет 3 потребителей потока, и один из них медленный. Если не проявить осторожность, этот медленный потребитель может повлиять на ход выполнения агента, замедляя потребление другими потребителями этого потока, и даже замедляя исключение и доставку событий для других потоков. Чтобы предотвратить это и разрешить максимальный параллелизм в агенте, мы используем IQueueCache.
IQueueCache буферизирует события потоковой передачи и предоставляет агенту способ доставки событий каждому потребителю в собственном темпе. Поставка для каждого потребителя осуществляется внутренним компонентом IQueueCacheCursor, который отслеживает прогресс каждого потребителя. Таким образом, каждый потребитель получает события по своему собственному темпу: быстрые потребители получают события так быстро, как они удаляются из очереди, в то время как медленные потребители получают их позже. После доставки сообщения всем потребителям его можно удалить из кэша.
Обратная прессура
Обратное давление в Orleans среде выполнения потоковой передачи применяется в двух местах: перемещение событий потоковой передачи из очереди в агент и доставка событий от агента к потребителям потоковой передачи.
Последний предоставляется встроенным Orleans механизмом доставки сообщений. Каждое событие потока доставляется от агента потребителям через стандартный Orleans грейн-месседжинг поочерёдно. То есть агенты отправляют одно событие (или пакет событий ограниченного размера) каждому потребителю потока и ожидают подтверждения. Следующее событие не начнет доставляться до тех пор, пока задача для предыдущего события не будет решена или нарушена. Таким образом, мы естественно ограничиваем частоту доставки по одному сообщению за раз.
При переносе событий потоковой передачи из очереди к агенту Orleans потоковая передача обеспечивает новый специальный механизм обратного давления. Поскольку агент отделяет процесс извлечения событий из очереди от их доставки потребителям, один медленный потребитель может настолько отстать, что IQueueCache будет заполнена. Чтобы предотвратить IQueueCache рост на неопределенный срок, мы ограничиваем его размер (ограничение размера настраивается). Однако агент никогда не бросает незавершенные события.
Вместо этого, когда кэш начинает заполняться, агенты замедляют скорость извлечения событий из очереди. Таким образом, мы можем переждать медленные периоды доставки, изменив скорость, с которой мы потребляем из очереди ("обратное давление") и вернуться к быстрым скоростям потребления позже. Для обнаружения "медленной доставки" долины IQueueCache использует внутреннюю структуру данных контейнеров кэша, которая отслеживает ход доставки событий отдельным потребителям потоков. Это приводит к очень быстрой и самостоятельной настройке системы.