Рубрики
Без рубрики

Весенние данные MongoDB Хвостовые курсоры

Изучите, как использовать MongoDB в качестве бесконечного потока данных, используя хвостовые курсоры с данными Spring.

Автор оригинала: Drazen Nikolic.

1. введение

В этом уроке мы обсудим, как использовать MongoDB в качестве бесконечного потока данных, используя хвостовые курсоры с Spring Data MongoDB .

2. Хвостовые курсоры

Когда мы выполняем запрос, драйвер базы данных открывает курсор для предоставления соответствующих документов. По умолчанию MongoDB автоматически закрывает курсор, когда клиент считывает все результаты. Таким образом, поворот приводит к конечному потоку данных.

Однако мы можем использовать ограниченные коллекции с хвостовым курсором, который остается открытым даже после того, как клиент израсходовал все первоначально возвращенные данные, создавая бесконечный поток данных. Этот подход полезен для приложений, работающих с потоками событий, такими как сообщения чата или обновления акций.

Проект Spring Data MongoDB помогает нам использовать возможности реактивной базы данных, включая хвостовые курсоры.

3. Настройка

Чтобы продемонстрировать упомянутые функции, мы реализуем простое приложение счетчика журналов. Давайте предположим, что существует какой – то агрегатор журналов, который собирает и сохраняет все журналы в центральном месте-нашей коллекции MongoDB.

Во-первых, мы будем использовать простую сущность Log :

@Document
public class Log {
    private @Id String id;
    private String service;
    private LogLevel level;
    private String message;
}

Во-вторых, мы будем хранить журналы в нашей закрытой коллекции MongoDB. Ограниченные коллекции -это коллекции фиксированного размера, которые вставляют и извлекают документы в соответствии с порядком вставки. Мы можем создать их с помощью MongoOperations.createCollection :

db.createCollection(COLLECTION_NAME, new CreateCollectionOptions()
  .capped(true)
  .sizeInBytes(1024)
  .maxDocuments(5));

Для закрытых коллекций мы должны определить свойство size В байтах . Кроме того, max Documents указывает максимальное количество документов, которые может иметь коллекция. Как только они будут достигнуты, старые документы будут удалены из коллекции.

В-третьих, мы будем использовать соответствующую зависимость Spring Boot starter :


    org.springframework.boot
    spring-boot-starter-data-mongodb-reactive
    2.2.2.RELEASE

4. Реактивные Хвостовые курсоры

Мы можем использовать хвостовые курсоры как с помощью императивного , так и с помощью реактивного API MongoDB. Настоятельно рекомендуется использовать реактивный вариант .

Давайте реализуем счетчик журналов уровня WARN , используя реактивный подход. Мы можем создавать бесконечные потоковые запросы с помощью операций ReactiveMongo.хвост метод.

Хвостовой курсор остается открытым и выдает данные – a Поток сущностей – по мере поступления новых документов в ограниченную коллекцию и соответствия запросу фильтра :

private Disposable subscription;

public WarnLogsCounter(ReactiveMongoOperations template) {
    Flux stream = template.tail(
      query(where("level").is(LogLevel.WARN)), 
      Log.class);
    subscription = stream.subscribe(logEntity -> 
      counter.incrementAndGet()
    );
}

Как только новый документ, имеющий уровень WARN log, сохранится в коллекции, подписчик (лямбда-выражение) увеличит счетчик.

Наконец, мы должны избавиться от подписки, чтобы закрыть поток:

public void close() {
    this.subscription.dispose();
}

Кроме того, обратите внимание, что хвостовые курсоры могут стать мертвыми или недействительными, если запрос изначально не возвращает совпадения. Другими словами, даже если новые сохраненные документы соответствуют запросу фильтра, подписчик не сможет их получить. Это известное ограничение хвостовых курсоров MongoDB. Мы должны убедиться, что в закрытой коллекции есть соответствующие документы, прежде чем создавать хвостовой курсор.

5. Хвостовые курсоры с реактивным репозиторием

Проекты Spring Data предлагают абстракцию репозитория для различных хранилищ данных, включая реактивные версии.

MongoDB не является исключением. Пожалуйста, ознакомьтесь со статьей Spring Data Reactive Repositories with MongoDB для получения более подробной информации.

Кроме того, реактивные репозитории MongoDB поддерживают бесконечные потоки, аннотируя метод запроса с помощью @Tailable . Мы можем аннотировать любой метод репозитория, возвращающий Поток или другие реактивные типы, способные испускать несколько элементов:

public interface LogsRepository extends ReactiveCrudRepository {
    @Tailable
    Flux findByLevel(LogLevel level);
}

Давайте посчитаем INFO журналы, используя этот метод хвостового репозитория:

private Disposable subscription;

public InfoLogsCounter(LogsRepository repository) {
    Flux stream = repository.findByLevel(LogLevel.INFO);
    this.subscription = stream.subscribe(logEntity -> 
      counter.incrementAndGet()
    );
}

Аналогично, что касается Warn Logs Counter , мы должны избавиться от подписки, чтобы закрыть поток:

public void close() {
    this.subscription.dispose();
}

6. Хвостовые курсоры с помощью списка сообщений

Тем не менее, если мы не можем использовать реактивный API, мы можем использовать концепцию обмена сообщениями Spring.

Во-первых, нам нужно создать MessageListenerContainer , который будет обрабатывать отправленные Запросы на подписку объекты. Асинхронный драйвер MongoDB создает длительную блокирующую задачу, которая прослушивает новые документы в закрытой коллекции.

Spring Data MongoDB поставляется с реализацией по умолчанию, способной создавать и выполнять Задачи экземпляры для TailableCursorRequest:

private String collectionName;
private MessageListenerContainer container;
private AtomicInteger counter = new AtomicInteger();

public ErrorLogsCounter(MongoTemplate mongoTemplate,
  String collectionName) {
    this.collectionName = collectionName;
    this.container = new DefaultMessageListenerContainer(mongoTemplate);

    container.start();
    TailableCursorRequest request = getTailableCursorRequest();
    container.register(request, Log.class);
}

private TailableCursorRequest getTailableCursorRequest() {
    MessageListener listener = message -> 
      counter.incrementAndGet();

    return TailableCursorRequest.builder()
      .collection(collectionName)
      .filter(query(where("level").is(LogLevel.ERROR)))
      .publishTo(listener)
      .build();
}

Tailable Cursor Request создает запрос, фильтрующий только журналы уровня ОШИБОК|/. Каждый соответствующий документ будет опубликован в MessageListener , который увеличит счетчик.

Обратите внимание, что нам все еще нужно убедиться, что первоначальный запрос возвращает некоторые результаты. В противном случае хвостовой курсор будет немедленно закрыт.

Кроме того, мы не должны забывать останавливать контейнер, как только он нам больше не понадобится:

public void close() {
    container.stop();
}

7. Заключение

Ограниченные коллекции MongoDB с хвостовыми курсорами помогают нам непрерывно получать информацию из базы данных. Мы можем запустить запрос, который будет продолжать давать результаты до тех пор, пока явно не будет закрыт. Spring Data MongoDB предлагает нам как блокирующий, так и реактивный способ использования хвостовых курсоров.

Исходный код полного примера доступен на GitHub .