Рубрики
Без рубрики

Весенний пакет – Тасклеты против кусков

Узнайте о двух способах реализации заданий в пакете Spring: tasklet и chunks.

Автор оригинала: Magdalena Krause.

1. введение

Spring Batch предоставляет два различных способа реализации задания: использование тасклетов и фрагментов .

В этой статье мы узнаем, как настроить и реализовать оба метода, используя простой пример из реальной жизни.

2. Зависимости

Давайте начнем с добавления необходимых зависимостей :


    org.springframework.batch
    spring-batch-core
    4.3.0


    org.springframework.batch
    spring-batch-test
    4.3.0
    test

Чтобы получить последнюю версию spring-batch-core и spring-batch-test , пожалуйста, обратитесь к Maven Central.

3. Наш Вариант Использования

Давайте рассмотрим CSV-файл со следующим содержимым:

Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992

первая позиция каждой строки представляет имя человека, а вторая позиция представляет его/ее дату рождения .

Наш вариант использования состоит в том, чтобы создать еще один CSV-файл, содержащий имя и возраст каждого человека :

Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25

Теперь, когда наша область ясна, давайте продолжим и создадим решение, используя оба подхода. Начнем с заданий.

4. Подход к задачам

4.1. Введение и проектирование

Тасклеты предназначены для выполнения одной задачи в рамках одного шага. Наша работа будет состоять из нескольких шагов, которые выполняются один за другим. Каждый шаг должен выполнять только одну определенную задачу .

Наша работа будет состоять из трех этапов:

  1. Считывание строк из входного CSV-файла.
  2. Рассчитайте возраст для каждого человека во входном CSV-файле.
  3. Запишите имя и возраст каждого человека в новый выходной CSV-файл.

Теперь, когда общая картина готова, давайте создадим по одному классу на шаг.

Считыватель строк будет отвечать за считывание данных из входного файла:

public class LinesReader implements Tasklet {
    // ...
}

Процессор строк вычислит возраст для каждого человека в файле:

public class LinesProcessor implements Tasklet {
    // ...
}

Наконец, Lines Writer будет нести ответственность за запись имен и возрастов в выходной файл:

public class LinesWriter implements Tasklet {
    // ...
}

На этом этапе все наши шаги реализуют Tasklet интерфейс . Это заставит нас реализовать его метод execute :

@Override
public RepeatStatus execute(StepContribution stepContribution, 
  ChunkContext chunkContext) throws Exception {
    // ...
}

В этом методе мы добавим логику для каждого шага. Прежде чем начать с этого кода, давайте настроим нашу работу.

4.2. Конфигурация

Нам нужно добавить некоторую конфигурацию в контекст приложения Spring . После добавления стандартного объявления bean для классов, созданных в предыдущем разделе, мы готовы создать наше определение задания:

@Configuration
@EnableBatchProcessing
public class TaskletsConfig {

    @Autowired 
    private JobBuilderFactory jobs;

    @Autowired 
    private StepBuilderFactory steps;

    @Bean
    protected Step readLines() {
        return steps
          .get("readLines")
          .tasklet(linesReader())
          .build();
    }

    @Bean
    protected Step processLines() {
        return steps
          .get("processLines")
          .tasklet(linesProcessor())
          .build();
    }

    @Bean
    protected Step writeLines() {
        return steps
          .get("writeLines")
          .tasklet(linesWriter())
          .build();
    }

    @Bean
    public Job job() {
        return jobs
          .get("taskletsJob")
          .start(readLines())
          .next(processLines())
          .next(writeLines())
          .build();
    }

    // ...

}

Это означает, что наше “Задание tasklets” будет состоять из трех шагов. Первый ( readLines ) выполнит тасклет, определенный в bean lines Reader , и перейдет к следующему шагу: process Lines. Process Lines выполнит задание, определенное в процессоре bean lines , и перейдет к заключительному шагу: writeLines .

Наш рабочий поток определен, и мы готовы добавить немного логики!

4.3. Модель и Utils

Поскольку мы будем манипулировать строками в CSV-файле, мы создадим класс Line:

public class Line implements Serializable {

    private String name;
    private LocalDate dob;
    private Long age;

