Рубрики
Без рубрики

Потоки Redis в действии – Часть 3 (Java-приложение для обработки твитов с помощью потоков Redis)

Добро пожаловать в эту серию постов в блоге, в которых рассказывается о потоках Redis с помощью практического примера…. С тегами redis, база данных, программирование, java.

Потоки Redis в действии (Серия из 4 частей)

Добро пожаловать в эту серию постов в блоге, которые охватывают Потоки Redis с помощью практического примера. Мы будем использовать образец приложения, чтобы сделать данные Twitter доступными для поиска и запросов в режиме реального времени. Исследования и Потоки Redis служат основой этого решения, состоящего из нескольких взаимодействующих компонентов, каждый из которых мы рассмотрим в специальном сообщении в блоге.

Код доступен в этом репозитории GitHub – https://github.com/abhirockzz/redis-streams-in-action

В этом сообщении в блоге будет рассказано о приложении для обработки твитов на основе Java, роль которого заключается в получении твитов из потоков Redis и их сохранении (в виде ХЭША ), чтобы их можно было запрашивать с помощью Исследование ((точный термин для этого – “индексирование документов” в Исследовании ). Вы развернете приложение в Azure, проверите его, запустите несколько Исследование запросы для поиска твитов. Наконец, есть раздел, в котором мы пройдемся по коду, чтобы понять, “как все работает”.

Пред- реквизиты

Пожалуйста, убедитесь, что вы прочитали часть 2 этой серии и у вас запущено потребительское приложение Tweets. Это приложение будет считывать твиты из API потоковой передачи Twitter и отправлять их в потоки Redis. Затем наше приложение для обработки твитов (описанное в этом блоге) вступит во владение.

Вам понадобится учетная запись Azure, которую вы можете получить бесплатно и Интерфейс командной строки Azure . Как и предыдущее приложение, это приложение также будет развернуто в Экземплярах контейнера Azure с использованием обычных команд командной строки Docker. Эта возможность включена интеграцией между Docker и Azure . Просто убедитесь, что у вас установлена версия Docker Desktop 2.3.0.5 или более поздняя для Windows , mac OS , или установите Интерфейс командной строки интеграции Docker ACI для Linux .

Развертывание приложения в экземплярах контейнеров Azure

Если вы следили за предыдущим сообщением в блоге, вам следует настроить корпоративный уровень кэша Azure для Redis с помощью этого быстрого запуска . После завершения этого шага убедитесь, что вы сохранили следующую информацию: имя хоста Redis и ключ доступа

Приложение доступно в виде контейнера Docker – самый простой способ – просто использовать его повторно. Если вы хотите создать свой собственный образ, пожалуйста, используйте Dockerfile доступный в репозитории GitHub .

Если вы решите создать свой собственный образ, обязательно сначала создайте файл JAR с помощью Maven ( mvn clean install )

Его действительно удобно развертывать в Экземплярах контейнеров Azure , что позволяет запускать контейнеры Docker по требованию в управляемой бессерверной среде Azure.

Убедитесь, что вы создали контекст Azure , чтобы связать Docker с подпиской и группой ресурсов Azure, чтобы вы могли создавать экземпляры контейнеров и управлять ими.

docker login azure
docker context create aci aci-context
docker context use aci-context

Установите переменные среды – обязательно обновите хост Redis и учетные данные в соответствии с вашей учетной записью:

export STREAM_NAME=tweets_stream # don't change
export STREAM_CONSUMER_GROUP_NAME=redisearch_app_group # don't change

export REDIS_HOST=
export REDIS_PORT=
export REDIS_PASSWORD=
export SSL=true

.. а затем используйте docker run для развертывания контейнера в Azure:

docker run -d --name redis-streams-consumer \
-e STREAM_NAME=$STREAM_NAME \
-e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME \
-e REDIS_HOST=$REDIS_HOST \
-e REDIS_PORT=$REDIS_PORT \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e SSL=$SSL \
abhirockzz/tweets-redis-streams-consumer-java

По мере создания контейнера вы должны увидеть вывод, подобный этому:

[+] Running 2/2
 ⠿ Group redis-streams-consumer  Created                                                                             5.2s
 ⠿ redis-streams-consumer        Created                                                                            10.5s

Проверьте это с помощью портала Azure:

Для проверки журналов контейнеров вы можете использовать обычные журналы docker команда:

docker logs redis-streams-consumer

Вы должны увидеть результат, похожий на этот:

Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089239324282880
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089243539517441
Reading from stream tweets_stream with XREADGROUP
not processed - tweet:1393089247721132033
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089256105693184
Reading from stream tweets_stream with XREADGROUP
saved tweet to hash tweet:1393089260304179200
....

Заметили не обработанные журналы? Мы обсудим их в следующем разделе

Как только приложение будет запущено и запущено, оно начнет потреблять tweets_stream Повторите потоковую передачу и сохраните информацию о каждом твите в отдельном ХЭШЕ , который, в свою очередь, будет проиндексирован Исследование . Прежде чем двигаться дальше, войдите в экземпляр Redis с помощью redis-cli :

redis-cli -h  -p 10000 -a  --tls

Как обстоят дела?

Если вы внимательно посмотрите журналы, вы сможете найти имя ХЭША (которое основано на идентификаторе твита) например твит:<идентификатор твита> . Просто проверьте его содержимое с помощью HGETALL :

redis-cli> TYPE tweet:1393089163856056320
redis-cli> hash
redis-cli> HGETALL tweet:1393089163856056320

Результат будет выглядеть как любой другой ХЭШ . Например, для

 1) "location"
 2) "Nairobi, Kenya"
 3) "text"
 4) "RT @WanjaNjubi: #EidMubarak \xf0\x9f\x99\x8f\nMay peace be upon you now and always.\n#EidUlFitr https://t.co/MlL0DbM2aS"
 5) "id"
 6) "1393089163856056320"
 7) "user"
 8) "Hot_96Kenya"
 9) "hashtags"
