Рубрики
Без рубрики

MongoDB тестовых контейнеров Модуль и весенние данные MongoDB в действии

Содержание Введение Реализация бизнес-логики Тестовая реализация Как можно… С тегами mongodb, spring boot, тестовые контейнеры, java.

содержание

  • Вступление
  • Реализация бизнес-логики
  • Реализация тестирования
  • Как я могу играть с кодом?
  • Что в этом для меня?
  • Хотите пойти глубже?
  • Связи

1. Вступление

Как я могу легко протестировать код транзакции с несколькими документами MongoDB без настройки MongoDB на моем устройстве? Можно возразить, что они должны сначала настроить его, потому что для выполнения такой транзакции ему нужен сеанс, для которого требуется набор реплик. К счастью, нет необходимости создавать набор реплик из 3 узлов, и мы можем выполнять эти транзакции только с одним экземпляром базы данных.

Для достижения этой цели мы можем сделать следующее:

  • Запустите контейнер MongoDB версии 4 или выше и укажите команду –replSet;
  • Инициализируйте один набор реплик, выполнив соответствующую команду;
  • Дождитесь завершения инициализации;
  • Подключитесь к автономному устройству без указания набора реплик, чтобы не беспокоиться об изменении файла хоста нашей операционной системы.

Стоит отметить, что набор реплик здесь не единственный вариант, поскольку в MongoDB версии 4.2 представлены распределенные транзакции в сегментированных кластерах, что выходит за рамки данной статьи.

Существует множество способов инициализации набора реплик, включая создание докеров, сценарии bash, службы в CI/CD и т. Д. Тем не менее, это требует некоторой дополнительной работы с точки зрения написания сценариев, обработки случайных портов и включения их в процесс CI/CD. К счастью, начиная с версии 1.14.2 тестовых контейнеров, мы можем делегировать всю тяжелую работу модулю MongoDB .

Давайте попробуем это на небольшой системе управления складом, основанной на Spring Boot 2.3. В недавнем прошлом приходилось использовать Реактивные операции и его Внутриоперационный метод, но с весны данные MongoDB 2.2 M4 мы смогли использовать старые добрые @Транзакционные аннотации или более продвинутые Транзакционный оператор .

Наше приложение должно иметь API REST для предоставления информации об успешно обработанных файлах, включая количество измененных документов. Что касается файлов, вызывающих ошибки на этом пути, мы должны пропустить их, чтобы обработать все файлы.

Можно отметить, что, хотя дублированные статьи и их размеры в одном файле являются редким случаем, эта возможность вполне реальна, и поэтому ее также следует учитывать.

В соответствии с бизнес-требованиями к нашей системе, у нас уже есть некоторые продукты в нашей базе данных, и мы загружаем кучу файлов Excel (xlsx), чтобы обновить некоторые поля соответствующих документов в нашем хранилище. Предполагается, что данные должны быть только на первом листе любой рабочей книги. Каждый файл обрабатывается в отдельной транзакции с несколькими документами, чтобы предотвратить одновременное изменение одних и тех же документов. Например, на рисунке 1 показаны случаи коллизий в отношении того, как заканчивается транзакция, за исключением возможного сценария, когда транзакции выполняются последовательно (представление json здесь сокращено для простоты). Транзакционное поведение помогает нам избежать столкновения данных и гарантирует согласованность.

Что касается коллекции продуктов, у нас есть статья в качестве уникального индекса. В то же время каждое изделие привязано к конкретному размеру. Поэтому для нашего приложения важно убедиться, что оба они находятся в базе данных перед обновлением. Рисунок 2 дает представление об этой коллекции.

2. Реализация бизнес-логики

Давайте подробно остановимся на основных моментах вышеупомянутой бизнес-логики и начнем с Контроллера продукта в качестве точки входа для обработки. Вы можете найти полный проект на GitHub . Необходимыми условиями являются Java8+ и Docker.

@PatchMapping(
  consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
  produces = MediaType.APPLICATION_STREAM_JSON_VALUE
)
public ResponseEntity> patchProductQuantity(
  @RequestPart("file") Flux files,
  @AuthenticationPrincipal Principal principal
) {
  log.debug("shouldPatchProductQuantity");
  return ResponseEntity.accepted().body(
    uploadProductService.patchProductQuantity(files, principal.getName())
  );
}

1) Оберните ответ в ResponseEntity и верните поток данных Загрузки файла ; 2) Получите текущего участника аутентификации, который пригодится позже; 3) Передайте поток части файла для обработки.

Вот Количество исправленных продуктов метод Загрузки ProductServiceImpl :

public Flux patchProductQuantity(
  final Flux files,
  final String userName
) {
  return Mono.fromRunnable(() -> initRootDirectory(userName))
    .publishOn(Schedulers.newBoundedElastic(1, 1, "initRootDirectory"))
    .log(String.format("cleaning-up directory: %s", userName))
    .thenMany(files.flatMap(f ->
        saveFileToDiskAndUpdate(f, userName)
          .subscribeOn(Schedulers.boundedElastic())
      )
    );
}

1) Используйте имя пользователя в качестве имени корневого каталога; 2) Выполните инициализацию блокировки корневого каталога в отдельном эластичном потоке; 3) Для каждого файла Excel: 3.1) Сохраните его на диске; 3.2) Затем обновите количество продуктов в отдельном эластичном потоке, так как выполняется обработка блокировки файла.

Метод сохранить файл На Диск И Обновить выполняет следующую логику:

