Java线程池实现多任务并发执行
1️⃣ 创建一些任务来落地多任务并发执行
每一个数组里面的数据可以看成任务,或者是需要并发的业务接口,
数组与数组之间,可以看作为他们之间有血缘关系,简单来说就是:
taskJksj里面的10个任务执行完之后,才可以执行taskJxdx里面的4个任务,执行完taskJxdx之后,才可以执行taskNbzz里面的2个任务
2️⃣ 创建线程池
要将taskJksj、taskJxdx、taskNbzz这几个数组中里面定义的任务通过线程池并发执行
3️⃣ ThreadPoolExecutor源码分析以及为什么不用newFixedThreadPool()和newCachedThreadPool()
1.首先为什么不用newFixedThreadPool()和newCachedThreadPool()
点进去查看这两个方法的源码
2.为什么用ThreadPoolExecutor创建
通过上面两个例子就能看到,这俩方法很不靠谱,如果你不明白他的原理,看到项目上以前创建线程的代码就是这样的,你想都不想就copy过来,那后面绝对就是在给自己挖坑;
通过发现这俩方法,他们都是return new ThreadPoolExecutor(),所以真正的大佬其实是ThreadPoolExecutor,他俩只是调用了ThreadPoolExecutor而已。
核心线程数:初始定义的线程数量,是绝对会开启的固定的线程数量
最大线程数:当前线程池支持的最大线程数量,如果超过了这个数量那么肯定就报错了
阻塞队列 :当前进来线程池的线程大于核心线程数且小于最大线程数,那么就把当前线程池的线程-核心线程数的线程放在阻塞队列里,让他等着假如核心线程数 2 个,最大线程数5个,阻塞队列长度 3,当前进来了4个线程,那么就将 4 - 2 = 2 个线程放在阻塞队列里面,让他先等待
默认工厂 :当前进来线程池的线程大于最大线程数且小于(最大线程数+阻塞队列长度),那么就需要开放剩下的三个线程通道,让另外的3个线程通道进行工作,核心线程数 2 个,最大线程数5个, 阻塞队列长度3,当前进来了8个,可以看到进来了8个线程,已经满足了最大线程和阻塞队列长度之和了,简单理解就是现在进来的线程把目前这个线程池所有能利用的空间都占满了,只有 2个线程工作不够,需要把另外的3个(5 - 2)赶快放开让他们也工作,这个打开另外三个线程的这个工作就需要工厂来做,让工厂把这三个线程打开
拒绝策略 :当进入线程池的线程过多,远远超过了最大线程数+阻塞队列,那么就需要拒绝这些即将要进入线程池的线程。
等待时间:在等待时间段中,当线程池里面的线程都执行的差不多了,又回到了"进来线程池的线程大于核心线程数且小于最大线程数"时,就没有必要把5个线程通道全部打开,浪费资源,所以就把其 他的三个线程关掉,留2个核心的就行
等待时间单位:单位,时分秒
4️⃣ 执行任务
为三个任务编写对应的执行多线程方法,写法都是一样的,重复copy即可,最后执行的效果就是
import java.util.concurrent.*;/**
* @Author : YuanXin
* @create 2024/2/1 11:11
* @Description :*/publicclass Main {publicstaticvoid main(String[] args) {
taskListImpl taskList=new taskListImpl();
String taskJksj= taskList.poolExecutorJksj();
String taskJxdx=null;if (taskJksj.equals("taskJksjSuccess")) {
taskJxdx= taskList.poolExecutorJxdx();
}if (taskJxdx.equals("taskJxdxSuccess")) {
taskList.poolExecutorNbzz();
}
}
}class taskListImpl {// 创建一些任务int[] taskJksj =newint[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0};int[] taskJxdx =newint[]{15, 16, 17, 18};int[] taskNbzz =newint[]{101, 102};public String poolExecutorJksj() {
ThreadPoolExecutor pool=new ThreadPoolExecutor(3,10,3,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(5),
Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()
);// ExecutorService cachedThreadPool = Executors.newCachedThreadPool();// ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);try {for (int i = 0; i < taskJksj.length; i++) {int num = i;
pool.execute(()-> {
taskJksjPool(num);
});
}
}catch (Exception e) {thrownew RuntimeException(e);
}finally {
pool.shutdown();
}return "taskJksjSuccess";
}publicvoid taskJksjPool(int num) {
System.out.println(Thread.currentThread().getName()+ " " + taskJksj[num]);
}public String poolExecutorJxdx() {
ThreadPoolExecutor pool=new ThreadPoolExecutor(3, 10, 3, TimeUnit.SECONDS,new LinkedBlockingDeque<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());try {for (int i = 0; i < taskJxdx.length; i++) {int num = i;
pool.execute(()-> {
taskJxdxPool(num);
});
}
}catch (Exception e) {thrownew RuntimeException(e);
}finally {
pool.shutdown();
}return "taskJxdxSuccess";
}publicvoid taskJxdxPool(int num) {
System.out.println(Thread.currentThread().getName()+ " " + taskJxdx[num]);
}public String poolExecutorNbzz() {
ThreadPoolExecutor pool=new ThreadPoolExecutor(3, 10, 3, TimeUnit.SECONDS,new LinkedBlockingDeque<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());try {for (int i = 0; i < taskNbzz.length; i++) {int num = i;
pool.execute(()-> {
taskNbzzPool(num);
});
}
}catch (Exception e) {thrownew RuntimeException(e);
}finally {
pool.shutdown();
}return "taskNbzzSuccess";
}publicvoid taskNbzzPool(int num) {
System.out.println(Thread.currentThread().getName()+ " " + taskNbzz[num]);
}
}