Потоки Redis в действии (Серия из 4 частей)
Добро пожаловать в эту серию постов в блоге, которые охватывают Потоки Redis с помощью практического примера. Мы будем использовать образец приложения, чтобы сделать данные Twitter доступными для поиска и запросов в режиме реального времени. Исследования и Потоки Redis служат основой этого решения, состоящего из нескольких взаимодействующих компонентов, каждый из которых мы рассмотрим в специальном сообщении в блоге.
Код доступен в этом репозитории GitHub – https://github.com/abhirockzz/redis-streams-in-action
В этом сообщении в блоге будет рассказано о приложении для обработки твитов на основе Java, роль которого заключается в получении твитов из потоков Redis и их сохранении (в виде ХЭША
), чтобы их можно было запрашивать с помощью Исследование
((точный термин для этого – “индексирование документов” в Исследовании
). Вы развернете приложение в Azure, проверите его, запустите несколько Исследование
запросы для поиска твитов. Наконец, есть раздел, в котором мы пройдемся по коду, чтобы понять, “как все работает”.
Пред- реквизиты
Пожалуйста, убедитесь, что вы прочитали часть 2 этой серии и у вас запущено потребительское приложение Tweets. Это приложение будет считывать твиты из API потоковой передачи Twitter и отправлять их в потоки Redis. Затем наше приложение для обработки твитов (описанное в этом блоге) вступит во владение.
Вам понадобится учетная запись Azure, которую вы можете получить бесплатно и Интерфейс командной строки Azure . Как и предыдущее приложение, это приложение также будет развернуто в Экземплярах контейнера Azure с использованием обычных команд командной строки Docker. Эта возможность включена интеграцией между Docker и Azure . Просто убедитесь, что у вас установлена версия Docker Desktop 2.3.0.5 или более поздняя для Windows , mac OS , или установите Интерфейс командной строки интеграции Docker ACI для Linux .
Развертывание приложения в экземплярах контейнеров Azure
Если вы следили за предыдущим сообщением в блоге, вам следует настроить корпоративный уровень кэша Azure для Redis с помощью этого быстрого запуска . После завершения этого шага убедитесь, что вы сохранили следующую информацию: имя хоста Redis и ключ доступа
Приложение доступно в виде контейнера Docker – самый простой способ – просто использовать его повторно. Если вы хотите создать свой собственный образ, пожалуйста, используйте Dockerfile
доступный в репозитории GitHub .
Если вы решите создать свой собственный образ, обязательно сначала создайте файл JAR с помощью Maven ( mvn clean install
)
Его действительно удобно развертывать в Экземплярах контейнеров Azure , что позволяет запускать контейнеры Docker по требованию в управляемой бессерверной среде Azure.
Убедитесь, что вы создали контекст Azure , чтобы связать Docker с подпиской и группой ресурсов Azure, чтобы вы могли создавать экземпляры контейнеров и управлять ими.
docker login azure docker context create aci aci-context docker context use aci-context
Установите переменные среды – обязательно обновите хост Redis и учетные данные в соответствии с вашей учетной записью:
export STREAM_NAME=tweets_stream # don't change export STREAM_CONSUMER_GROUP_NAME=redisearch_app_group # don't change export REDIS_HOST=export REDIS_PORT= export REDIS_PASSWORD= export SSL=true
.. а затем используйте docker run
для развертывания контейнера в Azure:
docker run -d --name redis-streams-consumer \ -e STREAM_NAME=$STREAM_NAME \ -e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME \ -e REDIS_HOST=$REDIS_HOST \ -e REDIS_PORT=$REDIS_PORT \ -e REDIS_PASSWORD=$REDIS_PASSWORD \ -e SSL=$SSL \ abhirockzz/tweets-redis-streams-consumer-java
По мере создания контейнера вы должны увидеть вывод, подобный этому:
[+] Running 2/2 ⠿ Group redis-streams-consumer Created 5.2s ⠿ redis-streams-consumer Created 10.5s
Проверьте это с помощью портала Azure:
Для проверки журналов контейнеров вы можете использовать обычные журналы docker
команда:
docker logs redis-streams-consumer
Вы должны увидеть результат, похожий на этот:
Reading from stream tweets_stream with XREADGROUP saved tweet to hash tweet:1393089239324282880 Reading from stream tweets_stream with XREADGROUP saved tweet to hash tweet:1393089243539517441 Reading from stream tweets_stream with XREADGROUP not processed - tweet:1393089247721132033 Reading from stream tweets_stream with XREADGROUP saved tweet to hash tweet:1393089256105693184 Reading from stream tweets_stream with XREADGROUP saved tweet to hash tweet:1393089260304179200 ....
Заметили не обработанные
журналы? Мы обсудим их в следующем разделе
Как только приложение будет запущено и запущено, оно начнет потреблять tweets_stream
Повторите потоковую передачу и сохраните информацию о каждом твите в отдельном ХЭШЕ
, который, в свою очередь, будет проиндексирован Исследование
. Прежде чем двигаться дальше, войдите в экземпляр Redis с помощью redis-cli :
redis-cli -h-p 10000 -a --tls
Как обстоят дела?
Если вы внимательно посмотрите журналы, вы сможете найти имя ХЭША
(которое основано на идентификаторе твита) например твит:<идентификатор твита>
. Просто проверьте его содержимое с помощью HGETALL
:
redis-cli> TYPE tweet:1393089163856056320 redis-cli> hash redis-cli> HGETALL tweet:1393089163856056320
Результат будет выглядеть как любой другой ХЭШ
. Например, для
1) "location" 2) "Nairobi, Kenya" 3) "text" 4) "RT @WanjaNjubi: #EidMubarak \xf0\x9f\x99\x8f\nMay peace be upon you now and always.\n#EidUlFitr https://t.co/MlL0DbM2aS" 5) "id" 6) "1393089163856056320" 7) "user" 8) "Hot_96Kenya" 9) "hashtags" 10) "EidMubarak,EidUlFitr"
Хорошо, пришло время запросить твиты с помощью Исследования
! Давайте используем несколько команд для поиска в твиты-индекс
индекс:
FT.ПОИСК твитов -индекс привет
– вернет твиты которыйFT.ПОИСК твитов - индекс привет|мир
– такой же, как и выше, только он применим для “привет” ИЛИ “мир”- Используйте
FT.ПОИСК твитов-индекс "@местоположение:Индия"
если вас интересуют твиты из определенногоместоположения
FT.ПОИСК твитов-индекс "@пользователь:джо* @местоположение: Индия"
– это объединяетместоположение
вместе с критерием, по которому имя пользователя должно начинаться сjo
FT.ПОИСК твитов-индекс "@пользователь:джо*| @местоположение:Индия"
– это тонкий вариант вышесказанного.| обозначает
ИЛИ
критерии- Вы также можете выполнять поиск по хэштегам –
FT.ПОИСК твитов-индекс "@хэштеги:{cov*}
- Включите несколько хэш-тегов как таковых –
FT.ПОИСК твитов - индекс "@хэштеги:{cov*|Med*}"
Это всего лишь несколько примеров. Я бы настоятельно рекомендовал вам обратиться к Исследовательской документации и попробовать другие другие запросы.
Давайте расширим масштабы
Одним из ключевых преимуществ использования потоков Redis является использование функции Групп потребителей. Это означает, что вы можете просто добавить больше экземпляров в приложение (горизонтальное масштабирование), чтобы улучшить обработку – чем больше экземпляров, тем быстрее обрабатываются твиты. Каждое приложение будет потреблять из другой части одного и того же потока Redis ( tweets_stream
), таким образом, рабочая нагрузка распределяется (почти) равномерно между всеми экземплярами – это дает вам возможность линейно масштабироваться.
Давайте попробуем это сделать. Чтобы запустить другой экземпляр, используйте docker run
– обязательно используйте другое имя:
docker run -d --name redis-streams-consumer_2 \ -e STREAM_NAME=$STREAM_NAME \ -e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME \ -e REDIS_HOST=$REDIS_HOST \ -e REDIS_PORT=$REDIS_PORT \ -e REDIS_PASSWORD=$REDIS_PASSWORD \ -e SSL=$SSL \ abhirockzz/tweets-redis-streams-consumer-java
Обратите внимание, что я использовал другое имя --имя redis-streams-consumer_2
Все будет продолжаться, как и раньше – только немного быстрее, так как у нас есть еще одна рука помощи. Вы также можете проверить журналы нового экземпляра – журналы докеров redis-streams-consumer_2
.
Вы можете продолжать экспериментировать дальше и попробовать расширить масштаб до большего количества экземпляров.
Давайте копнем немного глубже
Мы можем анализировать потоки Redis с помощью команды РАСШИРЕНИЕ :
XPENDING tweets_stream redisearch_app_group
Вы получите результат, подобный этому:
1) (integer) 25 2) "1618572598902-0" 3) "1618573768902-0" 4) 1) 1) "consumer-b6410cf9-8244-41ba-b0a5-d79b66d33d65" 2) "20" 2) 1) "consumer-e5a872d4-b488-416e-92ee-55d2902b338f" 2) "5"
Если вы новичок в потоках Redis, этот вывод может не иметь большого смысла. Вызов РАСШИРЕНИЕ
возвращает количество сообщений, которые были получены нашим приложением для обработки, но еще не были обработаны (и подтверждены ). В этом случае у нас есть два экземпляра приложения (они случайным образом генерируют идентификаторы UUID) и имеют 20
и 5
необработанные сообщения соответственно (конечно, в вашем случае цифры будут отличаться).
В производственном сценарии сбои приложений могут возникать по нескольким причинам. Однако в нашем примере приложения приведенный ниже фрагмент кода использовался для моделирования этой ситуации – он случайным образом выбирает (вероятность около 20%), чтобы не обрабатывать твит, полученный из потоков Redis:
if (!(random.nextInt(5) == 0)) { conn.hset(hashName, entry.getFields()); conn.xack(streamName, consumerGroupName, entry.getID()); }
Вот почему вы увидите ОЖИДАНИЕ
количество медленно, но верно увеличивается. В рабочей среде, если один (или несколько) экземпляров выходят из строя, количество РАСШИРЯЮЩИХСЯ
для этих экземпляров (ов) перестанет увеличиваться, но останется постоянным . Это означает, что эти сообщения теперь остаются необработанными – в данном конкретном примере это означает, что информация о твите не будет доступна в Исследование
для вас на запрос.
Редис спешит на помощь
Потоки Redis обеспечивают надежную передачу сообщений. Он хранит состояние для каждого потребителя – это именно то, что вы видите при РАСШИРЕНИИ
! Если вы запустите другой экземпляр пользователя с той же группой и именем пользователя, вы сможете воспроизвести те же сообщения и повторно обработать их, чтобы убедиться, что твиты хранятся в Redis. Это не предполагает выполнения чего-либо другого/дополнительного с вашей стороны.
Другой вариант – иметь специальное приложение, которое может периодически проверять группа потребителей заявляет ( РАСШИРЯЕТСЯ
), утверждает сообщения, которые были оставлены без внимания, обрабатывает и (самое главное) подтверждает ( XACK
) их. В следующей (заключительной) части этой серии мы рассмотрим, как вы можете создать приложение, чтобы сделать именно это!
Итак, как все это работает?
Сейчас самое подходящее время по-быстрому ознакомиться с кодом.
Вы можете обратиться к коду в репозитории GitHub
Приложение использует Jedi search , который абстрагирует API модуля Research
. Первое, что мы делаем, это устанавливаем соединение с Redis:
GenericObjectPoolConfigjedisPoolConfig = new GenericObjectPoolConfig<>(); JedisPool pool = new JedisPool(jedisPoolConfig, redisHost, Integer.valueOf(redisPort), timeout, redisPassword, isSSL); Client redisearch = new Client(INDEX_NAME, pool);
Затем мы создаем Схему и Определение индекса .
Schema sc = new Schema().addTextField(SCHEMA_FIELD_ID, 1.0).addTextField(SCHEMA_FIELD_USER, 1.0) .addTextField(SCHEMA_FIELD_TWEET, 1.0).addTextField(SCHEMA_FIELD_LOCATION, 1.0) .addTagField(SCHEMA_FIELD_HASHTAGS); IndexDefinition def = new IndexDefinition().setPrefixes(new String[] { INDEX_PREFIX }); try { boolean indexCreated = redisearch.createIndex(sc, Client.IndexOptions.defaultOptions().setDefinition(def)); if (indexCreated) { System.out.println("Created RediSearch index "); } } catch (Exception e) { System.out.println("Did not create RediSearch index - " + e.getMessage()); }
Чтобы изучить API-интерфейсы потоков Redis ( группа создает
, , группа чтения
Прежде чем двигаться дальше, мы создаем группу потребителей Redis Streams (используя x group Create
) – это обязательно. Группа потребителей представляет собой набор приложений, которые работают “вместе” и взаимодействуют друг с другом для совместного использования нагрузки на обработку:
try { conn = pool.getResource(); String res = conn.xgroupCreate(streamName, consumerGroupName, StreamEntryID.LAST_ENTRY, true); }
Каждое приложение в группе потребителей должно быть уникально идентифицировано. Хотя имя можно назначить вручную, мы генерируем случайное имя потребителя.
String consumerName = "consumer-" + UUID.randomUUID().toString();
Основная часть потребительского приложения – это цикл, который использует x группу чтения
для чтения из потока Redis. Обратите внимание на Идентификатор записи потока.UNRECEIVED_ENTRY
– это означает, что мы попросим Redis вернуть записи потока, которые не были получены любым другим потребителем в группе. Кроме того, наш вызов блокируется на 15 секунд
и мы решили получить максимум 50
сообщений на вызов в ГРУППУ ЧТЕНИЯ
(конечно, вы можете изменить это в соответствии с требованиями).
while (true) { List>> results = conn.xreadGroup(consumerGroupName, consumerName, 50, 15000, false, Map.entry(streamName, StreamEntryID.UNRECEIVED_ENTRY)); if (results == null) { continue; } .... }
Каждая запись потока должна быть сохранена в Redis ХЭШ
(с использованием set
). Хорошо то, что чтение записи потока возвращает Хэш-карту
и это именно то, что SET
API также ожидает этого. Таким образом, мы можем повторно использовать Хэш-карту
!
Это еще не все, обратите внимание на метод exact
– это способ вызвать XACK
и сообщить, что мы действительно успешно обработали сообщение:
for (Entry> result : results) { List entries = result.getValue(); for (StreamEntry entry : entries) { String tweetid = entry.getFields().get("id"); String hashName = INDEX_PREFIX + tweetid; try { // simulate random failure/anomaly. ~ 20% will NOT be ACKed if (!(random.nextInt(5) == 0)) { conn.hset(hashName, entry.getFields()); conn.xack(streamName, consumerGroupName, entry.getID()); } } catch (Exception e) { continue; } } }
Здесь есть много возможностей для оптимизации. Например, вы можете сделать этот процесс многопоточным, создав поток для каждого пакета (скажем, 50 сообщений).
Вот и все для этого блога!
Вас интересует заключительная часть?
До сих пор мы рассмотрели обзор высокого уровня в части 1, приложение tweets consumer Rust в части 2 и приложение Java для обработки этих твитов из потоков Redis. Как и было обещано, в заключительной части серии будет рассказано о приложении для мониторинга процесса и повторной обработки оставленных сообщений, чтобы обеспечить надежность нашей системы в целом – это будет приложение Без сервера Go , развернутое в Функции Azure . Оставайтесь с нами!
Потоки Redis в действии (Серия из 4 частей)
Оригинал: “https://dev.to/azure/redis-streams-in-action-part-3-java-app-to-process-tweets-with-redis-streams-3n7n”