private Mono saveFileToDiskAndUpdate(
  final FilePart file,
  final String userName
) {
  final String fileName = file.filename();
  final Path path = Paths.get(pathToStorage, userName, fileName);
  return Mono.just(path)
    .log(String.format("A file: %s has been uploaded", fileName))
    .flatMap(file::transferTo)
    .log(String.format("A file: %s has been saved", fileName))
    .then(processExcelFile(fileName, userName, path));
}

1) Скопируйте содержимое файла в каталог пользователя; 2) После завершения этапа копирования вызовите метод processExcelFile .

На этом этапе мы собираемся разделить логику в соответствии с размером файла:

private Mono processExcelFile(
  final String fileName,
  final String userName,
  final Path path
) {
  return Mono.fromCallable(() -> Files.size(path))
    .flatMap(size -> {
      if (size >= bigFileSizeThreshold) {
        return processBigExcelFile(fileName, userName);
      } else {
        return processSmallExcelFile(fileName, userName);
      }
    });
}

1) Оберните блокирующий Файлы.размер(путь) вызов Моно.из вызываемого ; 2) порог большого размера файла вводится из соответствующего приложения.файл yml через @Значение ("${порог размера загружаемого файла.большого файла}") .

Прежде чем подробно останавливаться на обработке файлов Excel в зависимости от их размера, мы должны взглянуть на GetProducts метод Файл Excel Dao Impl :

@Override
public Flux getProducts(
  final String pathToStorage,
  final String fileName,
  final String userName
) {
  return Flux.defer(() -> {
    FileInputStream is;
    Workbook workbook;
    try {
      final File file = Paths.get(pathToStorage, userName, fileName).toFile();
      verifyFileAttributes(file);
      is = new FileInputStream(file);
      workbook = StreamingReader.builder()
        .rowCacheSize(ROW_CACHE_SIZE)
        .bufferSize(BUFFER_SIZE)
        .open(is);
    } catch (IOException e) {
      return Mono.error(new UploadProductException(
        String.format("An exception has been occurred while parsing a file: %s " +
          "has been saved", fileName), e));
    }

    try {
      final Sheet datatypeSheet = workbook.getSheetAt(0);
      final Iterator iterator = datatypeSheet.iterator();

      final AtomicInteger rowCounter = new AtomicInteger();
      if (iterator.hasNext()) {
        final Row currentRow = iterator.next();
        rowCounter.incrementAndGet();
        verifyExcelFileHeader(fileName, currentRow);
      }
      return Flux.create(fluxSink -> fluxSink.onRequest(value -> {
        try {
          for (int i = 0; i < value; i++) {
            if (!iterator.hasNext()) {
              fluxSink.complete();
              return;
            }

            final Row currentRow = iterator.next();
            final Product product = Objects.requireNonNull(getProduct(
              FileRow.builder()
                .fileName(fileName)
                .currentRow(currentRow)
                .rowCounter(rowCounter.incrementAndGet())
                .build()
            ), "product is not supposed to be null");
            fluxSink.next(product);
          }
        } catch (Exception e1) {
          fluxSink.error(e1);
        }
      })).doFinally(signalType -> {
        try {
          is.close();
          workbook.close();
        } catch (IOException e1) {
          log.error("Error has occurred while releasing {} resources: {}", fileName, e1);
        }
      });
    } catch (Exception e) {
      return Mono.error(e);
    }
  });
}

1) отличается вся логика, как только появляется новый подписчик; 2) Проверьте заголовок файла excel; 3) Создайте поток для предоставления запрошенного количества продуктов; 4) Преобразуйте строку Excel в Продукт объект домена; 5) Наконец, закройте все открытые ресурсы.

Возвращаясь к обработке файлов Excel в Загрузить ProductServiceImpl , , мы собираемся использовать метод Mongodb массовая запись в коллекции для массового обновления продуктов, для чего требуется тщательно оцененный список UpdateOneModel

Что касается небольших файлов Excel, мы предоставляем более подробный журнал и проводим дополнительную проверку достоверности:

private Mono processSmallExcelFile(
  final String fileName,
  final String userName
) {
  log.debug("processSmallExcelFile: {}", fileName);
  return excelFileDao.getProducts(pathToStorage, fileName, userName)
    .reduce(new ConcurrentHashMap, BigInteger>>(),
      (indexMap, product) -> {
        final BigInteger quantity = product.getQuantity();
        indexMap.merge(
          new ProductArticleSizeDto(product.getArticle(), product.getSize()),
          Tuples.of(
            updateOneModelConverter.convert(Tuples.of(product, quantity, userName)),
            quantity
          ),
          (oldValue, newValue) -> {
            final BigInteger mergedQuantity = oldValue.getT2().add(newValue.getT2());
            return Tuples.of(
              updateOneModelConverter.convert(Tuples.of(product, mergedQuantity, userName)),
              mergedQuantity
            );
          }

        );
        return indexMap;
      })
    .filterWhen(productIndexFile ->
      productDao.findByArticleIn(extractArticles(productIndexFile.keySet()))
        .handle(
          (productArticleSizeDto, synchronousSink) -> {
            if (productIndexFile.containsKey(productArticleSizeDto)) {
              synchronousSink.next(productArticleSizeDto);
            } else {
              synchronousSink.error(new UploadProductException(
                String.format(
                  "A file %s does not have an article: %d with size: %s",
                  fileName,
                  productArticleSizeDto.getArticle(),
                  productArticleSizeDto.getSize()
                )
              ));
            }
          })
        .count()
        .handle((sizeDb, synchronousSink) -> {
          final int sizeFile = productIndexFile.size();
          if (sizeDb == sizeFile) {
            synchronousSink.next(Boolean.TRUE);
          } else {
            synchronousSink.error(new UploadProductException(
              String.format(
                "Inconsistency between total element size in MongoDB: %d and a file %s: %d",
                sizeDb,
                fileName,
                sizeFile
              )
            ));
          }
        })
    ).onErrorResume(e -> {
      log.debug("Exception while processExcelFile fileName: {}: {}", fileName, e);
      return Mono.empty();
    }).flatMap(productIndexFile ->
      productPatcherService.incrementProductQuantity(
        fileName,
        productIndexFile.values().stream().map(Tuple2::getT1).collect(Collectors.toList()),
        userName
      )
    ).map(bulkWriteResult -> FileUploadDto.builder()
      .fileName(fileName)
      .matchedCount(bulkWriteResult.getMatchedCount())
      .modifiedCount(bulkWriteResult.getModifiedCount())
      .build()
    );
}

