Это быстрый и грязный пост, но я пообещал опубликовать все, что я исследую в Playtomic.
Мы обсуждали, как ограничить количество задач, которые могут быть поставлены в очередь, и ExecutorService. Мы пытались контролировать объем памяти, который может обрабатывать служба, чтобы избежать исключений outofmemoryexceptions. Эта служба принимает сообщения из темы Кафки и из API. Эти операции заканчиваются одной и той же внутренней логикой, которая является многопоточной.
Существует своего рода очередь, BlockingQueue, которая может ждать, пока место в очереди не освободится. Казалось бы, использование службы выполнения с блокирующей очередью будет ждать при отправке задачи до тех пор, пока эта очередь не заполнится. Но это не так, служба выполнения отклоняет задачу.
Вы знаете, что часы проб и ошибок могут сэкономить вам часы чтения руководства. Я с гордостью могу сказать, что на этот раз я прочитал руководство первым.
Этот тест показывает, что происходит:
public class BlockingQueueExecutorServiceTest {
@Test
public void submitTest() {
// Worst case scenario: accept only 1 thread in the queue.
int nThreads = 1;
ExecutorService exService = new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(nThreads));
// Full this with tasks
for (int i = 0; i < 10000; ++i) {
WaitingTask t = new WaitingTask(i);
exService.submit(t);
}
}
private static class WaitingTask implements Runnable {
int index;
public WaitingTask(int index) {
this.index = index;
}
@Override
public void run() {
try {
log.info("Running task {}", index);
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@550dbc7a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4dbb42b7[Wrapped task = com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest$WaitingTask@66f57048]] rejected from java.util.concurrent.ThreadPoolExecutor@21282ed8[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
at com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest.submitTest(BlockingQueueExecutorServiceTest.java:28)
... more boring stacktrace
Если вы хотите подождать, пока очередь не заполнится, вам необходимо предоставить обработчик отклоненного выполнения, который это делает. Например, Политика Блокировки вызывающего абонента Spring.
public class BlockingQueueExecutorServiceTest {
@Test
public void submitTest() {
// Worst case scenario: accept only 1 thread in the queue.
int nThreads = 1;
CallerBlocksPolicy policy = new CallerBlocksPolicy(10000); // 10secs
ExecutorService exService = new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(nThreads),
policy);
// Full this with tasks
for (int i = 0; i < 10000; ++i) {
WaitingTask t = new WaitingTask(i);
exService.submit(t);
}
}
private static class WaitingTask implements Runnable {
int index;
public WaitingTask(int index) {
this.index = index;
}
@Override
public void run() {
try {
log.info("Running task {}", index);
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
И на этот раз мы получаем:
12:21:32.409 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 0 12:21:32.422 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds 12:21:33.420 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 1 12:21:33.420 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Task execution queued 12:21:33.421 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds 12:21:34.423 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 2
Заголовок: Заголовок:
Оригинал: “https://dev.to/playtomic/linkedblockingqueue-and-executorservice-1pc5”