Это быстрый и грязный пост, но я пообещал опубликовать все, что я исследую в Playtomic.
Мы обсуждали, как ограничить количество задач, которые могут быть поставлены в очередь, и ExecutorService. Мы пытались контролировать объем памяти, который может обрабатывать служба, чтобы избежать исключений outofmemoryexceptions. Эта служба принимает сообщения из темы Кафки и из API. Эти операции заканчиваются одной и той же внутренней логикой, которая является многопоточной.
Существует своего рода очередь, BlockingQueue, которая может ждать, пока место в очереди не освободится. Казалось бы, использование службы выполнения с блокирующей очередью будет ждать при отправке задачи до тех пор, пока эта очередь не заполнится. Но это не так, служба выполнения отклоняет задачу.
Вы знаете, что часы проб и ошибок могут сэкономить вам часы чтения руководства. Я с гордостью могу сказать, что на этот раз я прочитал руководство первым.
Этот тест показывает, что происходит:
public class BlockingQueueExecutorServiceTest { @Test public void submitTest() { // Worst case scenario: accept only 1 thread in the queue. int nThreads = 1; ExecutorService exService = new ThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(nThreads)); // Full this with tasks for (int i = 0; i < 10000; ++i) { WaitingTask t = new WaitingTask(i); exService.submit(t); } } private static class WaitingTask implements Runnable { int index; public WaitingTask(int index) { this.index = index; } @Override public void run() { try { log.info("Running task {}", index); Thread.sleep(1000); } catch (InterruptedException e) { } } } }
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@550dbc7a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4dbb42b7[Wrapped task = com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest$WaitingTask@66f57048]] rejected from java.util.concurrent.ThreadPoolExecutor@21282ed8[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0] at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055) at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355) at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118) at com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest.submitTest(BlockingQueueExecutorServiceTest.java:28) ... more boring stacktrace
Если вы хотите подождать, пока очередь не заполнится, вам необходимо предоставить обработчик отклоненного выполнения, который это делает. Например, Политика Блокировки вызывающего абонента Spring.
public class BlockingQueueExecutorServiceTest { @Test public void submitTest() { // Worst case scenario: accept only 1 thread in the queue. int nThreads = 1; CallerBlocksPolicy policy = new CallerBlocksPolicy(10000); // 10secs ExecutorService exService = new ThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(nThreads), policy); // Full this with tasks for (int i = 0; i < 10000; ++i) { WaitingTask t = new WaitingTask(i); exService.submit(t); } } private static class WaitingTask implements Runnable { int index; public WaitingTask(int index) { this.index = index; } @Override public void run() { try { log.info("Running task {}", index); Thread.sleep(1000); } catch (InterruptedException e) { } } } }
И на этот раз мы получаем:
12:21:32.409 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 0 12:21:32.422 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds 12:21:33.420 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 1 12:21:33.420 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Task execution queued 12:21:33.421 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds 12:21:34.423 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 2
Заголовок: Заголовок:
Оригинал: “https://dev.to/playtomic/linkedblockingqueue-and-executorservice-1pc5”