Рубрики
Без рубрики

R2DBC – Реактивное подключение к реляционной базе данных

Краткий и практический обзор подключения к базе данных, реагирующей на R2DBC.

Автор оригинала: Philippe Sevestre.

1. Обзор

В этом уроке мы покажем, как мы можем использовать R2DBC для выполнения операций с базой данных реактивным способом .

Чтобы изучить R2DBC, мы создадим простое тестовое приложение Spring Web Flux, которое реализует операции CRUD для одного объекта, используя только асинхронные операции для достижения этой цели.

2. Что такое R2DBC?

Реактивное развитие находится на подъеме, с каждым днем появляются новые рамки, а существующие становятся все более распространенными. Однако основной проблемой реактивной разработки является тот факт, что доступ к базе данных в мире Java/JVM остается в основном синхронным . Это прямое следствие того, как был разработан JDBC, и привело к некоторым уродливым взломам, чтобы адаптировать эти два принципиально разных подхода.

Для решения проблемы асинхронного доступа к базам данных в стране Java появились два стандарта. Первый из них, ADBC (API асинхронного доступа к базам данных), поддерживается Oracle, но на момент написания этой статьи, похоже, несколько застопорился, без четкой временной шкалы.

Второй, который мы рассмотрим здесь, – это R2DBC (Реактивное подключение к реляционным базам данных), усилия сообщества, возглавляемые командой из Pivotal и других компаний. Этот проект, который все еще находится в стадии бета-тестирования, показал большую жизнеспособность и уже предоставляет драйверы для баз данных Postgres, H2 и MSSQL.

3. Настройка проекта

Использование R2DBC в проекте требует добавления зависимостей к основному API и подходящему драйверу. В нашем примере мы будем использовать H2, так что это означает только две зависимости:


    io.r2dbc
    r2dbc-spi
    0.8.0.M7


    io.r2dbc
    r2dbc-h2
    0.8.0.M7

Maven Central пока еще не имеет артефактов R2DBC, поэтому нам также нужно добавить пару репозиториев Spring в наш проект:


    
        spring-milestones
        Spring Milestones
        https://repo.spring.io/milestone
        
            false
        
   
   
       spring-snapshots
       Spring Snapshots
       https://repo.spring.io/snapshot
       
           true
       
    

4. Заводская настройка Подключения

Первое , что нам нужно сделать, чтобы получить доступ к базе данных с помощью R2DBC, – это создать объект ConnectionFactory , который играет ту же роль, что и источник данных JDBC . Самый простой способ создать ConnectionFactory – это через Фабрики соединений класс.

Этот класс имеет статические методы, которые принимают Параметры фабрики соединений объект и возвращают ConnectionFactory. Поскольку нам понадобится только один экземпляр нашего ConnectionFactory , давайте создадим @Bean , который мы позже сможем использовать с помощью инъекции везде, где нам нужно:

@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
    ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
    Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
    if (!StringUtil.isNullOrEmpty(properties.getUser())) {
        ob = ob.option(USER, properties.getUser());
    }
    if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
        ob = ob.option(PASSWORD, properties.getPassword());
    }        
    return ConnectionFactories.get(ob.build());    
}

Здесь мы берем параметры, полученные из вспомогательного класса, украшенного аннотацией @ConfigurationProperties , и заполняем наш экземпляр ConnectionFactoryOptions . Чтобы заполнить его, RODBC реализует шаблон компоновщика с одним методом option , который принимает Option и значение.

R2 ODBC определяет ряд хорошо известных параметров, таких как ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ , которые мы использовали выше. Другой способ установить эти параметры-передать строку подключения в метод parse() класса Параметры фабрики соединений//.

Вот пример типичного URL-адреса подключения R2 ODBC:

r2dbc:h2:mem://./testdb

Давайте разберем эту строку на ее компоненты:

  • r2dbc : Идентификатор фиксированной схемы для URL-адресов R2DBC — другой допустимой схемой является rd2bcs , используемый для соединений, защищенных SSL
  • h2 : Идентификатор драйвера, используемый для поиска соответствующей фабрики соединений.
  • mem : Протокол,специфичный для драйвера — в нашем случае это соответствует базе данных в памяти
  • //./testdb : строка для конкретного драйвера, обычно содержащая хост, базу данных и любые дополнительные параметры.

Как только у нас есть готовый набор опций, мы передаем его в метод get() static factory, чтобы создать наш ConnectionFactory bean.

5. Выполнение инструкций

Как и в случае с JDBC, использование R2DBC в основном связано с отправкой инструкций SQL в базу данных и обработкой результирующих наборов. Однако, поскольку R2DBC является реактивным API, он сильно зависит от типов реактивных потоков, таких как Publisher и Subscriber .