    // standard constructor, getters, setters and toString implementation

}

Обратите внимание, что Line реализует Сериализуемый. Это связано с тем, что Line будет действовать как DTO для передачи данных между шагами. Согласно Spring Batch, объекты, которые передаются между шагами, должны быть сериализуемыми .

С другой стороны, мы можем начать думать о чтении и написании строк.

Для этого мы будем использовать Opencv:


    com.opencsv
    opencsv
    4.1

Найдите последнюю версию Opencv в Maven Central.

Как только OpenCSV будет включен, мы также создадим FileUtils класс . Он предоставит методы для чтения и записи строк CSV:

public class FileUtils {

    public Line readLine() throws Exception {
        if (CSVReader == null) 
          initReader();
        String[] line = CSVReader.readNext();
        if (line == null) 
          return null;
        return new Line(
          line[0], 
          LocalDate.parse(
            line[1], 
            DateTimeFormatter.ofPattern("MM/dd/yyyy")));
    }

    public void writeLine(Line line) throws Exception {
        if (CSVWriter == null) 
          initWriter();
        String[] lineStr = new String[2];
        lineStr[0] = line.getName();
        lineStr[1] = line
          .getAge()
          .toString();
        CSVWriter.writeNext(lineStr);
    }

    // ...
}

Обратите внимание, что readLine действует как оболочка над методом read Next Opencv и возвращает объект Line .

Таким же образом WriteLine обертывает OpenCSV writeNext , получая Строку объект. Полную реализацию этого класса можно найти в проекте GitHub .

На данный момент мы все готовы начать с реализации каждого шага.

4.4. Считыватель строк

Давайте продолжим и завершим наш Linereader класс:

public class LinesReader implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesReader.class);

    private List lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        lines = new ArrayList<>();
        fu = new FileUtils(
          "taskletsvschunks/input/tasklets-vs-chunks.csv");
        logger.debug("Lines Reader initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, 
      ChunkContext chunkContext) throws Exception {
        Line line = fu.readLine();
        while (line != null) {
            lines.add(line);
            logger.debug("Read line: " + line.toString());
            line = fu.readLine();
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        stepExecution
          .getJobExecution()
          .getExecutionContext()
          .put("lines", this.lines);
        logger.debug("Lines Reader ended.");
        return ExitStatus.COMPLETED;
    }
}

Метод чтения строк execute создает экземпляр FileUtils по пути к входному файлу. Затем добавляет строки в список до тех пор, пока не останется больше строк для чтения .

Наш класс также реализует StepExecutionListener , который предоставляет два дополнительных метода: beforeStep и after Step . Мы будем использовать эти методы для инициализации и закрытия объектов до и после запуска execute .

Если мы посмотрим на после шага кода, мы заметим строку, в которой список результатов ( строки) помещается в контекст задания, чтобы сделать его доступным для следующего шага:

stepExecution
  .getJobExecution()
  .getExecutionContext()
  .put("lines", this.lines);

На данный момент наш первый шаг уже выполнил свою обязанность: загрузите строки CSV в Список в памяти. Давайте перейдем ко второму шагу и обработаем их.

4.5. Линейный процессор

Процессор строк также будет реализовывать StepExecutionListener и, конечно же, Tasklet . Это означает, что он будет реализовывать до шага , выполнять и после шага методы, а также:

public class LinesProcessor implements Tasklet, StepExecutionListener {

    private Logger logger = LoggerFactory.getLogger(
      LinesProcessor.class);

