В этой статье дается краткий обзор Project Loom – одного из модных словечек в мире Java.
О чем проект Loom?
Проект Loom стартовал в конце 2017 года. Основная цель проекта – снизить сложность создания и обслуживания высокопроизводительных параллельных приложений. В нем представлена концепция облегченной модели параллелизма, основанной на виртуальных потоках. Что такое виртуальный поток? Виртуальный поток вместо того, чтобы управляться операционной системой, как стандартный, запланирован виртуальной машиной Java. Это приводит к тому, что такие потоки могут быть эффективно запланированы, позволяя выполнять синхронный код, а также асинхронный код с точки зрения производительности. Реализация основана на такой идее, как продолжение и связанные с ними операции, определяемые как парковка и распаковка.
Если вы хотите получить больше информации об этой идее, я настоятельно рекомендую ознакомиться с вики-сайтом проекта .
Демонстрационное время
Примечание: Статья основана на JDK из проекта Loom Сборка раннего доступа – Сборка 16-ткацкий станок+7-285 (2020/11/4)
Предположим, у нас есть несколько трудоемких задач, которые мы хотим выполнить в фоновом режиме нашего приложения.
private static Runnable timeConsumingTask(int id) { return () -> { out.println(format("[%s][%s] Starting time consuming task [id=%s]", now(), Thread.currentThread(), id)); try { Thread.sleep(Duration.ofSeconds(5)); } catch (InterruptedException e) { out.println(format("[%s][%s] Oops interruption occurred [id=%s]!", now(), Thread.currentThread(), id)); } out.println(format("[%s][%s] Ended time consuming task [id=%s]", now(), Thread.currentThread(), id)); }; }
Как вы можете видеть, определение задачи смехотворно просто: запуск журнала, сон в течение 5 секунд, завершение задачи в журнале.
Давайте начнем с “классического” потока демона. Мы будем использовать для этого ExecutorService
, который известен со времен JDK 1.5.
private static ExecutorService standardSingleExecutorService() { var factory = Thread.builder().name("standard-thread").daemon(true).factory(); return Executors.newSingleThreadExecutor(factory); }
Прежде всего, мы создаем фабрику потоков с установкой параметра daemon в значение true, а затем экземпляр исполнителя одного потока, который будет использовать эту фабрику. Далее наша главная цель – выполнить четыре “трудоемкие” задачи с использованием созданного исполнителя.
public static void main(String[] args) throws InterruptedException { var ex = standardSingleExecutorService(); long startTime = System.nanoTime(); IntStream.range(0, 4) .forEach(id -> ex.execute(timeConsumingTask(id))); ex.shutdown(); ex.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); long stopTime = System.nanoTime(); out.println(format("[%s][%s] Processing took = %s ms", now(), Thread.currentThread(), TimeUnit.MILLISECONDS.convert(stopTime - startTime, TimeUnit.NANOSECONDS))); }
Результат выглядит следующим образом:
[2020-11-22T17:15:54.303382][Thread[standard-thread,5,main]] Starting time consuming task [id=0] [2020-11-22T17:15:59.323550][Thread[standard-thread,5,main]] Ended time consuming task [id=0] [2020-11-22T17:15:59.324083][Thread[standard-thread,5,main]] Starting time consuming task [id=1] [2020-11-22T17:16:04.324808][Thread[standard-thread,5,main]] Ended time consuming task [id=1] [2020-11-22T17:16:04.325580][Thread[standard-thread,5,main]] Starting time consuming task [id=2] [2020-11-22T17:16:09.330546][Thread[standard-thread,5,main]] Ended time consuming task [id=2] [2020-11-22T17:16:09.331408][Thread[standard-thread,5,main]] Starting time consuming task [id=3] [2020-11-22T17:16:14.335599][Thread[standard-thread,5,main]] Ended time consuming task [id=3] [2020-11-22T17:16:14.336839][Thread[main,5,main]] Processing took = 20064 ms
Все работает так, как вы, вероятно, ожидали. Наши четыре задачи выполнялись одна за другой в пуле с одним потоком. Это приводит к тому, что общее время, необходимое для выполнения, составляет около 20 секунд (4 задачи, каждая по 5 секунд Thread.sleep()
) Да! Математика все еще работает, 4 умножить на 5 равно 20.
Теперь давайте изменим наш executorservice на использование виртуальных потоков. Это изменение довольно простое. Чтобы быть уверенным, что наш виртуальный поток запланирован одним потоком-носителем, мы можем назначить конкретный поток, указав его в методе virtual()
. Поток-носитель – это имя рабочего потока планировщика, который отвечает за выполнение виртуального потока. Смотрите пример ниже:
private static ExecutorService virtualThreadExecutorService() { var factory = Thread.builder().name("carrier").daemon(true).factory(); var executor = Executors.newSingleThreadExecutor(factory); var virtualThreadFactory = Thread.builder().name("virtual-thread", 0).virtual(executor).factory(); return Executors.newThreadExecutor(virtualThreadFactory); }
Теперь пришло время изменить код основного метода, чтобы использовать ExecutorService на основе виртуальных потоков:
var executorService = virtualThreadExecutorService();
и результат должен быть похож на:
[2020-11-22T17:16:49.114727][VirtualThread[virtual-thread0,carrier,main]] Starting time consuming task [id=0] [2020-11-22T17:16:49.161798][VirtualThread[virtual-thread1,carrier,main]] Starting time consuming task [id=1] [2020-11-22T17:16:49.162620][VirtualThread[virtual-thread2,carrier,main]] Starting time consuming task [id=2] [2020-11-22T17:16:49.163415][VirtualThread[virtual-thread3,carrier,main]] Starting time consuming task [id=3] [2020-11-22T17:16:54.163348][VirtualThread[virtual-thread0,carrier,main]] Ended time consuming task [id=0] [2020-11-22T17:16:54.164141][VirtualThread[virtual-thread1,carrier,main]] Ended time consuming task [id=1] [2020-11-22T17:16:54.165216][VirtualThread[virtual-thread2,carrier,main]] Ended time consuming task [id=2] [2020-11-22T17:16:54.166347][VirtualThread[virtual-thread3,carrier,main]] Ended time consuming task [id=3] [2020-11-22T17:16:54.166858][Thread[main,5,main]] Processing took = 5106 ms
Как вы можете видеть, выполнение одних и тех же четырех задач занимает около 5 секунд. Все задачи выполняются параллельно, не блокируя друг друга, используя данный поток-носитель. Мы добились этого, просто изменив тип потока со стандартного на виртуальный. Магия!
Ладно, круто… но как?
Чтобы понять, как это возможно, нам нужно погрузиться в метод Thread.sleep()
.
public static void sleep(Duration duration) throws InterruptedException { long nanos = duration.toNanos(); if (nanos < 0) return; Thread thread = currentThread(); if (thread.isVirtual()) { if (ThreadSleepEvent.isTurnedOn()) { ThreadSleepEvent event = new ThreadSleepEvent(); try { event.time = nanos; event.begin(); ((VirtualThread) thread).sleepNanos(nanos); } finally { event.commit(); } } else { ((VirtualThread) thread).sleepNanos(nanos); } } else { // convert to milliseconds, ceiling rounding mode long millis = MILLISECONDS.convert(nanos, NANOSECONDS); if (nanos > NANOSECONDS.convert(millis, MILLISECONDS)) { millis += 1L; } sleep(millis); } }
Как вы можете видеть, существует условный оператор, в котором реализация sleep ведет себя по-другому, когда выполняется в виртуальном потоке. Метод sleepNanos(длинные наносы)
из класса Виртуальный поток
дает нам ключ к разгадке.
void sleepNanos(long nanos) throws InterruptedException { ... while (remainingNanos > 0) { parkNanos(remainingNanos); if (getAndClearInterrupt()) { throw new InterruptedException(); } remainingNanos = nanos - (System.nanoTime() - startNanos); } ... }
Там есть строка, в которой говорится, что наш виртуальный поток будет припаркован ( Паркнанос(оставшиеся НАНО)
). Парковка виртуального потока означает предоставление его продолжения. Виртуальный поток припаркован на некоторое время? Давайте используем это время для других виртуальных потоков!
Наконец, углубляясь все глубже и глубже в реализацию операции парка, мы добрались до части кода, которая отвечает за планирование разблокировки данного виртуального потока. Распаковка виртуального потока приводит к тому, что его продолжение повторно отправляется планировщику. В нашем случае это означает, что после 5 секунд сна наш виртуальный поток может быть продолжен и может распечатать завершающий журнал.
/** * Schedules this thread to be unparked after the given delay. */ @ChangesCurrentThread private Future> scheduleUnpark(long nanos) { //assert Thread.currentThread() == this; Thread carrier = this.carrierThread; // need to switch to carrier thread to avoid nested parking carrier.setCurrentThread(carrier); try { return UNPARKER.schedule(this::unpark, nanos, NANOSECONDS); } finally { carrier.setCurrentThread(this); } }
РАСПАКОВЩИК
по умолчанию является ScheduledExecutorService
, который создается для целей виртуальных потоков.
private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
Этот пример был основан на анализе метода Thread.sleep()
, но также другие методы блокировки из разных библиотек были оптимизированы для использования виртуальными потоками. Список методов “дружественных к виртуальным потокам” можно найти здесь .
Резюме
На мой взгляд, проект Loom и преимущества, которые он предоставляет, могут изменить правила игры в мире Java. Обеспечение легкого параллелизма, встроенного в стандартные библиотеки, и производительности асинхронного кода в синхронных реализациях может быть быстрым и простым способом повышения эффективности существующих систем. Мне любопытно, как проект изменит подход к параллелизму в Java и его влияние на популярные библиотеки и фреймворки.
Примеры кода можно найти на здесь .
Подписывайтесь на меня в Твиттере .
Оригинал: “https://dev.to/piotrekst/let-s-take-a-quick-look-at-project-loom-1d21”