Рубрики
Без рубрики

Параллельный поток Java 8 с пулом потоков

При выполнении параллельного потока он выполняется в общем пуле forkJoin (ForkJoinPool.commonPool()), s… Помеченный как java, учебник, многопоточность, пул потоков.

При выполнении параллельного потока он выполняется в общем пуле ForkJoinPool ( ForkJoinPool.commonPool() ), общем для всех других параллельных потоков. Иногда мы хотим выполнять код параллельно в отдельном выделенном пуле потоков, построенном с определенным количеством потоков. При использовании, например, my Collection.parallel Stream() это не дает нам удобного способа сделать это. Я написал небольшую удобную утилиту ( Класс ThreadExecutor ), которую можно использовать для этой цели. В следующем примере я продемонстрирую простое использование утилиты ThreadExecutor для заполнения длинного массива вычисляемыми числами, каждое число вычисляется в потоке в пуле соединений с разветвлением (не в общем пуле). Создание пула потоков выполняется утилитой. Мы контролируем количество потоков в пуле ( параллелизм ), имена потоков в пуле (полезно при исследовании дампа потоков) и, при необходимости, ограничение времени ожидания. Я протестировал его с помощью JUnit 5, который обеспечивает хороший способ определения времени для методов тестирования (см. Расширение времени

Весь исходный код доступен на GitHub по адресу: Весь исходный код доступен на GitHub по адресу:

Класс утилиты ThreadExecutor

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.common.util.concurrent.UncheckedTimeoutException;

import java.time.Duration;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;

public class ThreadExecutor {
    public static  R execute(int parallelism, String forkJoinWorkerThreadName, T source, Function parallelStream) {
        return execute(parallelism, forkJoinWorkerThreadName, source, 0, null, parallelStream);
    }

    public static  R execute(int parallelism, String forkJoinWorkerThreadName, T source, long timeout, TimeUnit unit, Function parallelStream) {
        if (timeout < 0)
            throw new IllegalArgumentException("Invalid timeout " + timeout);
        // see java.util.concurrent.Executors.newWorkStealingPool(int parallelism)
        ExecutorService threadPool = new ForkJoinPool(parallelism, new NamedForkJoinWorkerThreadFactory(forkJoinWorkerThreadName), null, true);
        Future future = threadPool.submit(() -> parallelStream.apply(source));
        try {
            return timeout == 0 ? future.get() : future.get(timeout, unit);
        } catch (ExecutionException e) {
            future.cancel(true);
            threadPool.shutdownNow();
            Throwable cause = e.getCause();
            if (cause instanceof Error)
                throw new ExecutionError((Error) cause);
            throw new UncheckedExecutionException(cause);
        } catch (TimeoutException e) {
            future.cancel(true);
            threadPool.shutdownNow();
            throw new UncheckedTimeoutException(e);
        } catch (Throwable t) {
            future.cancel(true);
            threadPool.shutdownNow();
            Throwables.throwIfUnchecked(t);
            throw new RuntimeException(t);
        } finally {
            threadPool.shutdown();
        }
    }

    public static  void execute(int parallelism, String forkJoinWorkerThreadName, T source, Consumer parallelStream) {
        execute(parallelism, forkJoinWorkerThreadName, source, 0, null, parallelStream);
    }

    public static  void execute(int parallelism, String forkJoinWorkerThreadName, T source, long timeout, TimeUnit unit, Consumer parallelStream) {
        if (timeout < 0)
            throw new IllegalArgumentException("Invalid timeout " + timeout);
        // see java.util.concurrent.Executors.newWorkStealingPool(int parallelism)
        ExecutorService threadPool = new ForkJoinPool(parallelism, new NamedForkJoinWorkerThreadFactory(forkJoinWorkerThreadName), null, true);
        CompletableFuture future = null;
        try {
            Runnable task = () -> parallelStream.accept(source);
            if (timeout == 0) {
                future = CompletableFuture.runAsync(task, threadPool);
                future.get();
                threadPool.shutdown();
            } else {
                threadPool.execute(task);
                threadPool.shutdown();
                if (!threadPool.awaitTermination(timeout, unit))
                    throw new TimeoutException("Timed out after: " + Duration.of(timeout, unit.toChronoUnit()));
            }
        } catch (TimeoutException e) {
            threadPool.shutdownNow();
            throw new UncheckedTimeoutException(e);
        } catch (ExecutionException e) {
            future.cancel(true);
            threadPool.shutdownNow();
            Throwable cause = e.getCause();
            if (cause instanceof Error)
                throw new ExecutionError((Error) cause);
            throw new UncheckedExecutionException(cause);
        } catch (Throwable t) {
            threadPool.shutdownNow();
            Throwables.throwIfUnchecked(t);
            throw new RuntimeException(t);
        }
    }
}

Именованный класс ForkJoinWorkerThreadFactory

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    private AtomicInteger counter = new AtomicInteger(0);
    private final String name;
    private final boolean daemon;

    public NamedForkJoinWorkerThreadFactory(String name, boolean daemon) {
        this.name = name;
        this.daemon = daemon;
    }

    public NamedForkJoinWorkerThreadFactory(String name) {
        this(name, false);
    }

    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        ForkJoinWorkerThread t = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        t.setName(name + counter.incrementAndGet());
        t.setDaemon(daemon);
        return t;
    }
}

ThreadExecutor Проверяет класс JUnit

import static org.junit.jupiter.api.Assertions.*;

import com.github.igalhaddad.threadexecutor.timing.TimingExtension;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
import java.util.stream.Collectors;

