1. Обзор
В этом уроке мы рассмотрим поддержку транзакций в рамках интеграции Spring .
2. Транзакции в потоках сообщений
Spring обеспечивает поддержку синхронизации ресурсов с транзакциями с самых ранних версий. Мы часто используем его для синхронизации транзакций, управляемых несколькими менеджерами транзакций.
Например, мы можем синхронизировать фиксацию JMS с фиксацией JDBC.
С другой стороны, у нас также есть более сложные варианты использования в потоках сообщений. Они включают синхронизацию нетранзакционных ресурсов, а также различные типы транзакционных ресурсов.
Как правило, потоки обмена сообщениями могут инициироваться двумя различными типами механизмов.
2.1. Потоки сообщений, инициируемые Пользовательским процессом
Некоторые потоки сообщений зависят от инициирования сторонних процессов, таких как запуск сообщения по какому-либо каналу сообщений или вызов метода шлюза сообщений.
Мы настраиваем поддержку транзакций для этих потоков через стандарт Spring поддержка транзакций . Потоки не должны быть явно настроены Spring Integration для поддержки транзакций. Поток сообщений Spring Integration естественным образом учитывает транзакционную семантику компонентов Spring.
Например, мы можем аннотировать активатор службы или его метод с помощью @Transactional :
@Transactional public class TxServiceActivator { @Autowired private JdbcTemplate jdbcTemplate; public void storeTestResult(String testResult) { this.jdbcTemplate.update("insert into STUDENT values(?)", testResult); log.info("Test result is stored: {}", testResult); } }
Мы можем запустить метод store Test Result из любого компонента, и транзакционный контекст будет применяться как обычно. При таком подходе мы полностью контролируем конфигурацию транзакций.
2.2. Потоки сообщений, инициируемые процессом демона
Мы часто используем этот тип потока сообщений для автоматизации. Например, Опросник опрашивает очередь сообщений, чтобы инициировать новый поток сообщений с опрошенным сообщением, или планировщик, планирующий процесс, создавая новое сообщение и инициируя поток сообщений в заданное время.
По сути, это потоки на основе триггеров, инициируемые процессом триггера (процессом демона). Для этих потоков мы должны предоставить некоторую конфигурацию транзакции, чтобы создать контекст транзакции всякий раз, когда начинается новый поток сообщений.
С помощью конфигурации мы делегируем потоки существующей поддержке транзакций Spring.
Мы сосредоточимся на поддержке транзакций для этого типа потока сообщений в остальной части статьи.
3. Поддержка транзакций Опроса
Опросник является общим компонентом в интеграционных потоках. Он периодически извлекает данные из различных источников и передает их по цепочке интеграции.
Интеграция Spring обеспечивает транзакционную поддержку для опросников из коробки. Каждый раз, когда мы настраиваем компонент Poller , мы можем предоставить транзакционную конфигурацию:
@Bean @InboundChannelAdapter(value = "someChannel", poller = @Poller(value = "pollerMetadata")) public MessageSourcesomeMessageSource() { ... } @Bean public PollerMetadata pollerMetadata() { return Pollers.fixedDelay(5000) .advice(transactionInterceptor()) .transactionSynchronizationFactory(transactionSynchronizationFactory) .get(); } private TransactionInterceptor transactionInterceptor() { return new TransactionInterceptorBuilder() .transactionManager(txManager) .build(); }
Мы должны предоставить ссылку на Диспетчер транзакций и пользовательскую Фабрику синхронизации транзакций , или мы можем полагаться на значения по умолчанию. Внутренне собственная транзакция Spring завершает процесс. В результате все потоки сообщений, инициируемые этим опросником, являются транзакционными.
4. Границы транзакций
При запуске транзакции контекст транзакции всегда привязан к текущему потоку. Независимо от того, сколько конечных точек и каналов у нас может быть в потоке сообщений, наш контекст транзакции всегда будет сохраняться до тех пор, пока поток живет в одном потоке.
Если мы нарушим его, инициировав новый поток в какой-либо службе, мы также нарушим Транзакционную границу. По сути, на этом транзакция закончится.
Если между потоками произошла успешная передача, поток будет считаться успешным. Это зафиксирует транзакцию в этот момент, но поток будет продолжаться, и это все равно может привести к исключению где-то ниже по течению.
Следовательно, это Исключение может вернуться к инициатору потока, так что транзакция может закончиться откатом. Вот почему мы должны использовать транзакционные каналы в любой точке, где граница потока может быть нарушена .
Например, мы должны использовать JMS, JDBC или какой-либо другой транзакционный канал.
5. Синхронизация транзакций
В некоторых случаях полезно синхронизировать определенные операции с транзакцией, которая охватывает весь поток.
Например, мы продемонстрируем, как использовать Poller , который считывает входящий файл и, основываясь на его содержимом, выполняет обновление базы данных. Когда операция с базой данных завершается, она также переименовывает файл в зависимости от успеха операции.
Прежде чем мы перейдем к примеру, важно понять, что этот подход синхронизирует операции в файловой системе с транзакцией. Это не делает файловую систему, которая по своей сути не является транзакционной, фактически транзакционной.
Транзакция начинается до опроса и либо фиксируется, либо откатывается по завершении потока, после чего выполняется синхронизированная операция в файловой системе.
Во-первых, мы определяем Адаптер входящего канала с помощью простого опросника :
@Bean @InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata")) public MessageSourcefileReadingMessageSource() { FileReadingMessageSource sourceReader = new FileReadingMessageSource(); sourceReader.setDirectory(new File(INPUT_DIR)); sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN)); return sourceReader; } @Bean public PollerMetadata pollerMetadata() { return Pollers.fixedDelay(5000) .advice(transactionInterceptor()) .transactionSynchronizationFactory(transactionSynchronizationFactory) .get(); }
Опросник содержит ссылку на Менеджер транзакций, как объяснялось ранее. Кроме того, он также содержит ссылку на Фабрику синхронизации транзакций . Этот компонент обеспечивает механизм синхронизации операций файловой системы с транзакцией:
@Bean public TransactionSynchronizationFactory transactionSynchronizationFactory() { ExpressionEvaluatingTransactionSynchronizationProcessor processor = new ExpressionEvaluatingTransactionSynchronizationProcessor(); SpelExpressionParser spelParser = new SpelExpressionParser(); processor.setAfterCommitExpression( spelParser.parseExpression( "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))")); processor.setAfterRollbackExpression( spelParser.parseExpression( "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))")); return new DefaultTransactionSynchronizationFactory(processor); }
Если транзакция фиксируется, Фабрика синхронизации транзакций переименует файл, добавив “.PASSED” к имени файла. Однако, если он откатится, он добавит “.FAILED”.
Входной канал преобразует полезную нагрузку с помощью FileToStringTransformer и делегирует ее toServiceChannel . Этот канал привязан к Активатору службы :
@Bean public MessageChannel inputChannel() { return new DirectChannel(); } @Bean @Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel") public FileToStringTransformer fileToStringTransformer() { return new FileToStringTransformer(); }
Активатор службы считывает входящий файл, содержащий результаты экзамена студента. Он записывает результат в базу данных. Если результат содержит строку “fail”, он вызывает Исключение , что приводит к откату базы данных:
@ServiceActivator(inputChannel = "toServiceChannel") public void serviceActivator(String payload) { jdbcTemplate.update("insert into STUDENT values(?)", payload); if (payload.toLowerCase().startsWith("fail")) { log.error("Service failure. Test result: {} ", payload); throw new RuntimeException("Service failure."); } log.info("Service success. Test result: {}", payload); }
После успешной фиксации или отката операции базы данных Фабрика синхронизации транзакций синхронизирует операцию файловой системы с ее результатом.
6. Заключение
В этой статье мы объяснили поддержку транзакций в рамках Spring Integration . Кроме того, мы продемонстрировали, как синхронизировать транзакцию с операциями на нетранзакционном ресурсе, таком как файловая система.
Полный исходный код для примера доступен на GitHub.