Рубрики
Без рубрики

Создание масштабируемого сервиса чата в прямом эфире с помощью Spring Web Flux, Redis PubSub, сокета и Auth0

Введение В этом посте будет рассмотрено, как создать единую комнату чата в реальном времени, аналогичную… Помеченный java, микросервисы.

Вступление

В этом посте будет рассказано, как создать единую комнату чата в реальном времени, похожую на чат Twitch. Основное внимание будет уделено разработке сервиса, который может масштабироваться горизонтально для поддержания доступности и увеличения спроса. Этот сервис будет разработан с помощью Java Spring Boot.

Не стесняйтесь проверять репозиторий на GitHub .

Требования

Поскольку основной целью этого поста будет изучение масштабируемости чата в реальном времени, функциональность комнаты чата будет сведена к минимуму.

  • Пользователи смогут присоединиться к единой глобальной комнате чата. Там не будет комнат для выбора.
  • Для упрощения обслуживания сообщения будут только текстовыми.
  • Как и в случае с twitch, пользователи смогут видеть сообщения чата только с того момента, когда они присоединились к комнате. Истории чатов не будет.
  • Пользователи смогут просматривать сообщения анонимно, но смогут отправлять сообщения в чат только после того, как они зарегистрируются с уникальным именем пользователя.
  • Отправители сообщений будут идентифицироваться только по имени пользователя.

Архитектура

Для облегчения связи в режиме реального времени мы будем использовать протокол WebSocket для передачи сообщений между клиентами и сервисом. Для горизонтального масштабирования потребуется несколько экземпляров сервера, чтобы между ними можно было сбалансировать нагрузку на новые соединения WebSocket.

Благодаря сбалансированной нагрузке подключений WebSocket между экземплярами мы можем использовать посредника сообщений для рассылки сообщений всем клиентам. Для этого проекта мы будем обмениваться сообщениями с помощью Redis PubSub .

Redis PubSub

Redis PubSub позволит передавать своим подписчикам сообщения с высокой пропускной способностью с минимальной задержкой. Он также может быть распределен по нескольким экземплярам для увеличения пропускной способности и избыточности. Здесь есть отличный доклад Шахара Мора, в котором рассказывается более подробно.

Ограничения

Одним из недостатков Redis PubSub является то, что он только “запускает и забывает” подписчиков и не предлагает способа гарантировать доставку сообщений. Это может привести к удалению сообщений, если экземпляр сервера перезапущен и пользователям придется повторно подключиться.

Добавление гарантированной доставки сообщений выходит за рамки этого поста. Можно также привести аргумент о том, что нечастые сообщения для некоторых пользователей не повлияют на работу чата в прямом эфире.

Если вам требуется более высокая степень надежности сообщений, стоит проверить другие брокеры сообщений, такие как Кафка , RabbitMQ или даже Потоки Redis .

Тестирование

В более позднем посте мы будем разрабатывать клиент React, который будет подключаться к нашей службе чата, но до тех пор мы будем проверять соответствие требованиям, разрабатывая интеграционные тесты в рамках Spring framework.

Мы будем использовать библиотеку Playtika test containers-spring boot , чтобы легко развернуть контейнер Redis docker при локальном запуске интеграционных тестов. Затем его можно использовать в качестве нашего посредника сообщений PubSub, когда мы тестируем функциональность сервера соединений.

Все интеграционные тесты, используемые для соответствия требованиям, можно найти здесь .

Реализация

Предупреждение! Есть много областей, которые нужно охватить, так что это будет своего рода информационный свалка. Я всегда рад ответить на вопросы, чтобы прояснить любые детали.

Поток пружинного полотна

Мы будем использовать неблокирующую платформу Spring WebFlux для разработки сервера соединений. Это поможет уменьшить задержку при большом количестве подключений. WebFlux также позволит нам использовать реализацию реактивных потоков Project Reactor для написания асинхронного кода, о котором легче рассуждать.

Для получения информации о преимуществах реактивного программирования, не стесняйтесь посещать сайты Spring или Project Reactor .

Для использования Spring Web Flux требуется эта зависимость.

implementation 'org.springframework.boot:spring-boot-starter-webflux'

Возможно, ссылка на руководство по настройке и дополнительная информация о реактивном программировании

Служба PubSub

Мы будем абстрагировать функциональность PubSub в сервис с общим классом сообщений и интерфейсом.

@Data  
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    String username;
    String message;
}
public interface PubSubService {
    Mono publish(Message message); 
    Flux subscribe(); 
}

Конфигурация Redis

Для взаимодействия с сервером Redis нам понадобится зависимость Spring Boot Red is.

implementation 'org.springframework.boot:spring-boot-starter-data-redis'

Redis также необходимо будет настроить таким образом, чтобы мы могли использовать его реактивный API. Мы добавим необходимые компоненты конфигурации в класс конфигурации Redis PubSub.

Компонент Reactive RedisTemplate необходим для публикации сообщений и настроен таким образом, чтобы он мог сериализовать объекты сообщений в JSON.

