Рубрики
Без рубрики

Пружинный пакет с использованием разделителя

Узнайте, как реализовать задание с параллельной обработкой с помощью пакета Spring.

Автор оригинала: baeldung.

1. Обзор

В нашем предыдущем введении в Spring Batch мы представили фреймворк в качестве инструмента пакетной обработки. Мы также изучили детали конфигурации и реализацию однопоточного выполнения задания с одним процессом.

Для реализации задания с некоторой параллельной обработкой предусмотрен ряд опций. На более высоком уровне существует два режима параллельной обработки:

  1. Однопроцессный, многопоточный
  2. Многопроцессный

В этой краткой статье мы обсудим разделение Step , которое может быть реализовано как для одного процесса , так и для нескольких процессов.

2. Разделение шага

Весенний пакет с разделением предоставляет нам возможность разделить выполнение Шага :

Обзор секционирования

На приведенном выше рисунке показана реализация Задания с разделенным Шагом .

Есть Шаг , называемый “Master”, выполнение которого разделено на несколько “подчиненных” шагов. Эти рабы могут занять место хозяина, и результат все равно останется неизменным. Как ведущий, так и ведомый являются экземплярами Step . Ведомые устройства могут быть удаленными службами или просто локально выполняющимися потоками.

При необходимости мы можем передавать данные от ведущего устройства к ведомому. Метаданные (т. Е. JobRepository ) гарантируют, что каждое ведомое устройство выполняется только один раз в одном выполнении задания .

Вот схема последовательности, показывающая, как все это работает:

Шаг разбиения на разделы

Как показано, шаг Partition управляет выполнением. Обработчик разделов | отвечает за разделение работы “Ведущего” на “Подчиненные”. Самый правый Шаг – это ведомый.

3. Maven POM

Зависимости Maven такие же, как упоминалось в нашей предыдущей статье |. То есть Spring Core, Spring Batch и зависимость для базы данных (в нашем случае SQLite ).

4. Конфигурация

В нашей вводной статье мы видели пример преобразования некоторых финансовых данных из CSV в XML-файл. Давайте продолжим тот же пример.

Здесь мы преобразуем финансовую информацию из 5 CSV-файлов в соответствующие XML-файлы, используя многопоточную реализацию.

Мы можем достичь этого, используя одно Задание и Шаг секционирование. У нас будет пять потоков, по одному для каждого CSV-файла.

Прежде всего, давайте создадим работу:

@Bean(name = "partitionerJob")
public Job partitionerJob() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return jobs.get("partitioningJob")
      .start(partitionStep())
      .build();
}

Как мы видим, это Задание начинается с Шага Секционирования . Это наш главный шаг, который будет разделен на различные подчиненные шаги:

@Bean
public Step partitionStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("partitionStep")
      .partitioner("slaveStep", partitioner())
      .step(slaveStep())
      .taskExecutor(taskExecutor())
      .build();
}

Здесь мы создадим шаг Секционирования с помощью StepBuilderFactory . Для этого нам нужно предоставить информацию о шагах Slave и Разделителе .

Разделитель – это интерфейс, который предоставляет возможность определить набор входных значений для каждого из подчиненных устройств. Другими словами, здесь идет логика разделения задач на соответствующие потоки.

Давайте создадим его реализацию под названием Custom MultiResourcePartitioner , где мы поместим имена входных и выходных файлов в ExecutionContext для передачи на каждый подчиненный шаг:

public class CustomMultiResourcePartitioner implements Partitioner {
 
    @Override
    public Map partition(int gridSize) {
        Map map = new HashMap<>(gridSize);
        int i = 0, k = 1;
        for (Resource resource : resources) {
            ExecutionContext context = new ExecutionContext();
            Assert.state(resource.exists(), "Resource does not exist: " 
              + resource);
            context.putString(keyName, resource.getFilename());
            context.putString("opFileName", "output"+k+++".xml");
            map.put(PARTITION_KEY + i, context);
            i++;
        }
        return map;
    }
}

Мы также создадим боб для этого класса, где мы дадим исходный каталог для входных файлов:

@Bean
public CustomMultiResourcePartitioner partitioner() {
    CustomMultiResourcePartitioner partitioner 
      = new CustomMultiResourcePartitioner();
    Resource[] resources;
    try {
        resources = resoursePatternResolver
          .getResources("file:src/main/resources/input/*.csv");
    } catch (IOException e) {
        throw new RuntimeException("I/O problems when resolving"
          + " the input file pattern.", e);
    }
    partitioner.setResources(resources);
    return partitioner;
}

Мы определим подчиненный шаг, как и любой другой шаг с читателем и писателем. Читатель и писатель будут такими же, как мы видели в нашем вводном примере, за исключением того, что они получат параметр filename из StepExecutionContext.

Обратите внимание, что эти компоненты должны иметь область действия шага, чтобы они могли получать параметры StepExecutionContext на каждом шаге. Если они не будут иметь области действия шага, их бобы будут созданы изначально и не будут принимать имена файлов на уровне шага:

@StepScope
@Bean
public FlatFileItemReader itemReader(
  @Value("#{stepExecutionContext[fileName]}") String filename)
  throws UnexpectedInputException, ParseException {
 
    FlatFileItemReader reader 
      = new FlatFileItemReader<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens 
      = {"username", "userid", "transactiondate", "amount"};
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource("input/" + filename));
    DefaultLineMapper lineMapper 
      = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}
@Bean
@StepScope
public ItemWriter itemWriter(Marshaller marshaller, 
  @Value("#{stepExecutionContext[opFileName]}") String filename)
  throws MalformedURLException {
    StaxEventItemWriter itemWriter 
      = new StaxEventItemWriter();
    itemWriter.setMarshaller(marshaller);
    itemWriter.setRootTagName("transactionRecord");
    itemWriter.setResource(new ClassPathResource("xml/" + filename));
    return itemWriter;
}

При упоминании reader и writer на подчиненном шаге мы можем передать аргументы как null, потому что эти имена файлов не будут использоваться, так как они получат имена файлов из StepExecutionContext :

@Bean
public Step slaveStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("slaveStep").chunk(1)
      .reader(itemReader(null))
      .writer(itemWriter(marshaller(), null))
      .build();
}

5. Заключение

В этом уроке мы обсудили, как реализовать задание с параллельной обработкой с использованием пакета Spring.

Как всегда, полная реализация для этого примера доступна на GitHub .