В этой статье дается краткий обзор Project Loom – одного из модных словечек в мире Java.
О чем проект Loom?
Проект Loom стартовал в конце 2017 года. Основная цель проекта – снизить сложность создания и обслуживания высокопроизводительных параллельных приложений. В нем представлена концепция облегченной модели параллелизма, основанной на виртуальных потоках. Что такое виртуальный поток? Виртуальный поток вместо того, чтобы управляться операционной системой, как стандартный, запланирован виртуальной машиной Java. Это приводит к тому, что такие потоки могут быть эффективно запланированы, позволяя выполнять синхронный код, а также асинхронный код с точки зрения производительности. Реализация основана на такой идее, как продолжение и связанные с ними операции, определяемые как парковка и распаковка.
Если вы хотите получить больше информации об этой идее, я настоятельно рекомендую ознакомиться с вики-сайтом проекта .
Демонстрационное время
Примечание: Статья основана на JDK из проекта Loom Сборка раннего доступа – Сборка 16-ткацкий станок+7-285 (2020/11/4)
Предположим, у нас есть несколько трудоемких задач, которые мы хотим выполнить в фоновом режиме нашего приложения.
private static Runnable timeConsumingTask(int id) {
return () -> {
out.println(format("[%s][%s] Starting time consuming task [id=%s]", now(), Thread.currentThread(), id));
try {
Thread.sleep(Duration.ofSeconds(5));
} catch (InterruptedException e) {
out.println(format("[%s][%s] Oops interruption occurred [id=%s]!", now(), Thread.currentThread(), id));
}
out.println(format("[%s][%s] Ended time consuming task [id=%s]", now(), Thread.currentThread(), id));
};
}
Как вы можете видеть, определение задачи смехотворно просто: запуск журнала, сон в течение 5 секунд, завершение задачи в журнале.
Давайте начнем с “классического” потока демона. Мы будем использовать для этого ExecutorService , который известен со времен JDK 1.5.
private static ExecutorService standardSingleExecutorService() {
var factory = Thread.builder().name("standard-thread").daemon(true).factory();
return Executors.newSingleThreadExecutor(factory);
}
Прежде всего, мы создаем фабрику потоков с установкой параметра daemon в значение true, а затем экземпляр исполнителя одного потока, который будет использовать эту фабрику. Далее наша главная цель – выполнить четыре “трудоемкие” задачи с использованием созданного исполнителя.
public static void main(String[] args) throws InterruptedException {
var ex = standardSingleExecutorService();
long startTime = System.nanoTime();
IntStream.range(0, 4)
.forEach(id -> ex.execute(timeConsumingTask(id)));
ex.shutdown();
ex.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
long stopTime = System.nanoTime();
out.println(format("[%s][%s] Processing took = %s ms", now(), Thread.currentThread(), TimeUnit.MILLISECONDS.convert(stopTime - startTime, TimeUnit.NANOSECONDS)));
}
Результат выглядит следующим образом:
[2020-11-22T17:15:54.303382][Thread[standard-thread,5,main]] Starting time consuming task [id=0] [2020-11-22T17:15:59.323550][Thread[standard-thread,5,main]] Ended time consuming task [id=0] [2020-11-22T17:15:59.324083][Thread[standard-thread,5,main]] Starting time consuming task [id=1] [2020-11-22T17:16:04.324808][Thread[standard-thread,5,main]] Ended time consuming task [id=1] [2020-11-22T17:16:04.325580][Thread[standard-thread,5,main]] Starting time consuming task [id=2] [2020-11-22T17:16:09.330546][Thread[standard-thread,5,main]] Ended time consuming task [id=2] [2020-11-22T17:16:09.331408][Thread[standard-thread,5,main]] Starting time consuming task [id=3] [2020-11-22T17:16:14.335599][Thread[standard-thread,5,main]] Ended time consuming task [id=3] [2020-11-22T17:16:14.336839][Thread[main,5,main]] Processing took = 20064 ms
Все работает так, как вы, вероятно, ожидали. Наши четыре задачи выполнялись одна за другой в пуле с одним потоком. Это приводит к тому, что общее время, необходимое для выполнения, составляет около 20 секунд (4 задачи, каждая по 5 секунд Thread.sleep() ) Да! Математика все еще работает, 4 умножить на 5 равно 20.
Теперь давайте изменим наш executorservice на использование виртуальных потоков. Это изменение довольно простое. Чтобы быть уверенным, что наш виртуальный поток запланирован одним потоком-носителем, мы можем назначить конкретный поток, указав его в методе virtual() . Поток-носитель – это имя рабочего потока планировщика, который отвечает за выполнение виртуального потока. Смотрите пример ниже:
private static ExecutorService virtualThreadExecutorService() {
var factory = Thread.builder().name("carrier").daemon(true).factory();
var executor = Executors.newSingleThreadExecutor(factory);
var virtualThreadFactory = Thread.builder().name("virtual-thread", 0).virtual(executor).factory();
return Executors.newThreadExecutor(virtualThreadFactory);
}
Теперь пришло время изменить код основного метода, чтобы использовать ExecutorService на основе виртуальных потоков:
var executorService = virtualThreadExecutorService();
и результат должен быть похож на:
[2020-11-22T17:16:49.114727][VirtualThread[virtual-thread0,carrier,main]] Starting time consuming task [id=0] [2020-11-22T17:16:49.161798][VirtualThread[virtual-thread1,carrier,main]] Starting time consuming task [id=1] [2020-11-22T17:16:49.162620][VirtualThread[virtual-thread2,carrier,main]] Starting time consuming task [id=2] [2020-11-22T17:16:49.163415][VirtualThread[virtual-thread3,carrier,main]] Starting time consuming task [id=3] [2020-11-22T17:16:54.163348][VirtualThread[virtual-thread0,carrier,main]] Ended time consuming task [id=0] [2020-11-22T17:16:54.164141][VirtualThread[virtual-thread1,carrier,main]] Ended time consuming task [id=1] [2020-11-22T17:16:54.165216][VirtualThread[virtual-thread2,carrier,main]] Ended time consuming task [id=2] [2020-11-22T17:16:54.166347][VirtualThread[virtual-thread3,carrier,main]] Ended time consuming task [id=3] [2020-11-22T17:16:54.166858][Thread[main,5,main]] Processing took = 5106 ms
Как вы можете видеть, выполнение одних и тех же четырех задач занимает около 5 секунд. Все задачи выполняются параллельно, не блокируя друг друга, используя данный поток-носитель. Мы добились этого, просто изменив тип потока со стандартного на виртуальный. Магия!
Ладно, круто… но как?
Чтобы понять, как это возможно, нам нужно погрузиться в метод Thread.sleep() .
public static void sleep(Duration duration) throws InterruptedException {
long nanos = duration.toNanos();
if (nanos < 0)
return;
Thread thread = currentThread();
if (thread.isVirtual()) {
if (ThreadSleepEvent.isTurnedOn()) {
ThreadSleepEvent event = new ThreadSleepEvent();
try {
event.time = nanos;
event.begin();
((VirtualThread) thread).sleepNanos(nanos);
} finally {
event.commit();
}
} else {
((VirtualThread) thread).sleepNanos(nanos);
}
} else {
// convert to milliseconds, ceiling rounding mode
long millis = MILLISECONDS.convert(nanos, NANOSECONDS);
if (nanos > NANOSECONDS.convert(millis, MILLISECONDS)) {
millis += 1L;
}
sleep(millis);
}
}
Как вы можете видеть, существует условный оператор, в котором реализация sleep ведет себя по-другому, когда выполняется в виртуальном потоке. Метод sleepNanos(длинные наносы) из класса Виртуальный поток дает нам ключ к разгадке.
void sleepNanos(long nanos) throws InterruptedException {
...
while (remainingNanos > 0) {
parkNanos(remainingNanos);
if (getAndClearInterrupt()) {
throw new InterruptedException();
}
remainingNanos = nanos - (System.nanoTime() - startNanos);
}
...
}
Там есть строка, в которой говорится, что наш виртуальный поток будет припаркован ( Паркнанос(оставшиеся НАНО) ). Парковка виртуального потока означает предоставление его продолжения. Виртуальный поток припаркован на некоторое время? Давайте используем это время для других виртуальных потоков!
Наконец, углубляясь все глубже и глубже в реализацию операции парка, мы добрались до части кода, которая отвечает за планирование разблокировки данного виртуального потока. Распаковка виртуального потока приводит к тому, что его продолжение повторно отправляется планировщику. В нашем случае это означает, что после 5 секунд сна наш виртуальный поток может быть продолжен и может распечатать завершающий журнал.
/**
* Schedules this thread to be unparked after the given delay.
*/
@ChangesCurrentThread
private Future> scheduleUnpark(long nanos) {
//assert Thread.currentThread() == this;
Thread carrier = this.carrierThread;
// need to switch to carrier thread to avoid nested parking
carrier.setCurrentThread(carrier);
try {
return UNPARKER.schedule(this::unpark, nanos, NANOSECONDS);
} finally {
carrier.setCurrentThread(this);
}
}
РАСПАКОВЩИК по умолчанию является ScheduledExecutorService , который создается для целей виртуальных потоков.
private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
Этот пример был основан на анализе метода Thread.sleep() , но также другие методы блокировки из разных библиотек были оптимизированы для использования виртуальными потоками. Список методов “дружественных к виртуальным потокам” можно найти здесь .
Резюме
На мой взгляд, проект Loom и преимущества, которые он предоставляет, могут изменить правила игры в мире Java. Обеспечение легкого параллелизма, встроенного в стандартные библиотеки, и производительности асинхронного кода в синхронных реализациях может быть быстрым и простым способом повышения эффективности существующих систем. Мне любопытно, как проект изменит подход к параллелизму в Java и его влияние на популярные библиотеки и фреймворки.
Примеры кода можно найти на здесь .
Подписывайтесь на меня в Твиттере .
Оригинал: “https://dev.to/piotrekst/let-s-take-a-quick-look-at-project-loom-1d21”