10) "EidMubarak,EidUlFitr"

Хорошо, пришло время запросить твиты с помощью Исследования ! Давайте используем несколько команд для поиска в твиты-индекс индекс:

  • FT.ПОИСК твитов -индекс привет – вернет твиты который
  • FT.ПОИСК твитов - индекс привет|мир – такой же, как и выше, только он применим для “привет” ИЛИ “мир”
  • Используйте FT.ПОИСК твитов-индекс "@местоположение:Индия" если вас интересуют твиты из определенного местоположения
  • FT.ПОИСК твитов-индекс "@пользователь:джо* @местоположение: Индия" – это объединяет местоположение вместе с критерием, по которому имя пользователя должно начинаться с jo
  • FT.ПОИСК твитов-индекс "@пользователь:джо*| @местоположение:Индия" – это тонкий вариант вышесказанного. | обозначает ИЛИ критерии
  • Вы также можете выполнять поиск по хэштегам – FT.ПОИСК твитов-индекс "@хэштеги:{cov*}
  • Включите несколько хэш-тегов как таковых – FT.ПОИСК твитов - индекс "@хэштеги:{cov*|Med*}"

Это всего лишь несколько примеров. Я бы настоятельно рекомендовал вам обратиться к Исследовательской документации и попробовать другие другие запросы.

Давайте расширим масштабы

Одним из ключевых преимуществ использования потоков Redis является использование функции Групп потребителей. Это означает, что вы можете просто добавить больше экземпляров в приложение (горизонтальное масштабирование), чтобы улучшить обработку – чем больше экземпляров, тем быстрее обрабатываются твиты. Каждое приложение будет потреблять из другой части одного и того же потока Redis ( tweets_stream ), таким образом, рабочая нагрузка распределяется (почти) равномерно между всеми экземплярами – это дает вам возможность линейно масштабироваться.

Давайте попробуем это сделать. Чтобы запустить другой экземпляр, используйте docker run – обязательно используйте другое имя:

