Автор оригинала: Magdalena Krause.
1. введение
Spring Batch предоставляет два различных способа реализации задания: использование тасклетов и фрагментов .
В этой статье мы узнаем, как настроить и реализовать оба метода, используя простой пример из реальной жизни.
2. Зависимости
Давайте начнем с добавления необходимых зависимостей :
org.springframework.batch spring-batch-core 4.3.0 org.springframework.batch spring-batch-test 4.3.0 test
Чтобы получить последнюю версию spring-batch-core и spring-batch-test , пожалуйста, обратитесь к Maven Central.
3. Наш Вариант Использования
Давайте рассмотрим CSV-файл со следующим содержимым:
Mae Hodges,10/22/1972 Gary Potter,02/22/1953 Betty Wise,02/17/1968 Wayne Rose,04/06/1977 Adam Caldwell,09/27/1995 Lucille Phillips,05/14/1992
первая позиция каждой строки представляет имя человека, а вторая позиция представляет его/ее дату рождения .
Наш вариант использования состоит в том, чтобы создать еще один CSV-файл, содержащий имя и возраст каждого человека :
Mae Hodges,45 Gary Potter,64 Betty Wise,49 Wayne Rose,40 Adam Caldwell,22 Lucille Phillips,25
Теперь, когда наша область ясна, давайте продолжим и создадим решение, используя оба подхода. Начнем с заданий.
4. Подход к задачам
4.1. Введение и проектирование
Тасклеты предназначены для выполнения одной задачи в рамках одного шага. Наша работа будет состоять из нескольких шагов, которые выполняются один за другим. Каждый шаг должен выполнять только одну определенную задачу .
Наша работа будет состоять из трех этапов:
- Считывание строк из входного CSV-файла.
- Рассчитайте возраст для каждого человека во входном CSV-файле.
- Запишите имя и возраст каждого человека в новый выходной CSV-файл.
Теперь, когда общая картина готова, давайте создадим по одному классу на шаг.
Считыватель строк будет отвечать за считывание данных из входного файла:
public class LinesReader implements Tasklet { // ... }
Процессор строк вычислит возраст для каждого человека в файле:
public class LinesProcessor implements Tasklet { // ... }
Наконец, Lines Writer будет нести ответственность за запись имен и возрастов в выходной файл:
public class LinesWriter implements Tasklet { // ... }
На этом этапе все наши шаги реализуют Tasklet интерфейс . Это заставит нас реализовать его метод execute :
@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { // ... }
В этом методе мы добавим логику для каждого шага. Прежде чем начать с этого кода, давайте настроим нашу работу.
4.2. Конфигурация
Нам нужно добавить некоторую конфигурацию в контекст приложения Spring . После добавления стандартного объявления bean для классов, созданных в предыдущем разделе, мы готовы создать наше определение задания:
@Configuration @EnableBatchProcessing public class TaskletsConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean protected Step readLines() { return steps .get("readLines") .tasklet(linesReader()) .build(); } @Bean protected Step processLines() { return steps .get("processLines") .tasklet(linesProcessor()) .build(); } @Bean protected Step writeLines() { return steps .get("writeLines") .tasklet(linesWriter()) .build(); } @Bean public Job job() { return jobs .get("taskletsJob") .start(readLines()) .next(processLines()) .next(writeLines()) .build(); } // ... }
Это означает, что наше “Задание tasklets” будет состоять из трех шагов. Первый ( readLines ) выполнит тасклет, определенный в bean lines Reader , и перейдет к следующему шагу: process Lines. Process Lines выполнит задание, определенное в процессоре bean lines , и перейдет к заключительному шагу: writeLines .
Наш рабочий поток определен, и мы готовы добавить немного логики!
4.3. Модель и Utils
Поскольку мы будем манипулировать строками в CSV-файле, мы создадим класс Line:
public class Line implements Serializable { private String name; private LocalDate dob; private Long age; // standard constructor, getters, setters and toString implementation }
Обратите внимание, что Line реализует Сериализуемый. Это связано с тем, что Line будет действовать как DTO для передачи данных между шагами. Согласно Spring Batch, объекты, которые передаются между шагами, должны быть сериализуемыми .
С другой стороны, мы можем начать думать о чтении и написании строк.
Для этого мы будем использовать Opencv:
com.opencsv opencsv 4.1
Найдите последнюю версию Opencv в Maven Central.
Как только OpenCSV будет включен, мы также создадим FileUtils класс . Он предоставит методы для чтения и записи строк CSV:
public class FileUtils { public Line readLine() throws Exception { if (CSVReader == null) initReader(); String[] line = CSVReader.readNext(); if (line == null) return null; return new Line( line[0], LocalDate.parse( line[1], DateTimeFormatter.ofPattern("MM/dd/yyyy"))); } public void writeLine(Line line) throws Exception { if (CSVWriter == null) initWriter(); String[] lineStr = new String[2]; lineStr[0] = line.getName(); lineStr[1] = line .getAge() .toString(); CSVWriter.writeNext(lineStr); } // ... }
Обратите внимание, что readLine действует как оболочка над методом read Next Opencv и возвращает объект Line .
Таким же образом WriteLine обертывает OpenCSV writeNext , получая Строку объект. Полную реализацию этого класса можно найти в проекте GitHub .
На данный момент мы все готовы начать с реализации каждого шага.
4.4. Считыватель строк
Давайте продолжим и завершим наш Linereader класс:
public class LinesReader implements Tasklet, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesReader.class); private Listlines; private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { lines = new ArrayList<>(); fu = new FileUtils( "taskletsvschunks/input/tasklets-vs-chunks.csv"); logger.debug("Lines Reader initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { Line line = fu.readLine(); while (line != null) { lines.add(line); logger.debug("Read line: " + line.toString()); line = fu.readLine(); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeReader(); stepExecution .getJobExecution() .getExecutionContext() .put("lines", this.lines); logger.debug("Lines Reader ended."); return ExitStatus.COMPLETED; } }
Метод чтения строк execute создает экземпляр FileUtils по пути к входному файлу. Затем добавляет строки в список до тех пор, пока не останется больше строк для чтения .
Наш класс также реализует StepExecutionListener , который предоставляет два дополнительных метода: beforeStep и after Step . Мы будем использовать эти методы для инициализации и закрытия объектов до и после запуска execute .
Если мы посмотрим на после шага кода, мы заметим строку, в которой список результатов ( строки) помещается в контекст задания, чтобы сделать его доступным для следующего шага:
stepExecution .getJobExecution() .getExecutionContext() .put("lines", this.lines);
На данный момент наш первый шаг уже выполнил свою обязанность: загрузите строки CSV в Список в памяти. Давайте перейдем ко второму шагу и обработаем их.
4.5. Линейный процессор
Процессор строк также будет реализовывать StepExecutionListener и, конечно же, Tasklet . Это означает, что он будет реализовывать до шага , выполнять и после шага методы, а также:
public class LinesProcessor implements Tasklet, StepExecutionListener { private Logger logger = LoggerFactory.getLogger( LinesProcessor.class); private Listlines; @Override public void beforeStep(StepExecution stepExecution) { ExecutionContext executionContext = stepExecution .getJobExecution() .getExecutionContext(); this.lines = (List ) executionContext.get("lines"); logger.debug("Lines Processor initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { for (Line line : lines) { long age = ChronoUnit.YEARS.between( line.getDob(), LocalDate.now()); logger.debug("Calculated age " + age + " for line " + line.toString()); line.setAge(age); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { logger.debug("Lines Processor ended."); return ExitStatus.COMPLETED; } }
Легко понять, что он загружает строки список из контекста задания и вычисляет возраст каждого человека .
Нет необходимости помещать в контекст другой список результатов, поскольку изменения происходят с тем же объектом, что и на предыдущем шаге.
И мы готовы к нашему последнему шагу.
4.6. Автор строк
Задача автора строк состоит в том, чтобы перейти к списку строк и записать имя и возраст в выходной файл :
public class LinesWriter implements Tasklet, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesWriter.class); private Listlines; private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { ExecutionContext executionContext = stepExecution .getJobExecution() .getExecutionContext(); this.lines = (List ) executionContext.get("lines"); fu = new FileUtils("output.csv"); logger.debug("Lines Writer initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { for (Line line : lines) { fu.writeLine(line); logger.debug("Wrote line " + line.toString()); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeWriter(); logger.debug("Lines Writer ended."); return ExitStatus.COMPLETED; } }
Мы закончили с выполнением нашей работы! Давайте создадим тест, чтобы запустить его и посмотреть результаты.
4.7. Выполнение задания
Чтобы выполнить задание, мы создадим тест:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = TaskletsConfig.class) public class TaskletsTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test public void givenTaskletsJob_whenJobEnds_thenStatusCompleted() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(); assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); } }
Аннотация ContextConfiguration указывает на класс конфигурации контекста Spring, в котором содержится наше определение задания.
Нам нужно будет добавить пару дополнительных бобов перед запуском теста:
@Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } @Bean public JobRepository jobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDataSource(dataSource()); factory.setTransactionManager(transactionManager()); return factory.getObject(); } @Bean public DataSource dataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); dataSource.setDriverClassName("org.sqlite.JDBC"); dataSource.setUrl("jdbc:sqlite:repository.sqlite"); return dataSource; } @Bean public PlatformTransactionManager transactionManager() { return new ResourcelessTransactionManager(); } @Bean public JobLauncher jobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository()); return jobLauncher; }
Все готово! Идите вперед и проведите тест!
После завершения задания файл output.csv содержит ожидаемое содержимое, а в журналах отображается поток выполнения:
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized. [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended. [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized. [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended. [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized. [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25] [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.
Вот и все для тасклетов. Теперь мы можем перейти к подходу “Куски”.
5. Подход по Частям
5.1. Введение и проектирование
Как следует из названия, этот подход выполняет действия над фрагментами данных . То есть, вместо чтения, обработки и записи всех строк сразу, он будет читать, обрабатывать и записывать фиксированное количество записей (фрагмент) за один раз.
Затем он будет повторять цикл до тех пор, пока в файле не останется больше данных.
В результате поток будет немного отличаться:
- Пока есть очереди:
- Сделать для X количество строк:
- Прочтите одну строчку
- Обработать одну строку
- Напишите X количество строк.
- Сделать для X количество строк:
Итак, нам также нужно создать три компонента для подхода, ориентированного на фрагменты :
public class LineReader { // ... }
public class LineProcessor { // ... }
public class LinesWriter { // ... }
Прежде чем перейти к реализации, давайте настроим нашу работу.
5.2. Конфигурация
Определение задания также будет выглядеть по-другому:
@Configuration @EnableBatchProcessing public class ChunksConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean public ItemReaderitemReader() { return new LineReader(); } @Bean public ItemProcessor itemProcessor() { return new LineProcessor(); } @Bean public ItemWriter itemWriter() { return new LinesWriter(); } @Bean protected Step processLines(ItemReader reader, ItemProcessor processor, ItemWriter writer) { return steps.get("processLines"). chunk(2) .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public Job job() { return jobs .get("chunksJob") .start(processLines(itemReader(), itemProcessor(), itemWriter())) .build(); } }
В этом случае есть только один шаг, выполняющий только один набор задач.
Однако этот тасклет определяет читателя, писателя и процессор, который будет работать с фрагментами данных .
Обратите внимание, что интервал фиксации указывает объем данных, которые должны быть обработаны в одном фрагменте . Наша работа будет читать, обрабатывать и писать по две строки за раз.
Теперь мы готовы добавить наш кусок логики!
5.3. Finereader
Считыватель строк будет отвечать за чтение одной записи и возврат экземпляра Строки с ее содержимым.
Чтобы стать читателем, наш класс должен реализовать ItemReader интерфейс :
public class LineReader implements ItemReader{ @Override public Line read() throws Exception { Line line = fu.readLine(); if (line != null) logger.debug("Read line: " + line.toString()); return line; } }
Код прост, он просто читает одну строку и возвращает ее. Мы также реализуем StepExecutionListener для окончательной версии этого класса:
public class LineReader implements ItemReader, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LineReader.class); private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv"); logger.debug("Line Reader initialized."); } @Override public Line read() throws Exception { Line line = fu.readLine(); if (line != null) logger.debug("Read line: " + line.toString()); return line; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeReader(); logger.debug("Line Reader ended."); return ExitStatus.COMPLETED; } }
Следует отметить, что перед шагом и после шага выполняются до и после всего шага соответственно.
5.4. Линейный процессор
Линейный процессор следует в значительной степени той же логике, что и Считыватель строк .
Однако в этом случае мы реализуем ItemProcessor и его метод process() :
public class LineProcessor implements ItemProcessor{ private Logger logger = LoggerFactory.getLogger(LineProcessor.class); @Override public Line process(Line line) throws Exception { long age = ChronoUnit.YEARS .between(line.getDob(), LocalDate.now()); logger.debug("Calculated age " + age + " for line " + line.toString()); line.setAge(age); return line; } }
Метод process() принимает входную строку, обрабатывает ее и возвращает выходную строку . Опять же, мы также реализуем StepExecutionListener:
public class LineProcessor implements ItemProcessor, StepExecutionListener { private Logger logger = LoggerFactory.getLogger(LineProcessor.class); @Override public void beforeStep(StepExecution stepExecution) { logger.debug("Line Processor initialized."); } @Override public Line process(Line line) throws Exception { long age = ChronoUnit.YEARS .between(line.getDob(), LocalDate.now()); logger.debug( "Calculated age " + age + " for line " + line.toString()); line.setAge(age); return line; } @Override public ExitStatus afterStep(StepExecution stepExecution) { logger.debug("Line Processor ended."); return ExitStatus.COMPLETED; } }
5.5. Автор строк
В отличие от считывателя и процессора, Lines Writer запишет весь фрагмент строк , чтобы получить Список из Строк:
public class LinesWriter implements ItemWriter, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesWriter.class); private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { fu = new FileUtils("output.csv"); logger.debug("Line Writer initialized."); } @Override public void write(List extends Line> lines) throws Exception { for (Line line : lines) { fu.writeLine(line); logger.debug("Wrote line " + line.toString()); } } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeWriter(); logger.debug("Line Writer ended."); return ExitStatus.COMPLETED; } }
Строки Writer код говорит сам за себя. И снова мы готовы проверить нашу работу.
5.6. Выполнение задания
Мы создадим новый тест, такой же, как и тот, который мы создали для подхода tasklets:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = ChunksConfig.class) public class ChunksTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test public void givenChunksJob_whenJobEnds_thenStatusCompleted() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(); assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); } }
После настройки Chunks Config , как описано выше для TaskletsConfig , мы все готовы к запуску теста!
Как только работа будет выполнена, мы увидим, что output.csv снова содержит ожидаемый результат, а журналы описывают поток:
[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized. [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized. [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized. [main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25] [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended. [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended. [main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.
У нас тот же результат и другой поток . Журналы показывают, как задание выполняется в соответствии с этим подходом.
6. Заключение
Различные контексты покажут необходимость того или иного подхода. В то время как тасклеты кажутся более естественными для сценариев “одна задача за другой”, фрагменты обеспечивают простое решение для работы с разбиением на страницы или ситуациями, когда мы не хотим хранить значительный объем данных в памяти.
Полную реализацию этого примера можно найти в проекте GitHub .