1) уменьшить помогает нам обрабатывать дубликаты продуктов, количество которых должно быть суммировано; 2) Соберите карту, чтобы получить список продуктов, указанных в в паре со списком UpdateOneModel и общее количество для продукта. Первый используется для сопоставления статьи и ее размера в файле с теми, которые находятся в базе данных, с помощью проекции ProductArticleSizeDto ; 3) Используйте атомарный слияние метод ConcurrentMap для суммирования количества одинаковых продуктов и создания новой обновленной модели ; 4) Отфильтровать все продукты в файле по тем статьям продукта, которые есть в базе данных; 5) Каждая Товарная статья размером с , найденная в хранилище, соответствует Товарной статье размером с из файла, суммированного по количеству; 6) Затем посчитайте результат после фильтрации, который должен быть равен определенному количеству продуктов в файле; 7) Используйте метод onerrorresume для продолжения при возникновении любой ошибки, потому что нам нужно обработать все файлы, как указано в требованиях; 8) Извлеките список UpdateOneModel из карты, собранной ранее, для дальнейшего использования в методе incrementProductQuantity ; 9) Затем запустите метод incrementProductQuantity в качестве подпроцесса в плосКая карта и сопоставьте его результат в FileUploadDto , в котором нуждаются наши бизнес-пользователи.

Даже несмотря на то, что фильтровать, Когда и последующие productdao.findbyarticlein позволяют нам провести некоторую дополнительную проверку на ранней стадии, они стоят дорого, что особенно заметно при обработке больших файлов на практике. Однако метод увеличить количество продукта может сравнивать количество измененных документов и сопоставлять их с количеством отдельных продуктов в файле. Зная это, мы можем реализовать более легкий вариант для обработки больших файлов:

private Mono processBigExcelFile(
  final String fileName,
  final String userName
) {
  log.debug("processBigExcelFile: {}", fileName);
  return excelFileDao.getProducts(pathToStorage, fileName, userName)
    .reduce(new ConcurrentHashMap, BigInteger>>(),
      (indexMap, product) -> {
        final BigInteger quantity = product.getQuantity();
        indexMap.merge(
          product,
          Tuples.of(
            updateOneModelConverter.convert(Tuples.of(product, quantity, userName)),
            quantity
          ),
          (oldValue, newValue) -> {
            final BigInteger mergedQuantity = oldValue.getT2().add(newValue.getT2());
            return Tuples.of(
              updateOneModelConverter.convert(Tuples.of(product, mergedQuantity, userName)),
              mergedQuantity
            );
          }

        );
        return indexMap;
      })
    .map(indexMap -> indexMap.values().stream().map(Tuple2::getT1).collect(Collectors.toList()))
    .onErrorResume(e -> {
      log.debug("Exception while processExcelFile: {}: {}", fileName, e);
      return Mono.empty();
    }).flatMap(dtoList ->
      productPatcherService.incrementProductQuantity(
        fileName,
        dtoList,
        userName
      )
    ).map(bulkWriteResult -> FileUploadDto.builder()
      .fileName(fileName)
      .matchedCount(bulkWriteResult.getMatchedCount())
      .modifiedCount(bulkWriteResult.getModifiedCount())
      .build()
    );
}

Вот Продукт И Имя пользователя Для Обновления Одного Конвертера моделей , который мы использовали для создания Обновления Одной модели :

@Component
public class ProductAndUserNameToUpdateOneModelConverter implements
  Converter, UpdateOneModel> {

  @Override
  @NonNull
  public UpdateOneModel convert(@NonNull Tuple3 source) {
    Objects.requireNonNull(source);
    final Product product = source.getT1();
    final BigInteger quantity = source.getT2();
    final String userName = source.getT3();

    return new UpdateOneModel<>(
      Filters.and(
        Filters.eq(Product.SIZE_DB_FIELD, product.getSize().name()),
        Filters.eq(Product.ARTICLE_DB_FIELD, product.getArticle())
      ),
      Document.parse(
        String.format(
          "{ $inc: { %s: %d } }",
          Product.QUANTITY_DB_FIELD,
          quantity
        )
      ).append(
        "$set",
        new Document(
          Product.LAST_MODIFIED_BY_DB_FIELD,
          userName
        )
      ),
      new UpdateOptions().upsert(false)
    );
  }
}

