Рубрики
Без рубрики

Руководство по ConcurrentMap

Краткое и практическое руководство по ConcurrentMap в Java.

Автор оригинала: baeldung.

1. Обзор

Карты , естественно, являются одним из наиболее распространенных стилей коллекции Java.

И, что немаловажно, HashMap не является потокобезопасной реализацией, в то время как Hashtable обеспечивает потокобезопасность за счет синхронизации операций.

Несмотря на то, что Hashtable потокобезопасен, он не очень эффективен. Еще одна полностью синхронизированная Карта, | Коллекции.synchronizedMap, также не проявляет большой эффективности. Если мы хотим потокобезопасности с высокой пропускной способностью при высоком параллелизме, то эти реализации не подходят.

Чтобы решить эту проблему, фреймворк Java Collections Framework | представил ConcurrentMap в Java 1.5 .

Следующие обсуждения основаны на Java 1.8 .

2. ConcurrentMap

ConcurrentMap – это расширение интерфейса Map . Она направлена на то, чтобы обеспечить структуру и руководство для решения проблемы согласования пропускной способности с потокобезопасностью.

Переопределяя несколько методов интерфейса по умолчанию, ConcurrentMap дает рекомендации для допустимых реализаций, обеспечивающих потокобезопасность и согласованность атомарных операций с памятью.

Несколько реализаций по умолчанию переопределяются, отключая поддержку null key/value:

  • getOrDefault
  • инструкция foreach
  • replaceAll
  • computeIfAbsent
  • computeIfPresent
  • вычислять
  • поглощать

Следующие API также переопределяются для поддержки атомарности без реализации интерфейса по умолчанию:

  • путИфАбсент
  • удалить
  • replace(key, oldValue, newValue)
  • заменить(ключ, значение)

Остальные действия непосредственно наследуются с помощью в основном согласованного с Map .

3. ConcurrentHashMap

ConcurrentHashMap – это готовая реализация out-of-box ConcurrentMap .

Для повышения производительности он состоит из массива узлов в виде сегментов таблиц (раньше это были сегменты таблиц до Java 8 ) под капотом и в основном использует операции CAS во время обновления.

Ведра таблицы инициализируются лениво, при первой вставке. Каждое ведро может быть независимо заблокировано путем блокировки самого первого узла в ведре. Операции чтения не блокируются, а конфликты обновления сводятся к минимуму.

Количество требуемых сегментов зависит от количества потоков, обращающихся к таблице, так что выполняемое обновление для каждого сегмента будет не более одного большую часть времени.

До Java 8 , количество требуемых “сегментов” было относительно количества потоков, обращающихся к таблице, так что выполняемое обновление для каждого сегмента будет не более одного большую часть времени.

Вот почему конструкторы, по сравнению с HashMap , предоставляют дополнительный уровень параллелизма аргумент для управления количеством предполагаемых потоков для использования:

public ConcurrentHashMap(
public ConcurrentHashMap(
 int initialCapacity, float loadFactor, int concurrencyLevel)

Два других аргумента: initialCapacity и loadFactor работали совершенно так же, как и HashMap .

Однако, начиная с Java 8 , конструкторы присутствуют только для обратной совместимости: параметры могут влиять только на начальный размер карты .

3.1. Резьбобезопасность

ConcurrentMap гарантирует согласованность памяти при операциях ключ/значение в многопоточной среде.

Действия в потоке до размещения объекта в ConcurrentMap в качестве ключа или значения происходят-до действий, следующих за доступом или удалением этого объекта в другом потоке.

Чтобы подтвердить это, давайте посмотрим на случай несогласованности памяти:

@Test
public void givenHashMap_whenSumParallel_thenError() throws Exception {
    Map map = new HashMap<>();
    List sumList = parallelSum100(map, 100);

    assertNotEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertTrue(wrongResultCount > 0);
}

private List parallelSum100(Map map, 
  int executionTimes) throws InterruptedException {
    List sumList = new ArrayList<>(1000);
    for (int i = 0; i < executionTimes; i++) {
        map.put("test", 0);
        ExecutorService executorService = 
          Executors.newFixedThreadPool(4);
        for (int j = 0; j < 10; j++) {
            executorService.execute(() -> {
                for (int k = 0; k < 10; k++)
                    map.computeIfPresent(
                      "test", 
                      (key, value) -> value + 1
                    );
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        sumList.add(map.get("test"));
    }
    return sumList;
}

Для каждого map.computeIfPresent действия параллельно HashMap не обеспечивает последовательного представления того, каким должно быть текущее целочисленное значение, что приводит к непоследовательным и нежелательным результатам.

Что касается ConcurrentHashMap , то мы можем получить последовательный и правильный результат:

@Test
public void givenConcurrentMap_whenSumParallel_thenCorrect() 
  throws Exception {
    Map map = new ConcurrentHashMap<>();
    List sumList = parallelSum100(map, 1000);

    assertEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertEquals(0, wrongResultCount);
}

3.2. Нулевой Ключ/Значение

Большинство API s, предоставляемых ConcurrentMap не допускает null ключ или значение, например:

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {
    concurrentMap.put(null, new Object());
}

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {
    concurrentMap.put("test", null);
}

Однако для действий compute* и merge вычисленное значение может быть null , что указывает на то, что сопоставление ключ-значение удаляется , если оно присутствует, или остается отсутствующим, если ранее отсутствовало .

@Test
public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {
    Object oldValue = new Object();
    concurrentMap.put("test", oldValue);
    concurrentMap.compute("test", (s, o) -> null);

    assertNull(concurrentMap.get("test"));
}

3.3. Поддержка потока

Java 8 также обеспечивает поддержку Stream в ConcurrentHashMap .

В отличие от большинства потоковых методов, объемные (последовательные и параллельные) операции позволяют безопасно выполнять параллельную модификацию. ConcurrentModificationException не будет выброшено, что также относится к его итераторам. Релевантные потокам, несколько для каждого* , search и reduce* методов также добавляются для поддержки более богатых операций обхода и сокращения карты.

3.4. Производительность

Под капотом/| ConcurrentHashMap несколько похож на HashMap , с доступом к данным и обновлением на основе хэш-таблицы (хотя и более сложной).

И, конечно же, ConcurrentHashMap должен давать гораздо лучшую производительность в большинстве параллельных случаев для поиска и обновления данных.

Давайте напишем быстрый микро-бенчмарк для get и put производительности и сравним его с Hashtable и Коллекциями.synchronizedMap , выполняющий обе операции по 500 000 раз в 4 потоках.

@Test
public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster() 
  throws Exception {
    Map hashtable = new Hashtable<>();
    Map synchronizedHashMap = 
      Collections.synchronizedMap(new HashMap<>());
    Map concurrentHashMap = new ConcurrentHashMap<>();

    long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
    long syncHashMapAvgRuntime = 
      timeElapseForGetPut(synchronizedHashMap);
    long concurrentHashMapAvgRuntime = 
      timeElapseForGetPut(concurrentHashMap);

    assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
    assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
}

private long timeElapseForGetPut(Map map) 
  throws InterruptedException {
    ExecutorService executorService = 
      Executors.newFixedThreadPool(4);
    long startTime = System.nanoTime();
    for (int i = 0; i < 4; i++) {
        executorService.execute(() -> {
            for (int j = 0; j < 500_000; j++) {
                int value = ThreadLocalRandom
                  .current()
                  .nextInt(10000);
                String key = String.valueOf(value);
                map.put(key, value);
                map.get(key);
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return (System.nanoTime() - startTime) / 500_000;
}

Имейте в виду, что микро-бенчмарки рассматривают только один сценарий и не всегда хорошо отражают реальную производительность.

Тем не менее, в системе OS X со средней системой разработки мы видим средний результат выборки для 100 последовательных запусков (в наносекундах):

Hashtable: 1142.45
SynchronizedHashMap: 1273.89
ConcurrentHashMap: 230.2

В многопоточной среде, где ожидается , что несколько потоков получат доступ к общей карте , явно предпочтительнее использовать ConcurrentHashMap .

Однако, когда Map доступен только одному потоку, HashMap может быть лучшим выбором для его простоты и надежной производительности.

3.5. Подводные камни

Операции извлечения обычно не блокируются в ConcurrentHashMap и могут перекрываться с операциями обновления. Поэтому для повышения производительности они отражают только результаты самых последних завершенных операций обновления, как указано в официальном Javadoc .

Есть еще несколько фактов, которые следует иметь в виду:

  • результаты методов агрегированного состояния, включая size , isEmpty и containsValue , обычно полезны только в том случае, если карта не подвергается параллельным обновлениям в других потоках:
@Test
public void givenConcurrentMap_whenUpdatingAndGetSize_thenError() 
  throws InterruptedException {
    Runnable collectMapSizes = () -> {
        for (int i = 0; i < MAX_SIZE; i++) {
            mapSizes.add(concurrentMap.size());
        }
    };
    Runnable updateMapData = () -> {
        for (int i = 0; i < MAX_SIZE; i++) {
            concurrentMap.put(String.valueOf(i), i);
        }
    };
    executorService.execute(updateMapData);
    executorService.execute(collectMapSizes);
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);

    assertNotEquals(MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());
    assertEquals(MAX_SIZE, concurrentMap.size());
}

Если параллельные обновления находятся под строгим контролем, агрегатный статус по-прежнему будет надежным.

Хотя эти методы агрегированного состояния не гарантируют точности в реальном времени, они могут быть адекватны для целей мониторинга или оценки .

Обратите внимание , что использование size() of ConcurrentHashMap должно быть заменено на mapping Count () , поскольку последний метод возвращает long count, хотя в глубине души они основаны на одной и той же оценке.

  • hashCode matters : обратите внимание, что использование многих ключей с точно таким же hashCode () – это верный способ замедлить производительность любой хэш-таблицы.

Чтобы улучшить воздействие, когда ключи Сопоставимы , ConcurrentHashMap может использовать порядок сравнения между ключами, чтобы помочь разорвать связи. Тем не менее, мы должны избегать использования одного и того же hashCode() как можно больше.

  • итераторы предназначены только для использования в одном потоке, поскольку они обеспечивают слабую согласованность, а не быстрый обход ошибок, и они никогда не будут вызывать исключение ConcurrentModificationException.
  • начальная емкость таблицы по умолчанию равна 16, и она регулируется указанным уровнем параллелизма:
public ConcurrentHashMap(
  int initialCapacity, float loadFactor, int concurrencyLevel) {
 
    //...
    if (initialCapacity < concurrencyLevel) {
        initialCapacity = concurrencyLevel;
    }
    //...
}
  • внимание к функциям переназначения: хотя мы можем выполнять операции переназначения с помощью предоставленных методов compute и merge* , мы должны держать их быстрыми, короткими и простыми и сосредоточиться на текущем отображении, чтобы избежать неожиданной блокировки.
  • ключи в ConcurrentHashMap не находятся в отсортированном порядке, поэтому для случаев, когда требуется упорядочение, ConcurrentSkipListMap является подходящим выбором.

4. ConcurrentNavigableMap

Для случаев, когда требуется упорядочение ключей, мы можем использовать ConcurrentSkipListMap , параллельную версию TreeMap .

В качестве дополнения к ConcurrentMap , ConcurrentNavigableMap поддерживает полный порядок своих ключей (по умолчанию в порядке возрастания) и одновременно доступен для навигации. Методы, возвращающие представления карты, переопределяются для совместимости с параллелизмом:

  • subMap
  • тепловая карта
  • Хвостовая карта
  • subMap
  • тепловая карта
  • Хвостовая карта
  • Нисходящая карта

keySet() итераторы и сплитераторы представлений улучшены с помощью слабой согласованности памяти:

  • Навигационный набор
  • Набор ключей
  • Спускающийся кейсет

5. ConcurrentSkipListMap

Ранее мы рассмотрели интерфейс NavigableMap и его реализацию TreeMap . ConcurrentSkipListMap можно увидеть масштабируемую параллельную версию TreeMap .

На практике в Java нет параллельной реализации красно-черного дерева. Параллельный вариант Skip Lists реализован в ConcurrentSkipListMap , обеспечивая ожидаемую среднюю логарифмическую(n) временную стоимость для операций containsKey , get , put и remove и их вариантов.

В дополнение к функциям TreeMap операции вставки, удаления, обновления и доступа к ключам гарантируются потокобезопасностью. Вот сравнение с TreeMap при одновременной навигации:

@Test
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect() 
  throws InterruptedException {
    NavigableMap skipListMap
      = new ConcurrentSkipListMap<>();
    int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);
 
    assertEquals(10000 * 4, count);
}

@Test
public void givenTreeMap_whenNavConcurrently_thenCountError() 
  throws InterruptedException {
    NavigableMap treeMap = new TreeMap<>();
    int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);
 
    assertNotEquals(10000 * 4, count);
}

private int countMapElementByPollingFirstEntry(
  NavigableMap navigableMap, 
  int elementCount, 
  int concurrencyLevel) throws InterruptedException {
 
    for (int i = 0; i < elementCount * concurrencyLevel; i++) {
        navigableMap.put(i, i);
    }
    
    AtomicInteger counter = new AtomicInteger(0);
    ExecutorService executorService
      = Executors.newFixedThreadPool(concurrencyLevel);
    for (int j = 0; j < concurrencyLevel; j++) {
        executorService.execute(() -> {
            for (int i = 0; i < elementCount; i++) {
                if (navigableMap.pollFirstEntry() != null) {
                    counter.incrementAndGet();
                }
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return counter.get();
}

Полное объяснение проблем производительности за кулисами выходит за рамки этой статьи. Подробности можно найти в Concurrentskiplistmap Javadoc, который находится под java/util/concurrent в src.zip файл.

6. Заключение

В этой статье мы в основном познакомились с интерфейсом ConcurrentMap и функциями ConcurrentHashMap , а также рассмотрели ConcurrentNavigableMap обязательный порядок ключей.

Полный исходный код всех примеров, использованных в этой статье, можно найти в проекте GitHub .