Использование этих типов напрямую немного громоздко, поэтому мы будем использовать типы project reactor, такие как Mono и Flux , которые помогают нам писать более чистый и лаконичный код.

В следующих разделах мы рассмотрим, как реализовать задачи, связанные с базой данных, создав реактивный класс DAO для простого класса Account . Этот класс содержит всего три свойства и имеет соответствующую таблицу в нашей базе данных:

public class Account {
    private Long id;
    private String iban;
    private BigDecimal balance;
    // ... getters and setters omitted
}

5.1. Получение соединения

Прежде чем мы сможем отправить какие-либо инструкции в базу данных, нам нужно Соединение экземпляр . Мы уже видели , как создать ConnectionFactory , поэтому неудивительно, что мы будем использовать его для получения соединения . Что мы должны помнить, так это то , что теперь вместо обычного Соединения мы получаем Издателя одного соединения.

Наш ReactiveAccountDao, который является обычной пружиной @Component , получает свой ConnectionFactory через инъекцию конструктора, поэтому он легко доступен в методах обработчика.

Давайте взглянем на первые несколько строк метода findById () , чтобы узнать, как получить и начать использовать Соединение :

public Mono> findById(Long id) {         
    return Mono.from(connectionFactory.create())
      .flatMap(c ->
          // use the connection
      )
      // ... downstream processing omitted
}

Здесь мы адаптируем Publisher , возвращенный из нашего ConnectionFactory , в Mono , который является исходным источником для нашего потока событий.

5.1. Подготовка и подача заявлений

Теперь , когда у нас есть Соединение , давайте используем его для создания Оператора и привязки к нему параметра:

.flatMap( c -> 
    Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
      .bind("$1", id)
      .execute())
      .doFinally((st) -> close(c))
 )

Метод Connection ‘s createStatement принимает строку запроса SQL, которая может дополнительно содержать заполнители привязки, называемые “маркерами” в спецификации .

Несколько примечательных моментов здесь: во-первых, createStatement – это синхронная операция , которая позволяет нам использовать свободный стиль для привязки значений к возвращаемому оператору; во-вторых, и это очень важно, синтаксис заполнителя/маркера зависит от поставщика!

В этом примере мы используем специфический синтаксис H2, который использует $n для обозначения параметров. Другие поставщики могут использовать другой синтаксис, например :param , @Pn или какое-либо другое соглашение. Это важный аспект, на который мы должны обратить внимание при переносе устаревшего кода на этот новый API .

Сам процесс привязки довольно прост, благодаря шаблону fluent API и упрощенному набору текста: есть только один перегруженный bind() метод, который заботится обо всех преобразованиях ввода — конечно, в соответствии с правилами базы данных.

Первый параметр, передаваемый в bind () , может быть порядковым номером на основе нуля, который соответствует размещению маркера в операторе, или это может быть строка с фактическим маркером.

После того, как мы установили значения для всех параметров , мы вызываем execute () , который возвращает Publisher объектов Result , которые мы снова оборачиваем в Mono для дальнейшей обработки. Мы прикрепляем обработчик doFinally() к этому Mono , чтобы убедиться, что мы закроем наше соединение независимо от того, завершится ли обработка потока нормально или нет.

5.2. Результаты Обработки

Следующий шаг в нашем конвейере отвечает за обработку Результатов объектов и генерацию потока ResponseEntity< Account> экземпляров .

Поскольку мы знаем , что может быть только один экземпляр с заданным id , мы фактически вернем Mono поток. Фактическое преобразование происходит внутри функции, переданной методу map() полученного Результата :

.map(result -> result.map((row, meta) -> 
    new Account(row.get("id", Long.class),
      row.get("iban", String.class),
      row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));

Метод map() результата ожидает функцию, которая принимает два параметра. Первый-это объект Row , который мы используем для сбора значений для каждого столбца и заполнения экземпляра Account . Второй, metal , представляет собой объект метаданных строки, содержащий информацию о текущей строке, такую как имена и типы столбцов.

Предыдущий вызов map() в нашем конвейере разрешается в Mono> , но нам нужно вернуть Mono из этого метода. Чтобы исправить это, мы добавляем заключительный шаг flatMap () , который адаптирует Producer в Mono.

5.3. Пакетные инструкции

R2DBC также поддерживает создание и выполнение пакетов операторов, которые позволяют выполнять несколько операторов SQL в одном вызове execute () . В отличие от обычных операторов, пакетные операторы не поддерживают привязку и в основном используются по соображениям производительности в сценариях, таких как задания ETL.

Наш пример проекта использует пакет инструкций для создания таблицы Account и вставки в нее некоторых тестовых данных:

@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
    return (args) ->
      Flux.from(cf.create())
        .flatMap(c -> 
            Flux.from(c.createBatch()
              .add("drop table if exists Account")
              .add("create table Account(" +
                "id IDENTITY(1,1)," +
                "iban varchar(80) not null," +
                "balance DECIMAL(18,2) not null)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120980198201982',100.00)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120998729871000',250.00)")
              .execute())
            .doFinally((st) -> c.close())
          )
        .log()
        .blockLast();
}

