Рубрики
Без рубрики

Потоковая передача обновлений в реальном времени из хранилища данных reactive Spring

В этом посте подробно описывается наивная реализация потоковой передачи обновлений из базы данных в любые другие компоненты… Помеченный kotlin, java, spring, r2dbc.

В этом посте подробно описывается наивная реализация потоковой передачи обновлений из базы данных любым другим компонентам, которые заинтересованы в этих данных. Точнее, как изменить репозиторий Spring Data R2DBC для отправки событий соответствующим подписчикам.

Немного общих знаний о R2DBC и Spring будет полезно для этого поста. Мои предыдущие работы, Асинхронный доступ к СУБД с помощью Spring Data R2DBC и Spring Data R2 ODBC для Microsoft SQL Server должны помочь в этом отношении.

Как я уже упоминал, это будет наивная реализация. Следовательно, в коде не будет ничего необычного.

Чтобы сделать это, я взломал Более простой репозиторий 2d bc , чтобы создать реализацию репозитория, которая выдает событие каждый раз, когда сохраняется новая запись. Новые события добавляются в Direct Processor и отправляются любому Издателю , подписанному на него. Это выглядит как:

class PersonRepository(
  entity: RelationalEntityInformation,
  databaseClient: DatabaseClient,
  converter: R2dbcConverter,
  accessStrategy: ReactiveDataAccessStrategy
) : SimpleR2dbcRepository(entity, databaseClient, converter, accessStrategy) {

  private val source: DirectProcessor = DirectProcessor.create()
  val events: Flux = source

  override fun  save(objectToSave: S): Mono {
    return super.save(objectToSave).doOnNext(source::onNext)
  }
}

Единственная функция из Simpler 2d bc Repository , которую необходимо переопределить, – это save ( ( сохранить все делегаты в сохранить ). doOnNext добавляется к исходному вызову сохранения, который отправляет новое событие в источник (( DirectorProcessor ) путем вызова На следующем

Источник преобразуется в Поток чтобы запретить классам из-за пределов репозитория добавлять новые события. Технически они все еще могут добавлять события, но им нужно будет разыгрывать их самостоятельно.

Как вы могли заметить, репозиторий принимает множество параметров и передает их в SimpleR 2d bc Repository . Экземпляр репозитория необходимо создавать вручную, так как некоторые из его зависимостей не могут быть введены автоматически:

@Configuration
class RepositoryConfiguration {

  @Bean
  fun personRepository(
    databaseClient: DatabaseClient,
    dataAccessStrategy: ReactiveDataAccessStrategy
  ): PersonRepository {
    val entity: RelationalPersistentEntity = dataAccessStrategy
      .converter
      .mappingContext
      .getRequiredPersistentEntity(Person::class.java) as RelationalPersistentEntity
    val relationEntityInformation: MappingRelationalEntityInformation =
      MappingRelationalEntityInformation(entity, Int::class.java)
    return PersonRepository(
      relationEntityInformation,
      databaseClient,
      dataAccessStrategy.converter,
      dataAccessStrategy
    )
  }
}

На этом этапе все настроено и готово к использованию. Ниже приведен пример того, как это работает:

personRepository.events
  .doOnComplete { log.info("Events flux has closed") }
  .subscribe { log.info("From events stream - $it") }
// insert people records over time
MARVEL_CHARACTERS
  .toFlux()
  .delayElements(Duration.of(1, SECONDS))
  .concatMap { personRepository.save(it) }
  .subscribe()

Который выводит:

29-08-2019 09:08:27.674 [reactor-tcp-nio-1] From events stream - Person(id=481, name=Spiderman, age=18)
29-08-2019 09:08:28.550 [reactor-tcp-nio-2] From events stream - Person(id=482, name=Ironman, age=48)
29-08-2019 09:08:29.555 [reactor-tcp-nio-3] From events stream - Person(id=483, name=Thor, age=1000)
29-08-2019 09:08:30.561 [reactor-tcp-nio-4] From events stream - Person(id=484, name=Hulk, age=49)
29-08-2019 09:08:31.568 [reactor-tcp-nio-5] From events stream - Person(id=485, name=Antman, age=49)
29-08-2019 09:08:32.571 [reactor-tcp-nio-6] From events stream - Person(id=486, name=Blackwidow, age=34)
29-08-2019 09:08:33.576 [reactor-tcp-nio-7] From events stream - Person(id=487, name=Starlord, age=38)
29-08-2019 09:08:34.581 [reactor-tcp-nio-8] From events stream - Person(id=488, name=Captain America, age=100)
29-08-2019 09:08:35.585 [reactor-tcp-nio-9] From events stream - Person(id=489, name=Warmachine, age=50)
29-08-2019 09:08:36.589 [reactor-tcp-nio-10] From events stream - Person(id=490, name=Wasp, age=26)
29-08-2019 09:08:37.596 [reactor-tcp-nio-11] From events stream - Person(id=491, name=Winter Soldier, age=101)
29-08-2019 09:08:38.597 [reactor-tcp-nio-12] From events stream - Person(id=492, name=Black Panther, age=42)
29-08-2019 09:08:39.604 [reactor-tcp-nio-1] From events stream - Person(id=493, name=Doctor Strange, age=42)
29-08-2019 09:08:40.609 [reactor-tcp-nio-2] From events stream - Person(id=494, name=Gamora, age=29)
29-08-2019 09:08:41.611 [reactor-tcp-nio-3] From events stream - Person(id=495, name=Groot, age=4)
29-08-2019 09:08:42.618 [reactor-tcp-nio-4] From events stream - Person(id=496, name=Hawkeye, age=47)
29-08-2019 09:08:43.620 [reactor-tcp-nio-5] From events stream - Person(id=497, name=Pepper Potts, age=44)
29-08-2019 09:08:44.627 [reactor-tcp-nio-6] From events stream - Person(id=498, name=Captain Marvel, age=59)
29-08-2019 09:08:45.631 [reactor-tcp-nio-7] From events stream - Person(id=499, name=Rocket Raccoon, age=30)
29-08-2019 09:08:46.637 [reactor-tcp-nio-8] From events stream - Person(id=500, name=Drax, age=49)
29-08-2019 09:08:47.639 [reactor-tcp-nio-9] From events stream - Person(id=501, name=Nebula, age=30)

Каждую секунду сохраняется запись, которая соответствует событиям, выходящим из хранилища.

Вот и все, что для этого нужно, по крайней мере, для этой базовой реализации. Я уверен, что можно было бы сделать гораздо больше, но сначала мне нужно было бы выяснить, как это сделать… Подводя итог, можно сказать, что с некоторыми дополнениями вы можете передавать данные, вставленные в вашу базу данных, компонентам, которые заинтересованы в добавляемых записях.

Если вам понравился этот пост или вы сочли его полезным (или и то, и другое), пожалуйста, не стесняйтесь подписываться на меня в Twitter по адресу @LankyDanDev и не забудьте поделиться с кем-либо еще, кто может счесть это полезным!

Оригинал: “https://dev.to/lankydandev/streaming-live-updates-from-a-reactive-spring-data-repository-8f”