Многопоточность Java запрашивает операционную систему для создания и уничтожения потоков. Использование нескольких потоков для выполнения нескольких задач является эффективным способом использования ЦП и обеспечивает реальное повышение производительности приложения.
В этом сообщении в блоге объясняется процесс синхронного и асинхронного доступа к данным с помощью ThreadPoolExecutor . Основной мотив использования концепции пула потоков для доступа к данным заключается в повышении эффективности многопоточности и ограничении количества потоков для предотвращения перегрузки процессора. ThreadPoolExecutor заботится о создании и уничтожении потоков и блокировании задач хранения в очереди. Как только ThreadPoolExecutor будет доступен для выполнения большего количества задач, он будет выполнять задачи в порядке FIFO (Первый в первом исходе).
ExecutorService – это платформа, предоставляемая JDK, которая упрощает выполнение задач в асинхронном режиме. Пулы потоков решают две разные проблемы:
- Обычно обеспечивают повышенную производительность при выполнении большого количества асинхронных задач из-за снижения накладных расходов на вызов каждой задачи и
- И обеспечивают средства ограничения и управления ресурсами, включая потоки, потребляемыми при выполнении набора задач.
Есть несколько способов реализовать это. Один из них напрямую использует ExecutorService интерфейс и создать пул потоков . Во-вторых, используйте класс ThreadPoolExecutor, который является службой-исполнителем, которая выполняет каждую отправленную задачу, используя один из, возможно, нескольких объединенных потоков, обычно настраиваемых с помощью [Исполнители] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html) заводские методы.
- Java 11
- Пружинный ботинок 2.x
- Весенние данные 2.x
- MySQL 8.x
- Весенняя загрузка и весенние данные, используемые для разработки простого REST API и вызовов репозитория доступа к данным, перенаправленных на ThreadPoolExecutor
- Создайте службы Employee, EmployeeController и Employee, как показано ниже
Создайте службы Employee, EmployeeController и Employee, как показано ниже
package com.pj.springdatademo.model;import lombok.Data;import javax.persistence.\*;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.io.Serializable;@Entity
@Data
@Table(name = "employee")
public class Employee implements Serializable
{
private static final long _serialVersionUID_ \= -2994315037642107537L;@Id
@GeneratedValue(strategy = GenerationType._SEQUENCE_)
@Column(name = "id")
private Long id;@NotNull(message = "First name must not be null")
@NotEmpty
@Column(name = "first\_name", nullable = false)
private String firstName;@NotNull(message = "Last name must not be null")
@NotEmpty
@Column(name = "last\_name", nullable = false)
private String lastName;@NotNull(message = "Email must not be null")
@NotEmpty
@Column(name = "email", nullable = false)
private String email;@Column(name = "phone")
private String phone;}
Создайте службы Employee, EmployeeController и Employee, как показано ниже
@RestController
@RequestMapping("/api/v1/employee")
public class EmployeeController
{
private final EmployeeService employeeService;public EmployeeController(EmployeeService employeeService)
{
this.employeeService \= employeeService;
}@GetMapping(path = "/list")
public List getAllEmployees()
{
return employeeService.getAllEmployees();
}@GetMapping(path = "/list/async")
public List getAllEmployeesAsync()
{
return employeeService.getAllEmployeesAsync();
}
}
Создайте службы Employee, EmployeeController и Employee, как показано ниже
@Service
public class EmployeeServiceImpl implements EmployeeService
{
private final EmployeeRepository employeeRepository;
private final ThreadPoolExecutorUtil threadPoolExecutorUtil;public EmployeeServiceImpl(EmployeeRepository employeeRepository, ThreadPoolExecutorUtil threadPoolExecutorUtil)
{
this.employeeRepository \= employeeRepository;
this.threadPoolExecutorUtil \= threadPoolExecutorUtil;
}@Override
public List getAllEmployeesAsync()
{
for (int i=0;i<10000;i++)
{
TaskThread taskThread=new TaskThread(employeeRepository);
threadPoolExecutorUtil.executeTask(taskThread);
}
_/\*
Following code created to just return list of values at the end
\*/_ TaskThread taskThread=new TaskThread(employeeRepository);
threadPoolExecutorUtil.executeTask(taskThread);return taskThread.employees;
}@Override
public List getAllEmployees()
{
return employeeRepository.findAll();
}
}
3. Класс обслуживания сотрудников имеет 2 метода. Метод getAllEmployees(), который не использует механизм пула потоков, и метод get All Employees Async(), который использует механизм пула потоков
4. Давайте создадим класс утилиты ThreadPoolExecutor, который позаботится о многопоточности (объяснено на следующем шаге)
@Component
public class ThreadPoolExecutorUtil
{
private Logger logger\= LoggerFactory._getLogger_(ThreadPoolExecutorUtil.class);private ThreadPoolExecutor threadPoolExecutor;public ThreadPoolExecutorUtil()
{
_//Handle 10000 tasks at a time_ BlockingQueue blockingQueue = new ArrayBlockingQueue(10000);
threadPoolExecutor \= new ThreadPoolExecutor(2, 10, 20, TimeUnit._SECONDS_, blockingQueue);
threadPoolExecutor.setRejectedExecutionHandler((r, executor) ->
{
try {
Thread._sleep_(1000);
logger.error("Exception occurred while adding task, Waiting for some time");
}
catch (InterruptedException e)
{
logger.error("Thread interrupted: ()",e.getCause());
Thread._currentThread_().interrupt();
}
threadPoolExecutor.execute(r);
});
}void executeTask(TaskThread taskThread)
{
Future> future=threadPoolExecutor.submit(taskThread);
logger.info("Number of Active Threads: {}",threadPoolExecutor.getActiveCount());while (!future.isDone())
{
try {
future.get();
logger.info("task.employees: {}",taskThread.employees.toString());
}
catch (Exception e)
{
logger.error("Thread interrupted: ()",e.getCause());
}
}
}
}
5. Давайте пройдем строчку за строчкой. Сначала конструктор, мы создаем блокирующую очередь, которая содержит 10000 задач одновременно. Вы можете увеличить или уменьшить это число, но помните, что оно потребляет много ресурсов, таких как память и процессор, для выполнения большого количества задач.
_//Handle 10000 tasks at a time _BlockingQueueblockingQueue = new ArrayBlockingQueue(10000);
6. Создайте ThreadPoolExecutor объект, передав значения corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit и имя очереди блокировки
threadPoolExecutor \= new ThreadPoolExecutor(2, 10, 20, TimeUnit._SECONDS_, blockingQueue);
7. Установить setRejectedExecutionHandler метод для обработки отклонений, если очередь заполнена или задачи не могут быть добавлены в очередь. Установите период ожидания для потока до тех пор, пока один из потоков из пула не освободится, а затем добавьте его ThreadPoolExecutor с помощью метода execuet()
threadPoolExecutor.setRejectedExecutionHandler((r, executor) ->
{
try {
Thread._sleep_(1000);
logger.error("Exception occurred while adding task, Waiting for some time");
}
catch (InterruptedException e)
{
logger.error("Thread interrupted: ()",e.getCause());
Thread._currentThread_().interrupt();
}
threadPoolExecutor.execute(r);
});
8. Создать выполнить задачу (taskthread taskthread) метод, который выполняет задачу из BlockingQueue. Отправьте задачу в ThreadPoolExecutor и скопируйте результат в Будущее объект. Теперь задачи выполняются асинхронно
void executeTask(TaskThread taskThread)
{
Future> future=threadPoolExecutor.submit(taskThread);
System._out_.println("Queue Size: "+threadPoolExecutor.getQueue().size());
System._out_.println("Number of Active Threads: "+threadPoolExecutor.getActiveCount());
}
9. Чтобы сделать метод executeTask() синхронным, добавьте оператор while в метод и используйте метод future.isDone() для проверки состояния запроса. Как только это будет сделано, получите результат, используя future.get() .
while (future.isDone())
{
try {
future.get();
logger.info("task.employees: {}",taskThread.employees.toString());
}
catch (Exception e)
{
logger.error("Thread interrupted: ()",e.getCause());
}}
10. Теперь создайте 20000 или более задач в Службе сотрудников, чтобы протестировать многопоточность
for (int i=0;i<20000;i++)
{
TaskThread taskThread=new TaskThread(employeeRepository);
threadPoolExecutorUtil.executeTask(taskThread);
}
11. Перейдите в базу данных и вставьте некоторые данные о сотрудниках
INSERT INTO \`threadpooldemo\`.\`employee\` (\`id\`, \`email\`, \`first\_name\`, \`last\_name\`, \`phone\`) VALUES ('1', 'john.doe@hj.com', 'John', 'Doe', '233323');
INSERT INTO \`threadpooldemo\`.\`employee\` (\`id\`, \`email\`, \`first\_name\`, \`last\_name\`, \`phone\`) VALUES ('2', 'jack@hj.com', 'Jack', 'Doe', '09094044');
12. Перейдите в http://localhost:8080/api/v1/employee/list чтобы просмотреть список сотрудников, возвращаемый классом Employeerepository, без объединения потоков
12. Перейдите в http://localhost:8080/api/v1/employee/list/async в браузере и просмотрите журнал консоли IntelliJ и вы должны увидеть следующие сообщения в журнале.
....Number of Active Threads: 10 Queue Size: 9996 Number of Active Threads: 10 Queue Size: 9997 Number of Active Threads: 10 Queue Size: 9998 Number of Active Threads: 10 Queue Size: 9999 Number of Active Threads: 10 Queue Size: 10000 Number of Active Threads: 102019-06-19 01:18:09.983 ERROR 43110 --- \[nio-8080-exec-1\] c.p.s.service.ThreadPoolExecutorUtil : Exception occurred while adding task, Waiting for some timeQueue Size: 5066 Number of Active Threads: 10....
Мы добавили 20000 задач, с которыми может справиться больше, чем очередь, поэтому RejectedExecutionHandler вызывается и переводит поток в спящий режим, а другие потоки завершают операции к этому времени. Размер очереди уменьшился с 10000 до 5066 в приведенном выше журнале.
Код загружен Github для справки. Дайте мне знать, если у вас возникнут какие-либо вопросы. Счастливого кодирования:)
Оригинал: “https://dev.to/pavankjadda/synchronous-and-asynchronous-data-access-using-threadpoolexecutor-and-spring-boot-spring-data-1k3g”