Здесь мы используем Batch , возвращенный из create Batch () , и добавляем несколько операторов SQL. Затем мы отправляем эти инструкции на выполнение, используя тот же метод execute () , доступный в интерфейсе Statement .

В данном конкретном случае нас не интересуют никакие результаты — только то, что все операторы выполняются нормально. Если бы нам понадобились какие-либо полученные результаты, все, что нам нужно было сделать, это добавить шаг вниз по течению в этот поток для обработки испускаемых Результатов объектов.

6. Сделки

Последняя тема, которую мы рассмотрим в этом уроке, – транзакции. Как и следовало ожидать, мы управляем транзакциями, как в JDBC, то есть с помощью методов, доступных в объекте Connection .

Как и прежде, основное различие заключается в том , что теперь все методы, связанные с транзакциями, являются асинхронными , возвращающими Publisher , который мы должны добавить в наш поток в соответствующих точках.

Наш пример проекта использует транзакцию при реализации метода create Account() :

public Mono createAccount(Account account) {    
    return Mono.from(connectionFactory.create())
      .flatMap(c -> Mono.from(c.beginTransaction())
        .then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
          .bind("$1", account.getIban())
          .bind("$2", account.getBalance())
          .returnGeneratedValues("id")
          .execute()))
        .map(result -> result.map((row, meta) -> 
            new Account(row.get("id", Long.class),
              account.getIban(),
              account.getBalance())))
        .flatMap(pub -> Mono.from(pub))
        .delayUntil(r -> c.commitTransaction())
        .doFinally((st) -> c.close()));   
}

Здесь мы добавили вызовы, связанные с транзакциями, в двух пунктах. Во-первых, сразу после получения нового соединения из базы данных мы вызываем метод beginTransaction() . Как только мы узнаем, что транзакция была успешно запущена, мы подготовим и выполним инструкцию insert .

На этот раз мы также использовали метод return Generated Values () , чтобы указать базе данных возвращать значение идентификатора, сгенерированное для этой новой учетной записи . R2DBC возвращает эти значения в Result , содержащем одну строку со всеми сгенерированными значениями, которые мы используем для создания экземпляра Account|/.

Еще раз, нам нужно адаптировать входящий Mono> в Mono , поэтому мы добавляем flatMap () , чтобы решить эту проблему . Далее мы фиксируем транзакцию в задержке до() шага. Нам это нужно, потому что мы хотим убедиться, что возвращенная Учетная запись уже была зафиксирована в базе данных.

Наконец, мы прикрепляем do Finally шаг к этому конвейеру, который закрывает Соединение , когда все события из возвращенного Mono будут израсходованы.

7. Пример использования DAO

Теперь, когда у нас есть реактивный DAO, давайте используем его для создания простого Spring Web Flux приложения, чтобы продемонстрировать, как его использовать в типичном приложении. Поскольку эта структура уже поддерживает реактивные конструкции, это становится тривиальной задачей. Например, давайте рассмотрим реализацию метода GET :

@RestController
public class AccountResource {
    private final ReactiveAccountDao accountDao;

    public AccountResource(ReactiveAccountDao accountDao) {
        this.accountDao = accountDao;
    }

    @GetMapping("/accounts/{id}")
    public Mono> getAccount(@PathVariable("id") Long id) {
        return accountDao.findById(id)
          .map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
          .switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
    }
    // ... other methods omitted
}

Здесь мы используем возвращенный Mono нашего DAO для создания ResponseEntity с соответствующим кодом состояния. Мы делаем это только потому, что нам нужен NOT_FOUND (404) код состояния, когда нет Учетной записи с заданным идентификатором.

8. Заключение

В этой статье мы рассмотрели основы реактивного доступа к базе данных с помощью R2DBC. Несмотря на то, что этот проект находится в зачаточном состоянии, он быстро развивается, ориентируясь на дату выпуска где-то в начале 2020 года.

По сравнению с ADB, который определенно не будет частью Java 12, R2DBC кажется более перспективным и уже предоставляет драйверы для нескольких популярных баз данных — Oracle здесь заметно отсутствует.

Как обычно, полный исходный код, используемый в этом учебнике, доступен на Github .