Потоки

Завершено

Распределенным микрослужбам часто нужно оставлять данные или сообщения для потребления другими микрослужбами. В достаточно большом решении одни и те же сообщения могут потребляться множеством разных микрослужб. Такую схему работы часто называют архитектурой конкурирующих потребителей.

В нашем сценарии розничной компании микрослужбы должны оставлять сообщения асинхронно для последующего потребления множеством клиентов. В идеале эти клиенты будут потреблять такие данные по расписанию, не заставляя производителя сообщений ждать. Сейчас эти микрослужбы рассчитаны на отправку данных и ожидание ответа или подтверждения, после которого смогут продолжить обработку. С увеличением масштаба решения розничной компании все более важное значение приобретают проектирование и реализации решения, в котором производители и потребители сообщений смогут асинхронно хранить и анализировать сообщения, не дожидаясь внешних стимулов.

Diagram of microservices manually communicating with each other.

Схема, демонстрирующая три пары служб, взаимодействующих друг с другом и ожидающие подтверждения в рамках полезных данных воспроизведения.

Брокеринг обмена сообщений с помощью потоков

Брокер обмена сообщениями — это ПО промежуточного слоя, которое управляет распределением данных сообщений между несколькими потребителями, которые обычно конкурируют за одни и те же данные.

Здесь вы узнаете, как Потоки функция Кэш Azure для Redis может служить по промежуточному слоям, чтобы упростить хранение сообщений от нескольких производителей и распространение сообщений для многих клиентов. Функция "Потоки" может помочь компонентам вашего приложения управлять обменом сообщениями в масштабируемом, отказоустойчивом, распределенном режиме.

Функция "Потоки" кэша Azure для Redis сохраняет сообщения в поток, который могут потреблять клиенты. Микрослужбы могут использовать эту функцию для распределения сообщений между несколькими клиентами, о которых на момент производства сообщений она может не знать. Кроме того, число потребителей и поставщиков сообщений можно наращивать отдельно по схемам, имеющим смысл для облачного решения.

Diagram of Azure Cache for Redis as a message broker.

Схема, демонстрирующая реализацию брокера обмена сообщениями, расположенную между производителями и потребителями сообщений и реализованную с помощью кэша Azure для Redis.

Эта функция упрощает брокерирование сообщений между микрослужбами.

Добавление записей в поток

Клиенты могут создавать сообщения для потока с помощью набора пар "поле-значение". Клиент может указать уникальный идентификатор при создании сообщения или его автоматического создания Redis. В терминологии Redis сообщение называется записью, а уникальный идентификатор — ключом.

Примечание.

Автоматический ключ составляется из времени Unix локального компьютера в миллисекундах и порядкового номера, если за одну миллисекунду создаются сразу несколько записей. Например, если текущая дата и время на локальном устройстве — полночь (UTC) 1 декабря 2010 года, а запись — третья за текущую миллисекунду, будет создан автоматический ключ 1291161600000-3.

Для добавления новых записей в поток в Redis есть команда XADD. Перед выполнением команды поток не требуется создавать. Например, используйте XADD команду, чтобы добавить новую запись в поток media.photo.genthumb с помощью автогенерированного ключа и следующих данных пары "значение поля":

Поле Value
расположение sd7f9sd7.png
width 300
height 300
XADD media.photos.genthumb * location sd7f9sd7.png width 300 height 300

Также для использования с командой XADD можно указать конкретный ключ. В этом примере команда XADD используется для добавления новой записи в поток media.photos.genthumb с ключом 1596514316945-2 и следующими данными пары "поле-значение":

Поле Value
расположение xczv897.png
filter grayscale
XADD media.photos.genthumb 1596514316945-2 location xczv897.png filter grayscale

Запрос записей в потоке

Когда в потоке появятся записи, клиенты смогут вручную считать количество записей или извлекать их напрямую.

Для получения всех или подмножества записей в потоке в Redis есть команда XRANGE, а для подсчета количества записей в определенном потоке — команда XLEN.

Запрос диапазонов записей

Команда XRANGE используется с тремя основными параметрами:

  • название потока для запроса;
  • начальная точка результирующего набора;
  • конечная точка результирующего набора.

Diagram of a query for a subset of data in a stream.

Схема, демонстрирующая запрос, возвращающий подмножество записей за период между двумя разными значениями времени.

Команда XRANGE поддерживает использование специальных операторов, если вы не знаете или не хотите указывать начало или конец диапазона. Оператор - позволяет указать, что диапазон должен начинаться с хронологического начала потока. Оператор + позволяет указать, что диапазон должен продолжать до конца потока (также хронологического).

