Рубрики
Без рубрики

Руководство по ConcurrentSkipListMap

Краткое введение в карту ConcurrentSkipListMap Java с примером типичного использования

Автор оригинала: baeldung.

1. Обзор

В этой краткой статье мы рассмотрим класс ConcurrentSkipListMap из пакета java.util.concurrent .

Эта конструкция позволяет нам создавать потокобезопасную логику без блокировки. Это идеально подходит для проблем, когда мы хотим сделать неизменяемый снимок данных, в то время как другие потоки все еще вставляют данные в карту.

Мы будем решать проблему сортировки потока событий и получения моментального снимка событий, которые произошли за последние 60 секунд, используя эту конструкцию .

2. Логика сортировки потоков

Допустим, у нас есть поток событий, которые постоянно поступают из нескольких потоков. Мы должны иметь возможность принимать события за последние 60 секунд, а также события, которые старше 60 секунд.

Во-первых, давайте определим структуру наших данных о событиях:

public class Event {
    private ZonedDateTime eventTime;
    private String content;

    // standard constructors/getters
}

Мы хотим, чтобы наши события сортировались с помощью поля EventTime . Чтобы достичь этого с помощью ConcurrentSkipListMap, нам нужно передать Компаратор его конструктору при создании его экземпляра:

ConcurrentSkipListMap events
 = new ConcurrentSkipListMap<>(
 Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

Мы будем сравнивать все прибывшие события, используя их временные метки. Мы используем метод comparing Long() и передаем функцию извлечения, которая может принимать метку времени long из ZonedDateTime.

Когда наши события прибывают, нам нужно только добавить их на карту с помощью метода put () . Обратите внимание, что этот метод не требует явной синхронизации:

public void acceptEvent(Event event) {
    events.put(event.getEventTime(), event.getContent());
}

ConcurrentSkipListMap будет обрабатывать сортировку этих событий, используя Компаратор , который был передан ему в конструкторе.

Наиболее заметными плюсами ConcurrentSkipListMap являются методы, которые могут сделать неизменяемый снимок его данных без блокировки. Чтобы получить все события, которые произошли в течение последней минуты, мы можем использовать метод tailMap() и передать время, из которого мы хотим получить элементы:

public ConcurrentNavigableMap getEventsFromLastMinute() {
    return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}

Он вернет все события с последней минуты. Это будет неизменяемый моментальный снимок, и самое главное, что другие потоки записи могут добавлять новые события в ConcurrentSkipListMap без какой-либо необходимости делать явную блокировку.

Теперь мы можем получить все события, которые произошли позже этой минуты, используя метод headMap() :

public ConcurrentNavigableMap getEventsOlderThatOneMinute() {
    return events.headMap(ZonedDateTime.now().minusMinutes(1));
}

Это вернет неизменный снимок всех событий, которые старше одной минуты. Все вышеперечисленные методы принадлежат классу Event Window Sort , который мы будем использовать в следующем разделе.

3. Тестирование логики потока сортировки

После того, как мы реализовали нашу логику сортировки с помощью ConcurrentSkipListMap, теперь мы можем протестировать ее, создав два потока записи , которые будут отправлять по сто событий каждый:

ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

Runnable producer = () -> IntStream
  .rangeClosed(0, 100)
  .forEach(index -> eventWindowSort.acceptEvent(
      new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
  );

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(producer);
}

Каждый поток вызывает метод accept Event () , отправляя события, которые имеют время события с этого момента до “сейчас минус сто секунд”.

В то же время мы можем вызвать метод get Events From Last Minute () , который вернет моментальный снимок событий, которые находятся в пределах одного минутного окна:

ConcurrentNavigableMap eventsFromLastMinute 
  = eventWindowSort.getEventsFromLastMinute();

Количество событий в событиях с последней минуты будет варьироваться в каждом тестовом запуске в зависимости от скорости, с которой потоки-производители будут отправлять события в EventWindowSort. Мы можем утверждать, что в возвращаемом моментальном снимке нет ни одного события старше одной минуты:

long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(eventsOlderThanOneMinute, 0);

И что в моментальном снимке более нуля событий, которые находятся в пределах одного минутного окна:

long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventYoungerThanOneMinute > 0);

Наш get Events From Last Minute() использует tailMap() внизу.

Давайте теперь проверим get События Старше Одной минуты () , которые используют метод headMap() из ConcurrentSkipListMap:

ConcurrentNavigableMap eventsFromLastMinute 
  = eventWindowSort.getEventsOlderThatOneMinute();

На этот раз мы получаем снимок событий, которые старше одной минуты. Мы можем утверждать, что таких событий больше нуля:

long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventsOlderThanOneMinute > 0);

И далее, что нет ни одного события, которое произошло бы в последнюю минуту:

long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(eventYoungerThanOneMinute, 0);

Самое важное, что следует отметить, это то, что мы можем сделать снимок данных, в то время как другие потоки все еще добавляют новые значения в ConcurrentSkipListMap.

4. Заключение

В этом кратком руководстве мы рассмотрели основы ConcurrentSkipListMap , а также некоторые практические примеры .

Мы использовали высокую производительность ConcurrentSkipListMap для реализации неблокирующего алгоритма, который может служить нам неизменным моментальным снимком данных, даже если одновременно несколько потоков обновляют карту.

Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub ; это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.