1. Обзор
В этом кратком руководстве мы рассмотрим основы семафоров и мьютексов в Java.
2. Семафор
Мы начнем с java.util.concurrent.Semaphore. Мы можем использовать семафоры, чтобы ограничить количество параллельных потоков, обращающихся к определенному ресурсу.
В следующем примере мы реализуем простую очередь входа в систему, чтобы ограничить количество пользователей в системе:
class LoginQueueUsingSemaphore { private Semaphore semaphore; public LoginQueueUsingSemaphore(int slotLimit) { semaphore = new Semaphore(slotLimit); } boolean tryLogin() { return semaphore.tryAcquire(); } void logout() { semaphore.release(); } int availableSlots() { return semaphore.availablePermits(); } }
Обратите внимание, как мы использовали следующие методы:
- tryAcquire() – возвращает true, если разрешение доступно немедленно, и получает его в противном случае возвращает false, но acquire() получает разрешение и блокирует его до тех пор, пока оно не будет доступно
- release() – освободить разрешение
- availablePermits() – возвращает количество доступных текущих разрешений
Чтобы проверить нашу очередь входа в систему, мы сначала попытаемся достичь предела и проверим, будет ли заблокирована следующая попытка входа в систему:
@Test public void givenLoginQueue_whenReachLimit_thenBlocked() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); assertFalse(loginQueue.tryLogin()); }
Затем мы посмотрим, доступны ли какие-либо слоты после выхода из системы:
@Test public void givenLoginQueue_whenLogout_thenSlotsAvailable() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); loginQueue.logout(); assertTrue(loginQueue.availableSlots() > 0); assertTrue(loginQueue.tryLogin()); }
3. Синхронизированный семафор
Далее мы обсудим Apache Commons TimedSemaphore. TimedSemaphore разрешает ряд разрешений в качестве простого семафора, но в заданный период времени, после этого периода время сбрасывается, и все разрешения освобождаются.
Мы можем использовать TimedSemaphore для построения простой очереди задержки следующим образом:
class DelayQueueUsingTimedSemaphore { private TimedSemaphore semaphore; DelayQueueUsingTimedSemaphore(long period, int slotLimit) { semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit); } boolean tryAdd() { return semaphore.tryAcquire(); } int availableSlots() { return semaphore.getAvailablePermits(); } }
Когда мы используем очередь задержки с одной секундой в качестве периода времени и после использования всех слотов в течение одной секунды, ни один из них не должен быть доступен:
public void givenDelayQueue_whenReachLimit_thenBlocked() { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); assertFalse(delayQueue.tryAdd()); }
Но после сна в течение некоторого времени семафор должен сбросить и освободить разрешения :
@Test public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); Thread.sleep(1000); assertTrue(delayQueue.availableSlots() > 0); assertTrue(delayQueue.tryAdd()); }
4. Семафор против Мьютекс
Мьютекс действует аналогично двоичному семафору, мы можем использовать его для реализации взаимного исключения.
В следующем примере мы будем использовать простой двоичный семафор для построения счетчика:
class CounterUsingMutex { private Semaphore mutex; private int count; CounterUsingMutex() { mutex = new Semaphore(1); count = 0; } void increase() throws InterruptedException { mutex.acquire(); this.count = this.count + 1; Thread.sleep(1000); mutex.release(); } int getCount() { return this.count; } boolean hasQueuedThreads() { return mutex.hasQueuedThreads(); } }
Когда множество потоков попытаются получить доступ к счетчику одновременно, они просто будут заблокированы в очереди :
@Test public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); }
Когда мы ждем, все потоки получат доступ к счетчику, и в очереди не останется ни одного потока:
@Test public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); Thread.sleep(5000); assertFalse(counter.hasQueuedThreads()); assertEquals(count, counter.getCount()); }
5. Заключение
В этой статье мы изучили основы семафоров в Java.
Как всегда, полный исходный код доступен на GitHub .