@ExtendWith(TimingExtension.class)
@TestMethodOrder(OrderAnnotation.class)
@DisplayName("Test ThreadExecutor utility")
public class ThreadExecutorTests {
    private static final Logger logger = Logger.getLogger(ThreadExecutorTests.class.getName());
    private static final int SEQUENCE_LENGTH = 1000000;

    private static List fibonacciSequences = new ArrayList<>();
    private long[] fibonacciSequence;

    @BeforeAll
    static void initAll() {
        logger.info(() -> "Number of available processors: " + Runtime.getRuntime().availableProcessors());
    }

    @BeforeEach
    void init() {
        this.fibonacciSequence = new long[SEQUENCE_LENGTH];
        fibonacciSequences.add(fibonacciSequence);
    }

    @AfterEach
    void tearDown() {
        int firstX = 10;
        logger.info(() -> "First " + firstX + " numbers: " + Arrays.stream(this.fibonacciSequence)
                .limit(firstX)
                .mapToObj(Long::toString)
                .collect(Collectors.joining(",", "[", ",...]")));
        int n = SEQUENCE_LENGTH - 1; // Last number
        assertFn(n);
        assertFn(n / 2);
        assertFn(n / 3);
        assertFn(n / 5);
        assertFn(n / 10);
        assertFn((n / 3) * 2);
        assertFn((n / 5) * 4);
    }

    private void assertFn(int n) {
        assertEquals(fibonacciSequence[n - 1] + fibonacciSequence[n - 2], fibonacciSequence[n]);
    }

    @AfterAll
    static void tearDownAll() {
        long[] fibonacciSequence = fibonacciSequences.iterator().next();
        for (int i = 1; i < fibonacciSequences.size(); i++) {
            assertArrayEquals(fibonacciSequence, fibonacciSequences.get(i));
        }
    }

    @Test
    @Order(1)
    @DisplayName("Calculate Fibonacci sequence sequentially")
    public void testSequential() {
        logger.info(() -> "Running sequentially. No parallelism");
        for (int i = 0; i < fibonacciSequence.length; i++) {
            fibonacciSequence[i] = Fibonacci.compute(i);
        }
    }

    @Test
    @Order(2)
    @DisplayName("Calculate Fibonacci sequence concurrently on all processors")
    public void testParallel1() {
        testParallel(Runtime.getRuntime().availableProcessors());
    }

    @Test
    @Order(3)
    @DisplayName("Calculate Fibonacci sequence concurrently on half of the processors")
    public void testParallel2() {
        testParallel(Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
    }

    private void testParallel(int parallelism) {
        logger.info(() -> String.format("Running in parallel on %d processors", parallelism));
        ThreadExecutor.execute(parallelism, "FibonacciTask", fibonacciSequence,
                (long[] fibonacciSequence) -> Arrays.parallelSetAll(fibonacciSequence, Fibonacci::compute)
        );
    }

    static class Fibonacci {
        public static long compute(int n) {
            if (n <= 1)
                return n;
            long a = 0, b = 1;
            long sum = a + b; // for n == 2
            for (int i = 3; i <= n; i++) {
                a = sum; // using `a` for temporary storage
                sum += b;
                b = a;
            }
            return sum;
        }
    }
}

Обратите внимание тест Параллельный (int параллелизм) метод. Этот метод использует утилиту ThreadexeCutor для выполнения параллельного потока в отдельном выделенном пуле потоков, состоящем из предусмотренного количества потоков, где каждый поток называется “Fibonaccitask”, объединенный с серийным номером, например, “FibonacciTask3”. Именованные потоки происходят из Именованного класса ForkJoinWorkerThreadFactory . Например, я приостановил метод тестирования test Parallel 2() с точкой останова в методе Fibonacci.compute , и я вижу 6 потоков с именем “FibonacciTask1-6”. Вот один из них:

“”Задача Фибоначчи 3@2715”, выполняемая на java.lang. Нить. Государство: работоспособный

  at com.github.igalhaddad.threadexecutor.util.ThreadExecutorTests$Fibonacci.compute(ThreadExecutorTests.java:103)
  at com.github.igalhaddad.threadexecutor.util.ThreadExecutorTests$$Lambda$366.1484420181.applyAsLong(Unknown Source:-1)
  at java.util.Arrays.lambda$parallelSetAll$2(Arrays.java:5408)
  at java.util.Arrays$$Lambda$367.864455139.accept(Unknown Source:-1)
  at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
  at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
  at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
  at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
  at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
  at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
  at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

Метод test Parallel(int параллелизм) выполняет Arrays.parallelSetAll , который на самом деле является простым параллельным потоком, реализованным в исходном коде java:

    public static void parallelSetAll(long[] array, IntToLongFunction generator) {
        Objects.requireNonNull(generator);
        IntStream.range(0, array.length).parallel().forEach(i -> { array[i] = generator.applyAsLong(i); });
    }

Теперь давайте посмотрим на методы тестирования и сроки ⏱:

Как вы можете видеть на выходе:

  1. тест Последовательный() метод тестирования занял 148622 мс (без параллелизма).
  2. тест Параллельный 1() метод тестирования занял 16995 мс (12 процессоров параллельно).
  3. тест Параллельный 2() метод тестирования занял 31152 мс (6 процессоров параллельно).

Все три метода тестирования выполняли одну и ту же задачу по вычислению последовательности Фибоначчи длиной 1 000 000 чисел.

Оригинал: “https://dev.to/igalhaddad/java-8-parallel-stream-with-threadpool-32kd”