1) Во-первых, найдите документ по статье и размеру. Рисунок 2 показывает, что у нас есть составной индекс по полям размера и артикула коллекции продуктов для облегчения такого поиска; 2) Увеличьте количество найденного документа и задайте имя пользователя в поле lastModifiedBy ; 3) Также здесь можно вставить документ, но нас интересует только изменение существующих документов в хранилище.

Теперь мы готовы реализовать центральную часть нашей обработки, которая заключается в увеличении количества продукта метод productпатчердаоимпл :

@Override
public Mono incrementProductQuantity(
  final String fileName,
  final List> models,
  final String userName
) {
  return transactionalOperator.execute(
    action -> reactiveMongoOperations.getCollection(Product.COLLECTION_NAME)
      .flatMap(collection ->
        Mono.from(collection.bulkWrite(models, new BulkWriteOptions().ordered(true)))

      ).handle((bulkWriteResult, synchronousSink) -> {
        final int fileCount = models.size();
        if (Objects.equals(bulkWriteResult.getModifiedCount(), fileCount)) {
          synchronousSink.next(bulkWriteResult);
        } else {
          synchronousSink.error(
            new IllegalStateException(
              String.format(
                "Inconsistency between modified doc count: %d and file doc count: %d. Please, check file: %s",
                bulkWriteResult.getModifiedCount(), fileCount, fileName
              )
            )
          );
        }

      }).onErrorResume(
        e -> Mono.fromRunnable(action::setRollbackOnly)
          .log("Exception while incrementProductQuantity: " + fileName + ": " + e)
          .then(Mono.empty())
      )
  ).singleOrEmpty();
}

1) Используйте транзакционный оператор для отката транзакции вручную. Как уже упоминалось ранее, наша цель – обработать все файлы, пропуская те, которые вызывают исключения; 2) Запустите один подпроцесс для последовательной массовой записи изменений в базу данных для быстрого и менее ресурсоемкого поведения. Слово “одиночный” здесь имеет первостепенное значение, потому что мы избегаем опасной “проблемы с запросами N+1”, приводящей к появлению множества подпроцессов в потоке внутри плосКая карта ; 3) Обрабатывать ситуацию, когда количество обработанных документов не совпадает с тем, которое поступает из отдельного количества продуктов в файле; 4) Метод при возобновлении ошибки обрабатывает откат транзакции, а затем возвращает Mono.empty() , чтобы пропустить текущую обработку; 5) Ожидайте либо одного элемента, либо пустого Моно в результате транзакционного оператора.выполнить метод.

Можно было бы сказать: “Вы вызвали collection.bulkWrite(модели, новые BulkWriteOptions().упорядочено(верно)) , как насчет того, чтобы назначить сеанс?”. Дело в том, что Session Aware MethodИнтерцептор данных Spring MongoDB делает это с помощью отражения:

ReflectionUtils.invokeMethod(targetMethod.get(), target,
        prependSessionToArguments(session, methodInvocation)

Вот метод добавления сеанса К Аргументам :

private static Object[] prependSessionToArguments(ClientSession session, MethodInvocation invocation) {

  Object[] args = new Object[invocation.getArguments().length + 1];

  args[0] = session;
  System.arraycopy(invocation.getArguments(), 0, args, 1, invocation.getArguments().length);

  return args;
}

1) Получить аргументы Вызова метода ; 2) Добавить сеанс в качестве первого элемента в массиве args .

На самом деле, следующий метод MongoCollectionImpl называется:

@Override
public Publisher bulkWrite(final ClientSession clientSession,
                                            final List> requests,
                                            final BulkWriteOptions options) {
  return Publishers.publish(
    callback -> wrapped.bulkWrite(clientSession.getWrapped(), requests, options, callback));
}

3. Реализация тестирования

Пока все идет хорошо, мы можем создавать интеграционные тесты, чтобы охватить нашу логику.

Для начала мы создаем Productscontroller IT Test для тестирования нашего общедоступного API с помощью клиента Spring Веб-тестирования и инициализируем экземпляр MongoDB для запуска тестов против:

private static final MongoDBContainer MONGO_DB_CONTAINER =
  new MongoDBContainer("mongo:4.2.8");

1) Используйте статическое поле, чтобы иметь отдельные контейнеры для тестирования Контейнер MongoDB для всех методов тестирования в Productscontroller IT Test ; 2) Мы используем версию контейнера 4.2.8 MongoDB из Docker Hub, поскольку она является последней стабильной версией, в противном случае Контейнер MongoDB по умолчанию равен 4.0.10.

Затем в статических методах настройте все и снести Все мы запускаем и останавливаем Контейнер MongoDB соответственно. Несмотря на то, что мы не используем здесь функцию многократного использования тестовых контейнеров, мы оставляем открытой возможность ее настройки. Именно поэтому мы вызываем MONGO_DB_CONTAINER.stop()

@BeforeAll
static void setUpAll() {
    MONGO_DB_CONTAINER.start();
}

@AfterAll
static void tearDownAll() {
  if (!MONGO_DB_CONTAINER.isShouldBeReused()) {
    MONGO_DB_CONTAINER.stop();
  }
}

Демонтаж отвечает за очистку изменений, которые выполняет каждый тест:

@AfterEach
void tearDown() {
    StepVerifier.create(productDao.deleteAll()).verifyComplete();
}

Далее мы устанавливаем весна.данные.mongodb.uri выполнив MONGO_DB_CONTAINER.получить Url-адрес набора реплик() в ApplicationContextInitializer :