В первом примере команда XRANGE может использоваться с двумя ключами для получения всех записей между потоками в хронологическом порядке. В этом примере кода команда XRANGE используется для получения всех записей в потоке media.photos.genthumb, начинается с ключа 1596514316945-2 и заканчивая ключом 1609476184275-0.

XRANGE media.photos.genthumb 1596514316945-2 1609476184275-0

Во втором примере операторы + и - можно использовать вместе для получения всех записей из потока. В этом примере выполняется запрос одного потока, но извлекаются все записи с помощью специальных операторов.

XRANGE media.photos.genthumb - +

Специальные операторы можно также использовать отдельно. В этом примере оператор - используется для получения всех записей из потока вплоть до записи 1596514316945-2.

XRANGE media.photos.genthumb - 1596514316945-2

Этот пример иллюстрирует получение всех записей, начиная с записи 1596514316945-2, до конца потока с помощью оператора +.

XRANGE media.photos.genthumb 1596514316945-2 +

Команду XREVRANGE можно также использовать с тем же синтаксисом, что и команду XRANGE и получать все записи в обратном порядке.

XREVRANGE media.photos.genthumb - +
XREVRANGE media.photos.genthumb - 1596514316945-2
XREVRANGE media.photos.genthumb 1596514316945-2 +

Подсчет количества записей в потоке

Простая команда XLEN принимает имя потока в качестве аргумента и возвращает целое число, соответствующее количеству записей в потоке.

XLEN media.photos.genthumb

Чтение данных из потока

Данные можно запрашивать из Redis в виде временных рядов, но предпочтительнее прослушивать новые элементы из потока. У потока может быть много прослушивателей, ожидающих хранящихся в потоке данных. Если после запуска данных создается новый прослушиватель, он может начать "с начала" и обработать данные с начала потока.

Данные потребляются из потока с помощью команды XREAD. Эта команда включает набор параметров, которые не рассматриваются в этом модуле. Теперь сосредоточьтесь на XREAD способности команды читать один или несколько потоков, начиная с определенных точек.

Чтение данных из определенной начальной точки в потоке

Команда XREAD , в самой простой и наиболее часто используемой форме, может подписаться на записи из потока, начиная с первой записи. Затем потребитель обрабатывает записи в порядке, прежде чем достичь текущего конца потока. На этом этапе потребитель может перейти к ожиданию новых данных в потоке.

Чтобы подписаться на данные в потоке, используйте команду XREAD с аргументом STREAMS и списком потоков для чтения вместе с соответствующими начальными точками.

В этом примере команда XREAD используется для чтения данных из потока media.photos.genthumb с самого начала за счет ключа 0. В большинстве случаев ключ 0 фактически меньше текущего ключа на основе времени.

XREAD STREAMS media.photos.genthumb 0

Если потребитель потока должен приостановить потребление сообщений по какой-либо причине, можно передать в команду ключ XREAD, чтобы начать потреблять сообщения из определенной точки. В этом примере сообщения потребляются, начиная с ключа 1639714145947-2.

XREAD STREAMS media.photos.genthumb 1639714145947-2

Совет

Это удобно при каком-либо сбое или ошибке получателя потока. Команда XREAD может возобновить чтение с момента последней удачной операции считывания.

Другой вариант — использовать оператор $. Он указывает, что чтение потока нужно начать с конца. Фактически этот пример получает только новые записи вместо исторических записей.

XREAD STREAMS media.photos.genthumb $

Чтение данных из нескольких потоков

Во всех предыдущих примера данные считывались из одного потока. Возможно, вы помните, что аргумент STREAMS поддерживает перечисление потоков вместе с соответствующими начальными точками.

В этом примере команда XREAD считывает как media.photo.genthumb , так и потоки media.photo.delete одновременно при запуске обоих потоков из их хронологически первых записей.

XREAD STREAMS media.photos.genthumb media.photos.delete 0 0

Удаление потока и его данных

Когда данные используются из потока, он остается в потоке, пока он не будет удален вручную. Это позволяет нескольким клиентам потреблять данные из одного и того же потока, не упуская никакие данные.

Чтобы удалить определенную запись из потока вручную, используйте команду XDEL, указав название потока и ключ записи для удаления. В этом примере команда XDEL используется для удаления записи с ключом 1596514316945-2 из потока media.photos.genthumb.

XDEL media.photos.genthumb 1596514316945-2

Удаление всех элементов из потока не удаляет поток, так как пустой поток является полностью юридической конструкцией в Redis.

Чтобы удалить поток полностью, используйте команду DEL с названием потока.

DEL media.photos.genthumb