@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {

        StringRedisSerializer keySerializer = new StringRedisSerializer();
    Jackson2JsonRedisSerializer valueSerializer = 
                        new Jackson2JsonRedisSerializer<>(Message.class);

    RedisSerializationContext.RedisSerializationContextBuilder builder =
            RedisSerializationContext.newSerializationContext(keySerializer);

    RedisSerializationContext context =
            builder.value(valueSerializer).build();

    return new ReactiveRedisTemplate<>(factory, context);
}

Активен красный – компонент MessageListenerContainer необходим для подписки на каналы сообщений.

@Bean
ReactiveRedisMessageListenerContainer container(ReactiveRedisConnectionFactory factory) {
    return new ReactiveRedisMessageListenerContainer(factory);
}

Служба Redis PubSub

Теперь мы можем создать службу Red is PubSub, внедрив интерфейс службы PubSub и внедрив настроенные компоненты Redis.

@Service
public class RedisPubSubService implements PubSubService {

    private final ReactiveRedisTemplate reactiveTemplate;
    private final ReactiveRedisMessageListenerContainer reactiveMsgListenerContainer;

    private final ChannelTopic channelTopic = new ChannelTopic("broadcast"); // channel used to send and recieve messages

    public RedisPubSubService(ReactiveRedisTemplate reactiveTemplate,
                              ReactiveRedisMessageListenerContainer reactiveMsgListenerContainer) {
        this.reactiveMsgListenerContainer = reactiveMsgListenerContainer;
        this.reactiveTemplate = reactiveTemplate;
    }

Метод интерфейса публикации использует шаблон Redis для сериализации и публикации сообщения в теме “трансляция”. Возвращается пустой Mono, сигнализирующий об успешной отправке сообщения.

@Override
public Mono publish(Message message) {
    return this.reactiveTemplate
            .convertAndSend(channelTopic.getTopic(), message)
            .then(Mono.empty());
}

Подписка на канал немного более подробна и требует, чтобы мы передавали канал как повторяющийся. Мы также должны вручную передать SerializationContext из шаблона Redis для десериализации входящих сообщений.

@Override
public Flux subscribe() {
    return reactiveMsgListenerContainer
            .receive(Collections.singletonList(channelTopic),
                    reactiveTemplate.getSerializationContext().getKeySerializationPair(),
                    reactiveTemplate.getSerializationContext().getValueSerializationPair())
            .map(ReactiveSubscription.Message::getMessage);
}

Теперь у нас есть функциональный сервис PubSub, который может передавать сообщения между экземплярами сервера соединений.

Модульные тесты для этой службы можно найти здесь .

Ракета

При подключении клиентов к службе чата мы будем использовать протокол сокета поверх подключения к веб-сети. Rocket позволяет нам использовать стандарт реактивных потоков на границе сети и обладает некоторыми полезными функциями, такими как противодавление и маршрутизация.

Обратное давление будет особенно полезно для нашего клиентского приложения, если у нас будет большой поток сообщений, которые мы можем оценить – ограничить количество сообщений, принимаемых клиентом.

Чтобы использовать ракету, нам нужно включить зависимость ракеты от пружинной загрузки:

implementation 'org.springframework.boot:spring-boot-starter-rsocket'

Нам также нужно будет настроить сокет для использования веб-сокетов и установить конечную точку для подключения клиентов.

spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rs

Далее мы можем реализовать PubSubController, который отвечает за обработку сообщений сокета от клиента. Этот контроллер вводит службу PubSub, которую мы создали ранее.

@Controller
@Slf4j
public class PubSubController {

        private final PubSubService messagingService;

    public PubSubController(PubSubService messagingService) {
        this.messagingService = messagingService;
    }

// .....

Rocket Spring позволяет использовать два полезных метода аннотаций:

  • @@Сопоставление содержимого – используется для обработки при установлении нового соединения.
  • @MessageMapping – используется для обработки сообщений для определенного маршрута.

Мы будем использовать @ConnectMapping для регистрации новых подключений или их закрытия.

@ConnectMapping
void onConnect(RSocketRequester requester) {
    Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null")
            .onClose()
            .doOnError(error -> log.warn(requester.rsocketClient() + " Closed"))
            .doFinally(consumer -> log.info(requester.rsocketClient() + " Disconnected"))
            .subscribe();
}

@@MessageMapping(“опубликовать”) будет пересылать отправленные сообщения в службу PubSub.

@MessageMapping("publish")
Mono publish(Message message) {
    return messagingService.publish(message);
}

@@MessageMapping(“подписаться”) будет транслировать сообщения из службы PubSub клиенту.

@MessageMapping("subscribe")
Flux subscribe() {
    return messagingService.subscribe();
}

Аутентификация Auth0

Мы будем использовать сторонний сервис Auth0 для регистрации пользователя и аутентификации. Токены доступа пользователей JWT , предоставляемые Auth0, также совместимы с сокетом. Это позволит нам обезопасить отдельные маршруты обмена сообщениями.

Отличный учебник по использованию Auth0 с Spring Boot можно найти здесь .

Уникальное имя пользователя, введенное при регистрации, будет закодировано в маркере доступа и декодировано при отправке сообщения. Это избавляет от необходимости сохранять имя пользователя в службе чата. Из-за такой кодировки токен доступа должен быть недолговечным, чтобы снизить риск компрометации.

Чтобы иметь возможность декодировать и проверять токены JWT в нашем приложении, нам понадобится эта зависимость.

implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server'

Затем мы можем настроить конечную точку сервера аутентификации и добавить свойства для правильной аудитории auth0 и пользовательского утверждения, используемого для кодирования имени пользователя.

spring.security.oauth2.resourceserver.jwt.issuer-uri=https://{YOUR_AUTH0_DOMAIN}.us.auth0.com/
auth0.audience={YOUR_AUTH0_API_AUDIENCE}
auth0.username-claim={YOUR_AUTH0_USERNAME_CLAIM}

Крепежное гнездо

Для обеспечения безопасности сокета включите зависимость безопасности ракеты.

implementation 'org.springframework.security:spring-security-rsocket'

Чтобы иметь возможность передавать декодированный токен JWT в качестве принципа аутентификации нам понадобится зависимость от обмена сообщениями spring security.

implementation 'org.springframework.security:spring-security-messaging

Теперь мы можем создать класс конфигурации безопасности сокета, введя свойства auth0, добавленные ранее. Мы также можем добавить аннотации @enablersocketsecurity и @EnableReactiveMethodSecurtiy к активной поддержке безопасности.

@Configuration
@EnableRSocketSecurity
@EnableReactiveMethodSecurity
public class RSocketSecurityConfig {

