1. Обзор
В этой краткой статье мы рассмотрим класс ConcurrentSkipListMap из пакета java.util.concurrent .
Эта конструкция позволяет нам создавать потокобезопасную логику без блокировки. Это идеально подходит для проблем, когда мы хотим сделать неизменяемый снимок данных, в то время как другие потоки все еще вставляют данные в карту.
Мы будем решать проблему сортировки потока событий и получения моментального снимка событий, которые произошли за последние 60 секунд, используя эту конструкцию .
2. Логика сортировки потоков
Допустим, у нас есть поток событий, которые постоянно поступают из нескольких потоков. Мы должны иметь возможность принимать события за последние 60 секунд, а также события, которые старше 60 секунд.
Во-первых, давайте определим структуру наших данных о событиях:
public class Event { private ZonedDateTime eventTime; private String content; // standard constructors/getters }
Мы хотим, чтобы наши события сортировались с помощью поля EventTime . Чтобы достичь этого с помощью ConcurrentSkipListMap, нам нужно передать Компаратор его конструктору при создании его экземпляра:
ConcurrentSkipListMapevents = new ConcurrentSkipListMap<>( Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));
Мы будем сравнивать все прибывшие события, используя их временные метки. Мы используем метод comparing Long() и передаем функцию извлечения, которая может принимать метку времени long из ZonedDateTime.
Когда наши события прибывают, нам нужно только добавить их на карту с помощью метода put () . Обратите внимание, что этот метод не требует явной синхронизации:
public void acceptEvent(Event event) { events.put(event.getEventTime(), event.getContent()); }
ConcurrentSkipListMap будет обрабатывать сортировку этих событий, используя Компаратор , который был передан ему в конструкторе.
Наиболее заметными плюсами ConcurrentSkipListMap являются методы, которые могут сделать неизменяемый снимок его данных без блокировки. Чтобы получить все события, которые произошли в течение последней минуты, мы можем использовать метод tailMap() и передать время, из которого мы хотим получить элементы:
public ConcurrentNavigableMapgetEventsFromLastMinute() { return events.tailMap(ZonedDateTime.now().minusMinutes(1)); }
Он вернет все события с последней минуты. Это будет неизменяемый моментальный снимок, и самое главное, что другие потоки записи могут добавлять новые события в ConcurrentSkipListMap без какой-либо необходимости делать явную блокировку.
Теперь мы можем получить все события, которые произошли позже этой минуты, используя метод headMap() :
public ConcurrentNavigableMapgetEventsOlderThatOneMinute() { return events.headMap(ZonedDateTime.now().minusMinutes(1)); }
Это вернет неизменный снимок всех событий, которые старше одной минуты. Все вышеперечисленные методы принадлежат классу Event Window Sort , который мы будем использовать в следующем разделе.
3. Тестирование логики потока сортировки
После того, как мы реализовали нашу логику сортировки с помощью ConcurrentSkipListMap, теперь мы можем протестировать ее, создав два потока записи , которые будут отправлять по сто событий каждый:
ExecutorService executorService = Executors.newFixedThreadPool(3); EventWindowSort eventWindowSort = new EventWindowSort(); int numberOfThreads = 2; Runnable producer = () -> IntStream .rangeClosed(0, 100) .forEach(index -> eventWindowSort.acceptEvent( new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString())) ); for (int i = 0; i < numberOfThreads; i++) { executorService.execute(producer); }
Каждый поток вызывает метод accept Event () , отправляя события, которые имеют время события с этого момента до “сейчас минус сто секунд”.
В то же время мы можем вызвать метод get Events From Last Minute () , который вернет моментальный снимок событий, которые находятся в пределах одного минутного окна:
ConcurrentNavigableMapeventsFromLastMinute = eventWindowSort.getEventsFromLastMinute();
Количество событий в событиях с последней минуты будет варьироваться в каждом тестовом запуске в зависимости от скорости, с которой потоки-производители будут отправлять события в EventWindowSort. Мы можем утверждать, что в возвращаемом моментальном снимке нет ни одного события старше одной минуты:
long eventsOlderThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count(); assertEquals(eventsOlderThanOneMinute, 0);
И что в моментальном снимке более нуля событий, которые находятся в пределах одного минутного окна:
long eventYoungerThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1))) .count(); assertTrue(eventYoungerThanOneMinute > 0);
Наш get Events From Last Minute() использует tailMap() внизу.
Давайте теперь проверим get События Старше Одной минуты () , которые используют метод headMap() из ConcurrentSkipListMap:
ConcurrentNavigableMapeventsFromLastMinute = eventWindowSort.getEventsOlderThatOneMinute();
На этот раз мы получаем снимок событий, которые старше одной минуты. Мы можем утверждать, что таких событий больше нуля:
long eventsOlderThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count(); assertTrue(eventsOlderThanOneMinute > 0);
И далее, что нет ни одного события, которое произошло бы в последнюю минуту:
long eventYoungerThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1))) .count(); assertEquals(eventYoungerThanOneMinute, 0);
Самое важное, что следует отметить, это то, что мы можем сделать снимок данных, в то время как другие потоки все еще добавляют новые значения в ConcurrentSkipListMap.
4. Заключение
В этом кратком руководстве мы рассмотрели основы ConcurrentSkipListMap , а также некоторые практические примеры .
Мы использовали высокую производительность ConcurrentSkipListMap для реализации неблокирующего алгоритма, который может служить нам неизменным моментальным снимком данных, даже если одновременно несколько потоков обновляют карту.
Реализацию всех этих примеров и фрагментов кода можно найти в проекте GitHub ; это проект Maven, поэтому его должно быть легко импортировать и запускать как есть.