docker run -d --name redis-streams-consumer_2 \
-e STREAM_NAME=$STREAM_NAME \
-e STREAM_CONSUMER_GROUP_NAME=$STREAM_CONSUMER_GROUP_NAME \
-e REDIS_HOST=$REDIS_HOST \
-e REDIS_PORT=$REDIS_PORT \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e SSL=$SSL \
abhirockzz/tweets-redis-streams-consumer-java

Обратите внимание, что я использовал другое имя --имя redis-streams-consumer_2

Все будет продолжаться, как и раньше – только немного быстрее, так как у нас есть еще одна рука помощи. Вы также можете проверить журналы нового экземпляра – журналы докеров redis-streams-consumer_2 .

Вы можете продолжать экспериментировать дальше и попробовать расширить масштаб до большего количества экземпляров.

Давайте копнем немного глубже

Мы можем анализировать потоки Redis с помощью команды РАСШИРЕНИЕ :

XPENDING tweets_stream redisearch_app_group

Вы получите результат, подобный этому:

1) (integer) 25
2) "1618572598902-0"
3) "1618573768902-0"
4) 1) 1) "consumer-b6410cf9-8244-41ba-b0a5-d79b66d33d65"
      2) "20"
   2) 1) "consumer-e5a872d4-b488-416e-92ee-55d2902b338f"
      2) "5"

Если вы новичок в потоках Redis, этот вывод может не иметь большого смысла. Вызов РАСШИРЕНИЕ возвращает количество сообщений, которые были получены нашим приложением для обработки, но еще не были обработаны (и подтверждены ). В этом случае у нас есть два экземпляра приложения (они случайным образом генерируют идентификаторы UUID) и имеют 20 и 5 необработанные сообщения соответственно (конечно, в вашем случае цифры будут отличаться).

В производственном сценарии сбои приложений могут возникать по нескольким причинам. Однако в нашем примере приложения приведенный ниже фрагмент кода использовался для моделирования этой ситуации – он случайным образом выбирает (вероятность около 20%), чтобы не обрабатывать твит, полученный из потоков Redis:

if (!(random.nextInt(5) == 0)) {
    conn.hset(hashName, entry.getFields());
    conn.xack(streamName, consumerGroupName, entry.getID());
}

Вот почему вы увидите ОЖИДАНИЕ количество медленно, но верно увеличивается. В рабочей среде, если один (или несколько) экземпляров выходят из строя, количество РАСШИРЯЮЩИХСЯ для этих экземпляров (ов) перестанет увеличиваться, но останется постоянным . Это означает, что эти сообщения теперь остаются необработанными – в данном конкретном примере это означает, что информация о твите не будет доступна в Исследование для вас на запрос.

Редис спешит на помощь

Потоки Redis обеспечивают надежную передачу сообщений. Он хранит состояние для каждого потребителя – это именно то, что вы видите при РАСШИРЕНИИ ! Если вы запустите другой экземпляр пользователя с той же группой и именем пользователя, вы сможете воспроизвести те же сообщения и повторно обработать их, чтобы убедиться, что твиты хранятся в Redis. Это не предполагает выполнения чего-либо другого/дополнительного с вашей стороны.

Другой вариант – иметь специальное приложение, которое может периодически проверять группа потребителей заявляет ( РАСШИРЯЕТСЯ ), утверждает сообщения, которые были оставлены без внимания, обрабатывает и (самое главное) подтверждает ( XACK ) их. В следующей (заключительной) части этой серии мы рассмотрим, как вы можете создать приложение, чтобы сделать именно это!

Итак, как все это работает?

Сейчас самое подходящее время по-быстрому ознакомиться с кодом.

Вы можете обратиться к коду в репозитории GitHub

Приложение использует Jedi search , который абстрагирует API модуля Research . Первое, что мы делаем, это устанавливаем соединение с Redis:

GenericObjectPoolConfig jedisPoolConfig = new GenericObjectPoolConfig<>();
JedisPool pool = new JedisPool(jedisPoolConfig, redisHost, Integer.valueOf(redisPort), timeout, redisPassword, isSSL);
Client redisearch = new Client(INDEX_NAME, pool);