static class Initializer implements ApplicationContextInitializer {
  @Override
  public void initialize(@NotNull ConfigurableApplicationContext configurableApplicationContext) {
    TestPropertyValues.of(
      String.format("spring.data.mongodb.uri: %s", MONGO_DB_CONTAINER.getReplicaSetUrl())
    ).applyTo(configurableApplicationContext);
  }
}

Теперь мы готовы написать первый тест без каких-либо конфликтов транзакций, потому что в наших тестовых файлах (см. Рисунок 3) есть продукты, статьи которых не конфликтуют друг с другом.

Рисунок 3 Файлы Excel, не вызывающие конфликтов в статьях продуктов

@WithMockUser(
  username = SecurityConfig.ADMIN_NAME,
  password = SecurityConfig.ADMIN_PAS,
  authorities = SecurityConfig.WRITE_PRIVILEGE
)
@Test
void shouldPatchProductQuantity() {
  //GIVEN
  insertMockProductsIntoDb(Flux.just(product1, product2, product3));
  final BigInteger expected1 = BigInteger.valueOf(16);
  final BigInteger expected2 = BigInteger.valueOf(27);
  final BigInteger expected3 = BigInteger.valueOf(88);
  final String fileName1 = "products1.xlsx";
  final String fileName3 = "products3.xlsx";
  final String[] fileNames = {fileName1, fileName3};
  final FileUploadDto fileUploadDto1 = ProductTestUtil.mockFileUploadDto(fileName1, 2);
  final FileUploadDto fileUploadDto3 = ProductTestUtil.mockFileUploadDto(fileName3, 1);

  //WHEN
  final WebTestClient.ResponseSpec exchange = webClient
    .patch()
    .uri(BASE_URL)
    .contentType(MediaType.MULTIPART_FORM_DATA)
    .body(BodyInserters.fromMultipartData(ProductTestUtil.getMultiPartFormData(fileNames)))
    .exchange();

  //THEN
  exchange.expectStatus().isAccepted();

  exchange.expectBodyList(FileUploadDto.class)
    .hasSize(2)
    .contains(fileUploadDto1, fileUploadDto3);

  StepVerifier.create(productDao.findAllByOrderByQuantityAsc())
    .assertNext(product -> assertEquals(expected1, product.getQuantity()))
    .assertNext(product -> assertEquals(expected2, product.getQuantity()))
    .assertNext(product -> assertEquals(expected3, product.getQuantity()))
    .verifyComplete();
}

Наконец, давайте протестируем столкновение транзакций в действии, имея в виду Рисунок 1 и рисунок 4, показывающие такие файлы:

Рисунок 4 Файлы Excel, вызывающие коллизию в статьях продуктов

@WithMockUser(
  username = SecurityConfig.ADMIN_NAME,
  password = SecurityConfig.ADMIN_PAS,
  authorities = SecurityConfig.WRITE_PRIVILEGE
)
@Test
void shouldPatchProductQuantityConcurrently() {
  //GIVEN
  TransactionUtil.setMaxTransactionLockRequestTimeoutMillis(
    20,
    MONGO_DB_CONTAINER.getReplicaSetUrl()
  );
  insertMockProductsIntoDb(Flux.just(product1, product2));
  final String fileName1 = "products1.xlsx";
  final String fileName2 = "products2.xlsx";
  final String[] fileNames = {fileName1, fileName2};
  final BigInteger expected120589Sum = BigInteger.valueOf(19);
  final BigInteger expected120590Sum = BigInteger.valueOf(32);
  final BigInteger expected120589T1 = BigInteger.valueOf(16);
  final BigInteger expected120589T2 = BigInteger.valueOf(12);
  final BigInteger expected120590T1 = BigInteger.valueOf(27);
  final BigInteger expected120590T2 = BigInteger.valueOf(11);
  final FileUploadDto fileUploadDto1 = ProductTestUtil.mockFileUploadDto(fileName1, 2);
  final FileUploadDto fileUploadDto2 = ProductTestUtil.mockFileUploadDto(fileName2, 2);

  //WHEN
  final WebTestClient.ResponseSpec exchange = webClient
    .patch()
    .uri(BASE_URL)
    .contentType(MediaType.MULTIPART_FORM_DATA)
    .accept(MediaType.APPLICATION_STREAM_JSON)
    .body(BodyInserters.fromMultipartData(ProductTestUtil.getMultiPartFormData(fileNames)))
    .exchange();

  //THEN
  exchange.expectStatus().isAccepted();
  assertThat(
    extractBodyArray(exchange),
    either(arrayContaining(fileUploadDto1))
      .or(arrayContaining(fileUploadDto2))
      .or(arrayContainingInAnyOrder(fileUploadDto1, fileUploadDto2))
  );

  final List list = productDao.findAll(Sort.by(Sort.Direction.ASC, "article"))
    .toStream().collect(Collectors.toList());
  assertThat(list.size(), is(2));

  assertThat(
    list.stream().map(Product::getQuantity).toArray(BigInteger[]::new),
    either(arrayContaining(expected120589T1, expected120590T1))
      .or(arrayContaining(expected120589T2, expected120590T2))
      .or(arrayContaining(expected120589Sum, expected120590Sum))
  );
  TransactionUtil.setMaxTransactionLockRequestTimeoutMillis(
    5,
    MONGO_DB_CONTAINER.getReplicaSetUrl()
  );
}