    private List lines;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        ExecutionContext executionContext = stepExecution
          .getJobExecution()
          .getExecutionContext();
        this.lines = (List) executionContext.get("lines");
        logger.debug("Lines Processor initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, 
      ChunkContext chunkContext) throws Exception {
        for (Line line : lines) {
            long age = ChronoUnit.YEARS.between(
              line.getDob(), 
              LocalDate.now());
            logger.debug("Calculated age " + age + " for line " + line.toString());
            line.setAge(age);
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Lines Processor ended.");
        return ExitStatus.COMPLETED;
    }
}

Легко понять, что он загружает строки список из контекста задания и вычисляет возраст каждого человека .

Нет необходимости помещать в контекст другой список результатов, поскольку изменения происходят с тем же объектом, что и на предыдущем шаге.

И мы готовы к нашему последнему шагу.

4.6. Автор строк

Задача автора строк состоит в том, чтобы перейти к списку строк и записать имя и возраст в выходной файл :

public class LinesWriter implements Tasklet, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesWriter.class);

    private List lines;
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        ExecutionContext executionContext = stepExecution
          .getJobExecution()
          .getExecutionContext();
        this.lines = (List) executionContext.get("lines");
        fu = new FileUtils("output.csv");
        logger.debug("Lines Writer initialized.");
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, 
      ChunkContext chunkContext) throws Exception {
        for (Line line : lines) {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Lines Writer ended.");
        return ExitStatus.COMPLETED;
    }
}

Мы закончили с выполнением нашей работы! Давайте создадим тест, чтобы запустить его и посмотреть результаты.

4.7. Выполнение задания

Чтобы выполнить задание, мы создадим тест:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsTest {

    @Autowired 
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
      throws Exception {
 
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
    }
}

Аннотация ContextConfiguration указывает на класс конфигурации контекста Spring, в котором содержится наше определение задания.

Нам нужно будет добавить пару дополнительных бобов перед запуском теста:

@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
    return new JobLauncherTestUtils();
}

@Bean
public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource());
    factory.setTransactionManager(transactionManager());
    return factory.getObject();
}

@Bean
public DataSource dataSource() {
    DriverManagerDataSource dataSource = new DriverManagerDataSource();
    dataSource.setDriverClassName("org.sqlite.JDBC");
    dataSource.setUrl("jdbc:sqlite:repository.sqlite");
    return dataSource;
}

@Bean
public PlatformTransactionManager transactionManager() {
    return new ResourcelessTransactionManager();
}

@Bean
public JobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository());
    return jobLauncher;
}

Все готово! Идите вперед и проведите тест!

После завершения задания файл output.csv содержит ожидаемое содержимое, а в журналах отображается поток выполнения:

[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.

Вот и все для тасклетов. Теперь мы можем перейти к подходу “Куски”.

5. Подход по Частям

5.1. Введение и проектирование

Как следует из названия, этот подход выполняет действия над фрагментами данных . То есть, вместо чтения, обработки и записи всех строк сразу, он будет читать, обрабатывать и записывать фиксированное количество записей (фрагмент) за один раз.

Затем он будет повторять цикл до тех пор, пока в файле не останется больше данных.

В результате поток будет немного отличаться:

  1. Пока есть очереди:
    • Сделать для X количество строк:
      • Прочтите одну строчку
      • Обработать одну строку
    • Напишите X количество строк.

Итак, нам также нужно создать три компонента для подхода, ориентированного на фрагменты :

public class LineReader {
     // ...
}
public class LineProcessor {
    // ...
}
public class LinesWriter {
    // ...
}

Прежде чем перейти к реализации, давайте настроим нашу работу.

5.2. Конфигурация

Определение задания также будет выглядеть по-другому:

@Configuration
@EnableBatchProcessing
public class ChunksConfig {

    @Autowired 
    private JobBuilderFactory jobs;

    @Autowired 
    private StepBuilderFactory steps;

    @Bean
    public ItemReader itemReader() {
        return new LineReader();
    }

    @Bean
    public ItemProcessor itemProcessor() {
        return new LineProcessor();
    }

    @Bean
    public ItemWriter itemWriter() {
        return new LinesWriter();
    }

    @Bean
    protected Step processLines(ItemReader reader,
      ItemProcessor processor, ItemWriter writer) {
        return steps.get("processLines"). chunk(2)
          .reader(reader)
          .processor(processor)
          .writer(writer)
          .build();
    }

    @Bean
    public Job job() {
        return jobs
          .get("chunksJob")
          .start(processLines(itemReader(), itemProcessor(), itemWriter()))
          .build();
    }

}

В этом случае есть только один шаг, выполняющий только один набор задач.

Однако этот тасклет определяет читателя, писателя и процессор, который будет работать с фрагментами данных .

Обратите внимание, что интервал фиксации указывает объем данных, которые должны быть обработаны в одном фрагменте . Наша работа будет читать, обрабатывать и писать по две строки за раз.

Теперь мы готовы добавить наш кусок логики!

5.3. Finereader

Считыватель строк будет отвечать за чтение одной записи и возврат экземпляра Строки с ее содержимым.

Чтобы стать читателем, наш класс должен реализовать ItemReader интерфейс :

public class LineReader implements ItemReader {
     @Override
     public Line read() throws Exception {
         Line line = fu.readLine();
         if (line != null) 
           logger.debug("Read line: " + line.toString());
         return line;
     }
}

Код прост, он просто читает одну строку и возвращает ее. Мы также реализуем StepExecutionListener для окончательной версии этого класса:

public class LineReader implements 
  ItemReader, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LineReader.class);
 
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
        logger.debug("Line Reader initialized.");
    }

    @Override
    public Line read() throws Exception {
        Line line = fu.readLine();
        if (line != null) logger.debug("Read line: " + line.toString());
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeReader();
        logger.debug("Line Reader ended.");
        return ExitStatus.COMPLETED;
    }
}

