В этом посте подробно описывается наивная реализация потоковой передачи обновлений из базы данных любым другим компонентам, которые заинтересованы в этих данных. Точнее, как изменить репозиторий Spring Data R2DBC для отправки событий соответствующим подписчикам.
Немного общих знаний о R2DBC и Spring будет полезно для этого поста. Мои предыдущие работы, Асинхронный доступ к СУБД с помощью Spring Data R2DBC и Spring Data R2 ODBC для Microsoft SQL Server должны помочь в этом отношении.
Как я уже упоминал, это будет наивная реализация. Следовательно, в коде не будет ничего необычного.
Чтобы сделать это, я взломал Более простой репозиторий 2d bc
, чтобы создать реализацию репозитория, которая выдает событие каждый раз, когда сохраняется новая запись. Новые события добавляются в Direct Processor
и отправляются любому Издателю
, подписанному на него. Это выглядит как:
class PersonRepository( entity: RelationalEntityInformation, databaseClient: DatabaseClient, converter: R2dbcConverter, accessStrategy: ReactiveDataAccessStrategy ) : SimpleR2dbcRepository (entity, databaseClient, converter, accessStrategy) { private val source: DirectProcessor = DirectProcessor.create () val events: Flux = source override fun save(objectToSave: S): Mono{ return super.save(objectToSave).doOnNext(source::onNext) } }
Единственная функция из Simpler 2d bc Repository
, которую необходимо переопределить, – это save
( (
сохранить все делегаты в
сохранить ).
doOnNext добавляется к исходному вызову сохранения, который отправляет новое событие в
источник ((
DirectorProcessor ) путем вызова
На следующем
Источник
преобразуется в Поток
чтобы запретить классам из-за пределов репозитория добавлять новые события. Технически они все еще могут добавлять события, но им нужно будет разыгрывать их самостоятельно.
Как вы могли заметить, репозиторий принимает множество параметров и передает их в SimpleR 2d bc Repository
. Экземпляр репозитория необходимо создавать вручную, так как некоторые из его зависимостей не могут быть введены автоматически:
@Configuration class RepositoryConfiguration { @Bean fun personRepository( databaseClient: DatabaseClient, dataAccessStrategy: ReactiveDataAccessStrategy ): PersonRepository { val entity: RelationalPersistentEntity= dataAccessStrategy .converter .mappingContext .getRequiredPersistentEntity(Person::class.java) as RelationalPersistentEntity val relationEntityInformation: MappingRelationalEntityInformation = MappingRelationalEntityInformation(entity, Int::class.java) return PersonRepository( relationEntityInformation, databaseClient, dataAccessStrategy.converter, dataAccessStrategy ) } }
На этом этапе все настроено и готово к использованию. Ниже приведен пример того, как это работает:
personRepository.events .doOnComplete { log.info("Events flux has closed") } .subscribe { log.info("From events stream - $it") } // insert people records over time MARVEL_CHARACTERS .toFlux() .delayElements(Duration.of(1, SECONDS)) .concatMap { personRepository.save(it) } .subscribe()
Который выводит:
29-08-2019 09:08:27.674 [reactor-tcp-nio-1] From events stream - Person(id=481, name=Spiderman, age=18) 29-08-2019 09:08:28.550 [reactor-tcp-nio-2] From events stream - Person(id=482, name=Ironman, age=48) 29-08-2019 09:08:29.555 [reactor-tcp-nio-3] From events stream - Person(id=483, name=Thor, age=1000) 29-08-2019 09:08:30.561 [reactor-tcp-nio-4] From events stream - Person(id=484, name=Hulk, age=49) 29-08-2019 09:08:31.568 [reactor-tcp-nio-5] From events stream - Person(id=485, name=Antman, age=49) 29-08-2019 09:08:32.571 [reactor-tcp-nio-6] From events stream - Person(id=486, name=Blackwidow, age=34) 29-08-2019 09:08:33.576 [reactor-tcp-nio-7] From events stream - Person(id=487, name=Starlord, age=38) 29-08-2019 09:08:34.581 [reactor-tcp-nio-8] From events stream - Person(id=488, name=Captain America, age=100) 29-08-2019 09:08:35.585 [reactor-tcp-nio-9] From events stream - Person(id=489, name=Warmachine, age=50) 29-08-2019 09:08:36.589 [reactor-tcp-nio-10] From events stream - Person(id=490, name=Wasp, age=26) 29-08-2019 09:08:37.596 [reactor-tcp-nio-11] From events stream - Person(id=491, name=Winter Soldier, age=101) 29-08-2019 09:08:38.597 [reactor-tcp-nio-12] From events stream - Person(id=492, name=Black Panther, age=42) 29-08-2019 09:08:39.604 [reactor-tcp-nio-1] From events stream - Person(id=493, name=Doctor Strange, age=42) 29-08-2019 09:08:40.609 [reactor-tcp-nio-2] From events stream - Person(id=494, name=Gamora, age=29) 29-08-2019 09:08:41.611 [reactor-tcp-nio-3] From events stream - Person(id=495, name=Groot, age=4) 29-08-2019 09:08:42.618 [reactor-tcp-nio-4] From events stream - Person(id=496, name=Hawkeye, age=47) 29-08-2019 09:08:43.620 [reactor-tcp-nio-5] From events stream - Person(id=497, name=Pepper Potts, age=44) 29-08-2019 09:08:44.627 [reactor-tcp-nio-6] From events stream - Person(id=498, name=Captain Marvel, age=59) 29-08-2019 09:08:45.631 [reactor-tcp-nio-7] From events stream - Person(id=499, name=Rocket Raccoon, age=30) 29-08-2019 09:08:46.637 [reactor-tcp-nio-8] From events stream - Person(id=500, name=Drax, age=49) 29-08-2019 09:08:47.639 [reactor-tcp-nio-9] From events stream - Person(id=501, name=Nebula, age=30)
Каждую секунду сохраняется запись, которая соответствует событиям, выходящим из хранилища.
Вот и все, что для этого нужно, по крайней мере, для этой базовой реализации. Я уверен, что можно было бы сделать гораздо больше, но сначала мне нужно было бы выяснить, как это сделать… Подводя итог, можно сказать, что с некоторыми дополнениями вы можете передавать данные, вставленные в вашу базу данных, компонентам, которые заинтересованы в добавляемых записях.
Если вам понравился этот пост или вы сочли его полезным (или и то, и другое), пожалуйста, не стесняйтесь подписываться на меня в Twitter по адресу @LankyDanDev и не забудьте поделиться с кем-либо еще, кто может счесть это полезным!
Оригинал: “https://dev.to/lankydandev/streaming-live-updates-from-a-reactive-spring-data-repository-8f”