Автор оригинала: Vlad Mihalcea.
Вступление
В текущем приложении, которое мы разрабатываем, был один случай использования, когда мы хотели синхронизировать обработку сообщений поставщиком сообщений (клиентом, генерирующим эти сообщения). Поток выглядит примерно так:
Таким образом, сообщения могут приходить случайным образом, поскольку параллельно выполняется больше заданий для клиентов, но мы хотим гарантировать, что сообщения, принадлежащие одному и тому же клиенту, обрабатываются одно за другим (аналогично уровню изоляции сериализуемой базы данных), позволяя при этом обрабатывать сообщения, поступающие от разных клиентов параллельно.
Синхронизация доступа
Итак, вот как выглядит механизм блокировки клиента:
/** * CustomerLockedExecution - Lock execution based for a given customer */ public class CustomerLockedExecution{ private Map lockMap = new HashMap (); private Lock getLock(K customerId) { ReentrantLock lock = lockMap.get(customerId); if (lock == null) { synchronized (this) { lock = lockMap.get(customerId); if (lock == null) { lock = new ReentrantLock(); lockMap.put(customerId, lock); } } } return lock; } /** * Lock on the customer and execute the specific logic * * @param customerId customer id * @param callable custom logic callback */ public void lockExecution(K customerId, Callable callable) { Lock lock = getLock(customerId); try { lock.lockInterruptibly(); callable.call(); } catch (Exception e) { throw new CallableException(e, callable); } finally { lock.unlock(); } } }
Время тестирования
Модульный тест запустит 10 потоков, каждый из которых имеет одинаковое значение CustomerID
, поэтому все они совпадают для выполнения своей логики, которая состоит из добавления 3 последовательных чисел (начиная с начального индекса) в общий буфер.
private CustomerLockedExecutionexecution = new CustomerLockedExecution<>(); private CopyOnWriteArrayList buffer = new CopyOnWriteArrayList<>(); private static final int appendTries = 3; private final int threadCount = 10; private ExecutorService executorService = Executors.newFixedThreadPool(threadCount); @Test public void testAwaitExecutionForSameIntegratedSource() throws InterruptedException { final CountDownLatch startLatch = new CountDownLatch(threadCount + 1); final CountDownLatch endLatch = new CountDownLatch(threadCount + 1); for (long i = 0; i < threadCount; i++) { final long index = i * threadCount; LOG.info("Scheduling thread index {}", index); executorService.submit(() -> { try { startLatch.countDown(); startLatch.await(); execution.lockExecution( 0L, () -> { LOG.info("Running thread index {}", index); for (int j = 0; j < appendTries; j++) { long number = index + j; LOG.info("Adding {}", number); buffer.add(number); } return null; } ); endLatch.countDown(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } startLatch.countDown(); LOG.info("Waiting for threads to be done"); endLatch.countDown(); endLatch.await(); LOG.info("Threads are done processing"); for (int i = 0; i < threadCount; i += appendTries) { long reference = buffer.get(i); for (int j = 0; j < appendTries; j++) { assertEquals(reference + j, (long) buffer.get(i + j)); } } }
При выполнении приведенного выше тестового примера мы получаем следующий результат:
Scheduling thread index 0 Scheduling thread index 10 Scheduling thread index 20 Scheduling thread index 30 Scheduling thread index 40 Scheduling thread index 50 Scheduling thread index 60 Scheduling thread index 70 Scheduling thread index 80 Scheduling thread index 90 Waiting for threads to be done Running thread index 0 Adding 0 Adding 1 Adding 2 Running thread index 80 Adding 80 Adding 81 Adding 82 Running thread index 30 Adding 30 Adding 31 Adding 32 Running thread index 40 Adding 40 Adding 41 Adding 42 Running thread index 60 Adding 60 Adding 61 Adding 62 Running thread index 50 Adding 50 Adding 51 Adding 52 Running thread index 10 Adding 10 Adding 11 Adding 12 Running thread index 90 Adding 90 Adding 91 Adding 92 Running thread index 20 Adding 20 Adding 21 Adding 22 Running thread index 70 Adding 70 Adding 71 Adding 72 Threads are done processing
Как вы можете видеть, каждый поток выполняется случайным образом, даже если все они запланированы для запуска одновременно, и между ними нет перекрывающихся чисел, поэтому каждый поток добавляет свои три числа, не чередуя добавление с каким-либо другим потоком.
Предотвращение взаимоблокировок
Вы должны знать о взаимоблокировках, так как мы удерживаем блокировку при выполнении определенной логики, вызывающей какой-либо не закрытый метод, и эта конкретная вызываемая логика также может получить какую-либо другую блокировку.
К счастью, это не наш случай, так как наш конвейер сообщений идет от одного конца к другому, поэтому существует только один способ ввода этой логики обработки.
В любом случае, когда приобретается несколько блокировок (например, A, B и C), обязательно всегда приобретать эти блокировки в одном и том же порядке:
- A -> B -> C и A -> B
Комбинации, такие как:
- A -> B и B -> A
- A -> B -> C и C -> B -> A
запрещены, так как они могут оказаться в тупике.
Вывод
Кроме того, я всегда стараюсь избегать вызова внешнего API при удержании блокировки, так как они могут оказаться медленными (длительный вызов веб-службы обработки), что может повлиять на масштабируемость нашей обработки, поскольку блокировка будет сохраняться в течение длительного времени.
Но внешние вызовы API также могут получать блокировки, о которых мы не знаем, что увеличивает вероятность взаимоблокировки, если мы случайно блокируем те же объекты, что и внешний API.