Рубрики
Без рубрики

Блокирующая очередь и служба исполнителей

Это быстрый и грязный пост, но я пообещал опубликовать все, что я исследую… Помеченный java, многопоточность.

Это быстрый и грязный пост, но я пообещал опубликовать все, что я исследую в Playtomic.

Мы обсуждали, как ограничить количество задач, которые могут быть поставлены в очередь, и ExecutorService. Мы пытались контролировать объем памяти, который может обрабатывать служба, чтобы избежать исключений outofmemoryexceptions. Эта служба принимает сообщения из темы Кафки и из API. Эти операции заканчиваются одной и той же внутренней логикой, которая является многопоточной.

Существует своего рода очередь, BlockingQueue, которая может ждать, пока место в очереди не освободится. Казалось бы, использование службы выполнения с блокирующей очередью будет ждать при отправке задачи до тех пор, пока эта очередь не заполнится. Но это не так, служба выполнения отклоняет задачу.

Вы знаете, что часы проб и ошибок могут сэкономить вам часы чтения руководства. Я с гордостью могу сказать, что на этот раз я прочитал руководство первым.

Этот тест показывает, что происходит:

public class BlockingQueueExecutorServiceTest {

    @Test
    public void submitTest() {
        // Worst case scenario: accept only 1 thread in the queue.
        int nThreads = 1;


        ExecutorService exService = new ThreadPoolExecutor(
            nThreads,
            nThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(nThreads));


        // Full this with tasks
        for (int i = 0; i < 10000; ++i) {
            WaitingTask t = new WaitingTask(i);
            exService.submit(t);

        }
    }

    private static class WaitingTask implements Runnable {

        int index;
        public WaitingTask(int index) {
            this.index = index;
        }

        @Override
        public void run() {

            try {
                log.info("Running task {}", index);
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
    }
}
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@550dbc7a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4dbb42b7[Wrapped task = com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest$WaitingTask@66f57048]] rejected from java.util.concurrent.ThreadPoolExecutor@21282ed8[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]

    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest.submitTest(BlockingQueueExecutorServiceTest.java:28)
... more boring stacktrace

Если вы хотите подождать, пока очередь не заполнится, вам необходимо предоставить обработчик отклоненного выполнения, который это делает. Например, Политика Блокировки вызывающего абонента Spring.

public class BlockingQueueExecutorServiceTest {

    @Test
    public void submitTest() {
        // Worst case scenario: accept only 1 thread in the queue.
        int nThreads = 1;

        CallerBlocksPolicy policy = new CallerBlocksPolicy(10000); // 10secs
        ExecutorService exService = new ThreadPoolExecutor(
            nThreads,
            nThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(nThreads), 
            policy);


        // Full this with tasks
        for (int i = 0; i < 10000; ++i) {
            WaitingTask t = new WaitingTask(i);
            exService.submit(t);

        }
    }

    private static class WaitingTask implements Runnable {

        int index;
        public WaitingTask(int index) {
            this.index = index;
        }

        @Override
        public void run() {

            try {
                log.info("Running task {}", index);
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
    }
}

И на этот раз мы получаем:

12:21:32.409 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 0
12:21:32.422 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds
12:21:33.420 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 1
12:21:33.420 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Task execution queued
12:21:33.421 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds
12:21:34.423 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 2

Заголовок: Заголовок:

Оригинал: “https://dev.to/playtomic/linkedblockingqueue-and-executorservice-1pc5”