1) Мы можем указать максимальное время в миллисекундах, в течение которого транзакции с несколькими документами должны ожидать получения блокировок, требуемых операциями в транзакции (по умолчанию транзакции с несколькими документами ждут 5 миллисекунд); 2) В качестве примера здесь мы могли бы использовать вспомогательный метод для изменения 5 мс на 20 мс (см. Подробности реализации ниже).

Обратите внимание, что параметр max Transaction Lock Request Timeout Миллис не имеет смысла для данного конкретного тестового случая и служит целям примера. После запуска этого тестового класса 120 раз с помощью скрипта ./load_test.sh 120 Productscontrollertest.shouldpatchproductquantityconcurrently в каталоге инструментов проекта я получил следующие цифры:

Успехи T1 61 56
Успехи T2 57 63
Успех T1 и T2 2 1

Просматривая журналы, мы можем наткнуться на что-то вроде:

Исключение при увеличении количества продукта: products1.xlsx: ком.монгодб. Исключение MongoCommandException: Команда не выполнена с ошибкой 112 (Конфликт записи): “Конфликт записи” на сервере… Инициирование отката транзакции… Инициирование фиксации транзакции… Собирается прервать транзакцию для сеанса… Собирается совершить транзакцию для сеанса…

Затем давайте протестируем обработку большого файла, содержащего 1 миллион продуктов, в отдельном Тесте загрузки исправленного продукта :

@WithMockUser(
  username = SecurityConfig.ADMIN_NAME,
  password = SecurityConfig.ADMIN_PAS,
  authorities = SecurityConfig.WRITE_PRIVILEGE
)
@Test
void shouldPatchProductQuantityBigFile() {
  //GIVEN
  unzipClassPathFile("products_1M.zip");

  final String fileName = "products_1M.xlsx";
  final int count = 1000000;
  final long totalQuantity = 500472368779L;
  final List products = getDocuments(count);

  TransactionUtil.setTransactionLifetimeLimitSeconds(
    900,
    MONGO_DB_CONTAINER.getReplicaSetUrl()
  );

  StepVerifier.create(
    reactiveMongoTemplate.remove(new Query(), Product.COLLECTION_NAME)
      .then(reactiveMongoTemplate.getCollection(Product.COLLECTION_NAME))
      .flatMapMany(c -> c.insertMany(products))
      .switchIfEmpty(Mono.error(new RuntimeException("Cannot insertMany")))
      .then(getTotalQuantity())
  ).assertNext(t -> assertEquals(totalQuantity, t)).verifyComplete();

  //WHEN
  final Instant start = Instant.now();
  final WebTestClient.ResponseSpec exchange = webClient
    .patch()
    .uri(BASE_URL)
    .contentType(MediaType.MULTIPART_FORM_DATA)
    .accept(MediaType.APPLICATION_STREAM_JSON)
    .body(BodyInserters.fromMultipartData(ProductTestUtil.getMultiPartFormData("products_1M.xlsx")))
    .exchange();

  //THEN
  exchange
    .expectStatus()
    .isAccepted()
    .expectBodyList(FileUploadDto.class)
    .contains(ProductTestUtil.mockFileUploadDto(fileName, count));
  StepVerifier.create(getTotalQuantity())
    .assertNext(t -> assertEquals(totalQuantity * 2, t))
    .verifyComplete();
  log.debug(
    "============= shouldPatchProductQuantityBigFile elapsed {}minutes =============",
    Duration.between(start, Instant.now()).toMinutes()
  );
}

1) Общая настройка аналогична Productscontrollertest ; 2) Распакуйте файл json, содержащий 1 миллион продуктов, для которого требуется около 254 миллионов на диске; 3) Транзакции имеют ограничение на срок службы, указанное в transactionLifetimeLimitSeconds , которое по умолчанию составляет 60 секунд. Нам нужно увеличить его здесь, потому что обычно для обработки такого файла требуется более 60 секунд. Для этого мы используем вспомогательный метод, чтобы изменить этот срок службы на 900 с (см. Подробности реализации ниже). Для вашей информации, ОСТАВШИЙСЯ вызов с файлом занимает около 9-12 минут на GitHub; 4) Перед обработкой мы очищаем коллекцию продуктов, вставляем 1 миллион продуктов из файла json, а затем получаем общее количество; 5) Учитывая, что продукты в файле json и большом файле excel равны, мы утверждаем, что общее количество продукта после обработки должно удвоиться.

Для такого теста требуется относительно большая куча около 4 ГБ (см. Рисунок 6) и ресурс памяти Docker около 6 ГБ (см. Рисунок 7):

Рисунок 6 Куча монитора VisualVM при загрузке файла продукта объемом 1 миллион

Рисунок 7 Общее использование памяти Cadvisor при загрузке файла продукта объемом 1 миллион

Как мы видим, разумно настроить максимальный объем дискового пространства, разрешенный для частей файла, и максимальное количество частей, разрешенных в данном составном запросе. Именно поэтому я добавил свойства в соответствующее приложение.файл yml, а затем установите их в настройка кодеков Http-сообщений метод реализованного Webfluxконфигуратор . Однако добавление ограничителя скорости и настройка Планировщики могут быть лучшим решением в производственной среде. Обратите внимание, что мы используем Планировщики.ограниченные эластичные() здесь, имея пул 10 * Runtime.getRuntime().Доступные процессоры() потоки по умолчанию.

Вот TransactionUtil , содержащий вышеупомянутые вспомогательные методы:

public class TransactionUtil {
  private TransactionUtil() {
  }