Следует отметить, что перед шагом и после шага выполняются до и после всего шага соответственно.

5.4. Линейный процессор

Линейный процессор следует в значительной степени той же логике, что и Считыватель строк .

Однако в этом случае мы реализуем ItemProcessor и его метод process() :

public class LineProcessor implements ItemProcessor {

    private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

    @Override
    public Line process(Line line) throws Exception {
        long age = ChronoUnit.YEARS
          .between(line.getDob(), LocalDate.now());
        logger.debug("Calculated age " + age + " for line " + line.toString());
        line.setAge(age);
        return line;
    }

}

Метод process() принимает входную строку, обрабатывает ее и возвращает выходную строку . Опять же, мы также реализуем StepExecutionListener:

public class LineProcessor implements 
  ItemProcessor, StepExecutionListener {

    private Logger logger = LoggerFactory.getLogger(LineProcessor.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        logger.debug("Line Processor initialized.");
    }
    
    @Override
    public Line process(Line line) throws Exception {
        long age = ChronoUnit.YEARS
          .between(line.getDob(), LocalDate.now());
        logger.debug(
          "Calculated age " + age + " for line " + line.toString());
        line.setAge(age);
        return line;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        logger.debug("Line Processor ended.");
        return ExitStatus.COMPLETED;
    }
}

5.5. Автор строк

В отличие от считывателя и процессора, Lines Writer запишет весь фрагмент строк , чтобы получить Список из Строк:

public class LinesWriter implements 
  ItemWriter, StepExecutionListener {

    private final Logger logger = LoggerFactory
      .getLogger(LinesWriter.class);
 
    private FileUtils fu;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        fu = new FileUtils("output.csv");
        logger.debug("Line Writer initialized.");
    }

    @Override
    public void write(List lines) throws Exception {
        for (Line line : lines) {
            fu.writeLine(line);
            logger.debug("Wrote line " + line.toString());
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        fu.closeWriter();
        logger.debug("Line Writer ended.");
        return ExitStatus.COMPLETED;
    }
}

Строки Writer код говорит сам за себя. И снова мы готовы проверить нашу работу.

5.6. Выполнение задания

Мы создадим новый тест, такой же, как и тот, который мы создали для подхода tasklets:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void givenChunksJob_whenJobEnds_thenStatusCompleted() 
      throws Exception {
 
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
 
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); 
    }
}

После настройки Chunks Config , как описано выше для TaskletsConfig , мы все готовы к запуску теста!

Как только работа будет выполнена, мы увидим, что output.csv снова содержит ожидаемый результат, а журналы описывают поток:

[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
[main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.

У нас тот же результат и другой поток . Журналы показывают, как задание выполняется в соответствии с этим подходом.

6. Заключение

Различные контексты покажут необходимость того или иного подхода. В то время как тасклеты кажутся более естественными для сценариев “одна задача за другой”, фрагменты обеспечивают простое решение для работы с разбиением на страницы или ситуациями, когда мы не хотим хранить значительный объем данных в памяти.

Полную реализацию этого примера можно найти в проекте GitHub .