Затем мы создаем Схему и Определение индекса .

        Schema sc = new Schema().addTextField(SCHEMA_FIELD_ID, 1.0).addTextField(SCHEMA_FIELD_USER, 1.0)
                .addTextField(SCHEMA_FIELD_TWEET, 1.0).addTextField(SCHEMA_FIELD_LOCATION, 1.0)
                .addTagField(SCHEMA_FIELD_HASHTAGS);

        IndexDefinition def = new IndexDefinition().setPrefixes(new String[] { INDEX_PREFIX });

        try {
            boolean indexCreated = redisearch.createIndex(sc, Client.IndexOptions.defaultOptions().setDefinition(def));

            if (indexCreated) {
                System.out.println("Created RediSearch index ");
            }
        } catch (Exception e) {
            System.out.println("Did not create RediSearch index - " + e.getMessage());
        }

Чтобы изучить API-интерфейсы потоков Redis ( группа создает , , группа чтения

Прежде чем двигаться дальше, мы создаем группу потребителей Redis Streams (используя x group Create ) – это обязательно. Группа потребителей представляет собой набор приложений, которые работают “вместе” и взаимодействуют друг с другом для совместного использования нагрузки на обработку:

try {
    conn = pool.getResource();
    String res = conn.xgroupCreate(streamName, consumerGroupName, StreamEntryID.LAST_ENTRY, true);
}

Каждое приложение в группе потребителей должно быть уникально идентифицировано. Хотя имя можно назначить вручную, мы генерируем случайное имя потребителя.

String consumerName = "consumer-" + UUID.randomUUID().toString();

Основная часть потребительского приложения – это цикл, который использует x группу чтения для чтения из потока Redis. Обратите внимание на Идентификатор записи потока.UNRECEIVED_ENTRY – это означает, что мы попросим Redis вернуть записи потока, которые не были получены любым другим потребителем в группе. Кроме того, наш вызов блокируется на 15 секунд и мы решили получить максимум 50 сообщений на вызов в ГРУППУ ЧТЕНИЯ (конечно, вы можете изменить это в соответствии с требованиями).

while (true) {

    List>> results = conn.xreadGroup(consumerGroupName, consumerName, 50,
                        15000, false, Map.entry(streamName, StreamEntryID.UNRECEIVED_ENTRY));

    if (results == null) {
        continue;
    }
    ....
}

Каждая запись потока должна быть сохранена в Redis ХЭШ (с использованием set ). Хорошо то, что чтение записи потока возвращает Хэш-карту и это именно то, что SET API также ожидает этого. Таким образом, мы можем повторно использовать Хэш-карту !

Это еще не все, обратите внимание на метод exact – это способ вызвать XACK и сообщить, что мы действительно успешно обработали сообщение:

                for (Entry> result : results) {
                    List entries = result.getValue();
                    for (StreamEntry entry : entries) {
                        String tweetid = entry.getFields().get("id");
                        String hashName = INDEX_PREFIX + tweetid;

                        try {
                            // simulate random failure/anomaly. ~ 20% will NOT be ACKed
                            if (!(random.nextInt(5) == 0)) {
                                conn.hset(hashName, entry.getFields());
                                conn.xack(streamName, consumerGroupName, entry.getID());
                            }
                        } catch (Exception e) {
                            continue;
                        }
                    }
                }

Здесь есть много возможностей для оптимизации. Например, вы можете сделать этот процесс многопоточным, создав поток для каждого пакета (скажем, 50 сообщений).

Вот и все для этого блога!

Вас интересует заключительная часть?

До сих пор мы рассмотрели обзор высокого уровня в части 1, приложение tweets consumer Rust в части 2 и приложение Java для обработки этих твитов из потоков Redis. Как и было обещано, в заключительной части серии будет рассказано о приложении для мониторинга процесса и повторной обработки оставленных сообщений, чтобы обеспечить надежность нашей системы в целом – это будет приложение Без сервера Go , развернутое в Функции Azure . Оставайтесь с нами!

Потоки Redis в действии (Серия из 4 частей)

Оригинал: “https://dev.to/azure/redis-streams-in-action-part-3-java-app-to-process-tweets-with-redis-streams-3n7n”