  public static void setTransactionLifetimeLimitSeconds(
    final int duration,
    final String replicaSetUrl
  ) {
    setMongoParameter("transactionLifetimeLimitSeconds", duration, replicaSetUrl);
  }

  public static void setMaxTransactionLockRequestTimeoutMillis(
    final int duration,
    final String replicaSetUrl
  ) {
    setMongoParameter("maxTransactionLockRequestTimeoutMillis", duration, replicaSetUrl);
  }

  private static void setMongoParameter(
    final String param,
    final int duration,
    final String replicaSetUrl
  ) {
    try (final MongoClient mongoReactiveClient = MongoClients.create(
      ConnectionUtil.getMongoClientSettingsWithTimeout(replicaSetUrl)
    )) {

      StepVerifier.create(mongoReactiveClient.getDatabase("admin").runCommand(
        new Document("setParameter", 1).append(param, duration)
      )).expectNextCount(1)
        .verifyComplete();
    }
  }
}

4. Как я могу играть с кодом?

силаев/wms

Малая WMS (система управления складом)

Предпосылка

  • Java 8
  • Докер (был протестирован в версии 18.09.2)

Статья о сообществе разработчиков

MongoDB тестовых контейнеров Модуль и весенние данные MongoDB в действии

Общая информация

Приложение позволяет:

  • создание новых продуктов с проверкой статьи (коллекция повторяющихся ключевых ошибок: wms.индекс продукта: статья возникает при попытке вставить продукты с идентичным идентификатором);
  • поиск продуктов (представление пользователя) по названию или бренду (полное равенство, может быть смягчено в описании продукта с помощью clear DSL);
  • найти все продукты (представление сущности);
  • загрузите файл xlsx (другие форматы в настоящее время не поддерживаются), чтобы обновить текущее количество продукта, Сопоставление выполняется по артикулу и размеру. Приложение сообщает о несоответствиях между продуктами, которые должны быть исправлены, и MongoDB После загрузки файлы хранятся в папке storage.bulk-upload-путь/имя пользователя, а затем удаляются…

5. Что в этом для меня?

1) Контейнер MongoDB заботится о сложности инициализации набора реплик MongoDB, позволяя разработчику сосредоточиться на тестировании. Теперь мы можем просто сделать тестирование транзакций MongoDB частью нашего процесса CI/CD; 2) При обработке данных разумно отдавать предпочтение массовым методам MongoDB, уменьшая количество подпроцессов в flatMap методе Flux и, таким образом, избегать введения “Проблемы с запросом N+1”. Тем не менее, это также имеет свою цену, потому что здесь нам нужно собрать список UpdateOneModel и хранить его в памяти, не обладающей реактивной гибкостью; 3) Когда дело доходит до пропуска обработки, можно использовать onerrorresume вместо опасного Ошибка продолжается