    @Value("${auth0.audience}")
    String audience;
    @Value("${spring.security.oauth2.resourceserver.jwt.issuer-uri}")
    String issuer;
    @Value("${auth0.username-claim}")
    String usernameClaim;

//......

Для декодирования токена JWT нам нужно будет создать компонент ReactiveJwtDecoder . Обычай Также добавлены валидаторы токенов OAuth2 для проверки того, что утверждения аудитории и имени пользователя включены в токен.

@Bean
public ReactiveJwtDecoder reactiveJwtDecoder() {

    var reactiveJwtDecoder = (NimbusReactiveJwtDecoder) ReactiveJwtDecoders.fromOidcIssuerLocation(issuer);

    OAuth2TokenValidator audienceValidator = (jwt) -> {
        OAuth2Error error = new OAuth2Error("invalid_token", "The required audience is missing", null);
        if (jwt.getAudience().contains(audience)) {
            return OAuth2TokenValidatorResult.success();
        }
        return OAuth2TokenValidatorResult.failure(error);
    };

    OAuth2TokenValidator usernameValidator = (jwt) -> {
        OAuth2Error error = new OAuth2Error("invalid_token", "The required username is missing", null);
        if (jwt.getClaimAsString(usernameClaim) != null) {
            return OAuth2TokenValidatorResult.success();
        }
        return OAuth2TokenValidatorResult.failure(error);
    };

    OAuth2TokenValidator withIssuer = JwtValidators.createDefaultWithIssuer(issuer);
    OAuth2TokenValidator compositeValidator = new DelegatingOAuth2TokenValidator<>(withIssuer, audienceValidator, usernameValidator);

    reactiveJwtDecoder.setJwtValidator(compositeValidator);

    return reactiveJwtDecoder;
}

Чтобы предотвратить отправку сообщений неавторизованными пользователями, мы можем защитить маршрут “публикация”, настроив компонент PayloadSocketAcceptorInterceptor для запроса токенов JWT.

@Bean
public PayloadSocketAcceptorInterceptor rsocketInterceptor(RSocketSecurity rSocketSecurity) {
    return rSocketSecurity.authorizePayload(authorize ->
                    authorize.route("publish").authenticated() 
                            .anyExchange().permitAll()) // everything else is permitted
            .jwt(Customizer.withDefaults())
            .build();
}

Обработчик сообщений Rocket также настроен таким образом, чтобы разрешить принцип аутентификации в методах обработки сокетов.

@Bean
RSocketMessageHandler messageHandler(RSocketStrategies strategies) {
    RSocketMessageHandler mh = new RSocketMessageHandler();
    mh.getArgumentResolverConfigurer().addCustomResolver(
            new AuthenticationPrincipalArgumentResolver());
    mh.setRouteMatcher(new PathPatternRouteMatcher());
    mh.setRSocketStrategies(strategies);
    return mh;
}

Теперь мы можем получить доступ к расшифрованному утверждению имени пользователя при обработке сообщения.

@Value("${auth0.username-claim}")
String usernameClaim;

//......

@MessageMapping("publish")
Mono publish(String message, @AuthenticationPrincipal Mono token) {
    return token.map(jwt -> jwt.getClaimAsString(usernameClaim))
            .flatMap(username -> messagingService.publish(new Message(username, message)));
}

Вывод

На этом завершается обзор реализации. Опять же, я всегда рад ответить на любые вопросы об этом посте или обосновании выбранной технологии.

Не стесняйтесь проверять полный исходный код на GitHub .

Оригинал: “https://dev.to/olibroughton/building-a-scalable-live-stream-chat-service-with-spring-webflux-redis-pubsub-rsocket-and-auth0-22o9”