Не ошибка как таковая, а обсуждение – это onErrorContinue() ((и связанный при остановке ошибки() ) правильное решение, а если нет, можно ли разработать ту же функциональность более очевидным способом?

Я продолжаю замечать, что люди спотыкаются об это, и я бы солгал, если бы сказал, что это также не застало меня врасплох несколько раз. Есть две основные проблемы, которые я продолжаю видеть с onError Continue() .

В названии или Javadoc нет намека на то, что это специализированный оператор, который следует использовать с осторожностью – похоже, он аналогичен onErrorResume() . Имя кажется очень “дружелюбным” – как в “эй, это имя похоже на то, что я хочу сделать, поэтому я буду его использовать!” Это в отличие от (например) block() , что делает очевидным, что это не то, что вы хотите делать.

Javadoc тоже не очень помогает – под диаграммой есть только небольшая заметка о том, что “этот режим обработки ошибок не обязательно реализуется всеми операторами”. Поэтому я по понятным причинам видел, как люди использовали onError Continue() в полном неведении о том, что это специализированный оператор, которого они должны использовать с осторожностью ( onErrorResume() был бы лучшим выбором почти во всех этих случаях.)

Однако я думаю, что проблема кроется глубже, чем вышесказанное. Даже для признанного “специализированного” оператора, которого необходимо использовать с осторожностью, и даже для тех, кто разумно разбирается в реакторе, это все еще делает поток невероятно сложным для понимания в целом ряде сценариев:

Проглатывание неожиданных исключений

Есть определенные исключения (или исключения, которые встречаются в определенных местах), которые вы не обязательно ожидаете, что он “проглотит”, но это так (например, повторите исключения, согласно этой проблеме , поднятой некоторое время назад.)

Зайдя “слишком далеко” вверх по цепочке

Этот немного свободнее, но авторы библиотек возвращают Поток часто предполагает, в соответствии со спецификацией реактивных потоков, что ошибка всегда приведет к остановке потока , где они могут либо позволить оператору распространяться, либо возобновить, чтобы предпринять корректирующие действия. Если кто-то вызвал onError Continue() вниз по течению, это может привести к искажению кода библиотеки так, как автор не предполагал, что приведет к неожиданным последствиям.

Это, конечно, может быть исправлено всеми авторами библиотек, добавляющими onErrorStop() каждому издателю в конце цепочки, но на самом деле, похоже, никто этого не делает.

Отсутствие цепочки операторов

Поскольку onError Continue() влияет только на операторов выше и не связывается с другими onError Continue() вызовами, вы сталкиваетесь с нелогичными ситуациями, подобными этой:

    Flux.range(1,10).flatMap(x -> Mono.error(new RuntimeException()))
            .onErrorContinue(IllegalArgumentException.class, (t,o)-> System.out.println("A")) //Not called
            .onErrorContinue(RuntimeException.class, (t,o)-> System.out.println("B")) //Not called

…что, конечно, не приведет к выполнению ни одного из этих блоков, так как первым объявлен блок IllegalArgumentException .

Непоследовательное продолжение с одним значением

Если Поток содержит только одно значение с помощью Flux.just() , продолжение, похоже, не всегда работает надежно:

    Flux.just(1,1)
            .flatMap(ctx -> Flux.push((sink)->{ throw new RuntimeException();}))
            .onErrorContinue((ex, obj)->{
                System.err.println("Caught"); //Called, prints twice
            })

…но:

    Flux.just(1)
            .flatMap(ctx -> Flux.push((sink)->{ throw new RuntimeException();}))
            .onErrorContinue((ex, obj)->{
                System.err.println("Caught"); //Not called
            })

…но:

    Flux.just(1)
            .flatMap(x -> Mono.error(new RuntimeException()))
            .onErrorContinue((ex, obj)->{
                System.err.println("Caught"); //Called
            })

(Я не могу понять, что является причиной этого – это больше похоже на ошибку, чем ожидаемое поведение.)

Неясная “причина” – внутренняя или внешняя?

Объект причины не обязательно однозначен – в чем здесь причина? Я видел случаи, когда люди ожидали, что это будет 1 (объект верхнего уровня), но, конечно, это на самом деле A (внутренний объект.)

    Flux.just(1)
            .flatMap(i -> Flux.just("A").flatMap(x -> Mono.error(new IllegalArgumentException())))
            .onErrorContinue((ex, obj) -> {
                System.out.println(obj); //A
            })

Неясная “причина” № 2

… но как быть, когда нет никакой “причины”?

    Flux.just(1,1)
            .flatMap(i -> Flux.push((sink)->{ throw new IllegalArgumentException("bum!");}))
            .onErrorContinue((ex, obj) -> {
                System.out.println(obj); //null
            })

…это null , который следует “внутреннему” шаблону, описанному выше, но это не обязательно очевидно – Я видел некоторых, кто ожидал бы, что он вернется к внешней причине (i), а не будет нулевым.

Я уверен, что существует множество других неочевидных ситуаций поведения – и я намеренно опускаю более “очевидные” применимые предостережения (например, поддержка операторов должна быть явной, она влияет на поток выше, а не ниже его и т. Д.). Я хочу сказать, что, хотя большинство операторов достаточно просты в рассуждениях, onErrorContinue() является большим исключением из этого правила, которое может сделать написание и поддержку реактивного кода намного сложнее, чем это должно быть.

На самом деле я не уверен в этом. Изменения идут от “ничего” до “легкого прикосновения” и “как-то перепроектировать”. Некоторые возможные решения могут быть:

  • В основном оставляйте все как есть, но в Javadoc гораздо яснее, что onErrorContinue() является специализированным оператором, которого следует избегать везде, где это возможно;
  • Переименуйте метод во что-то более “вызывающее предупреждение”, например, игнорируйте ошибки восходящего потока, где это возможно() (просто пример “без обиняков”)
  • Сделайте что-нибудь с типами – может быть:
    • Ввести другой тип Поток который поддерживает onError Continue() , и этот тип работает только с поддерживаемыми операторами;
    • Введите тип, который вызывает только один “связанный” Ошибка onError Продолжается во всей цепочке во время компиляции (можно преобразовать в тип с помощью onError Continue() и обратно с помощью onErrorStop()
  • Представьте более перегруженные методы для поддерживаемых операторов, которые используют “обработчик ошибок” для продолжения, а не устанавливают его глобально в цепочке;
  • Удалить onError Continue() полностью (я подозреваю, что это невозможно, есть ли ситуации, когда такое поведение просто невозможно имитировать с помощью других операторов?)

Спасибо, что прочитали эту болтовню – было бы интересно услышать чьи-нибудь мысли…!

6. Хотите пойти глубже?

Чтобы создать набор реплик MongoDB с несколькими узлами для тестирования сложных случаев отработки отказа, рассмотрите:

силаев/mongodb-набор реплик

Набор реплик Java 8 MongoDB для создания полнофункционального кластера MongoDB для интеграционного тестирования, воспроизведения производственных проблем, изучения распределенных систем на примере MongoDB

Предпосылка

  • Java 8+

  • Рабочий стол Docker

  • На диаграмме показана локальная и удаленная поддержка докеров для набора номеров реплик

    1
    от 2 до 7 (включительно)

Совет Набор реплик с одним узлом является самым быстрым среди других. Это режим по умолчанию для набора реплик MongoDB, однако, чтобы использовать только его, рассмотрите модуль Test containers MongoDB на GitHub

Получаю это

  • Сортировка:
dependencies {
    testCompile("com.github.silaev:mongodb-replica-set:${LATEST_RELEASE}")
}
  • Знаток:
com.github.silaev
        mongodb-replica-set
        ${LATEST_RELEASE}

7. Связи

  1. Мастер-класс по реактивным транзакциям Майкла Саймонса и Марка Палуха
  2. Весенние данные MongoDB – Справочная документация
  3. Транзакции MongoDB
  4. Методы сбора MongoDB

Оригинал: “https://dev.to/silaev/the-testcontainers-mongodb-module-and-spring-data-mongodb-in-action-53ng”