Java 线程池和队列
如果你还在继承Thread实现多线程?看看这个吧。
引子
传统上,我们都知道可以通过继承Thread类或者实现Runable接口来实现多线程的使用。
从Java5开始,Java提供了自己的线程池。线程池就是一个线程的容器,每次只执行额定数量的线程。 java.util.concurrent.ThreadPoolExecutor就是这样的线程池。
线程池
-
线程池的作用 根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。
-
为什么要用线程池: 1) 减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。 2) 可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。 比较重要的几个类:
ExecutorService | 真正的线程池接口 |
---|---|
ScheduledExecutorService | 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题 |
ThreadPoolExecutor | ExecutorService的默认实现 |
ScheduledThreadPoolExecutor | 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现 |
常见的ThreadPoolExecutor使用流程
// 实现runnale接口,在run方法中实现业务
class test implements Runnable {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("test");
}
}
// 为某个业务单独创建一个线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1,
5,
3,
TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>()
);
// 在具体的业务逻辑处,调用execute方法,多线程执行业务
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(new test());
}
ThreadPoolExecutor构造方法说明
public ThreadPoolExecutor(
int corePoolSize, //线程池维护线程的最少数量
int maximumPoolSize, //线程池维护线程的最大数量
long keepAliveTime, //线程所允许的空闲时间(对超出corePoolSize的线程而言)
TimeUnit unit, //线程所允许的空闲时间的单位
BlockingQueue<Runnable> workQueue, //线程池所使用的缓冲队列
RejectedExecutionHandler handler //线程池对拒绝任务的处理策略
) {}
线程处理的优先级
- 在线程池中线程数小于corePoolSize时,每次执行都新建一个线程
- 在线程池中线程数等于corePoolSize,缓冲队列未满时,优先丢入队列
- 在线程池中线程数等于或大于corePoolSize,缓冲队列已满时,新建线程执行任务,直到线程数达到maximumPoolSize
- 在线程池中线程数等于maximumPoolSize,缓冲队列已满时,使用handler处理拒绝任务
- 在线程池中线程数大于corePoolSize时,空闲线程的存活时间由keepAliveTime和unit决定
BlockingQueue的种类
-
ArrayBlockingQueue 数组实现的阻塞队列,数组的大小就是队列的长度,如果队列为空且线程进行元素获取,或者队列已满且进行任务添加,都将导致阻塞等待。进出队列采用FIFO(先进先出)原则。
-
LinkedBlockingQueue 链表实现的阻塞队列,如果不在构造时指定大小,则其大小取决于Integer.MAX_VALUE,除了实现方式与ArrayBlockingQueue不同外,行为基本相同。
-
PriorityBlockingQueue 类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序。
-
SynchronousQueue 该队列的操作必须是放和取交替完成的。在被元素被取走之前,该元素的插入操作不会结束,因此,名为同步队列,也即非异步、阻塞执行。该队列长度为0,元素插入就需要被取走。
-
DelayQueue 是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期 时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。
-
LinkedTransferQueue 无界队列(Integer.MAX_VALUE),进出队列采用FIFO(先进先出)原则。生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费。主要用于线程间消息的传递,与SynchronousQueue很类似,但是比起SynchronousQueue更好用。LinkedTransferQueue既可以使用BlockingQueue的put方法进行常规的添加元素操作,也可以使用transfer方法进行阻塞添加,而且比SynchronousQueue灵活之处在于,队列长度非0,阻塞插入和非阻塞插入的元素可以共存。
-
各种Deque(双端队列) 双端队列不仅可以实现FIFO(先进先出)队列,还可以实现FILO(先进后出)的栈,但是不常用,在此不多做介绍。
RejectedExecutionHandler的种类
-
ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常,注意,这是线程池的默认策略
-
ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会自动重复调用execute()方法
-
ThreadPoolExecutor.DiscardOldestPolicy() 抛弃旧的任务(最先进入队列的任务)
-
ThreadPoolExecutor.DiscardPolicy() 抛弃当前的任务(即将进入队列的任务)
JDK提供的默认实现
ThreadPoolExecutor的构造方法不可谓不复杂,因此JDK也不推荐直接使用。java.util.concurrent.Executors提供了默认的实现,以此应对不同场景。ExcuteService实现了Executors并且是ThreadPoolExecutor的父接口。 通过以上介绍,查看源码即可很清晰的明白这四种实现的区别。
FixedThreadPool
线程数量固定的线程池,空闲线程销毁时间为0,无界队列,拒绝任务的策略为抛出异常
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
}
SingleThreadExecutor
最大线程数为1的线程池,空闲线程销毁时间为0,无界队列,拒绝任务的策略为抛出异常
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
));
}
CachedThreadPool
无界线程池,可以进行自动线程回收。之所以说可以自动回收是因为corePoolSize被设置为零,此外,这个线程池比较特殊的特点是采用了SynchronousQueue。结合上文,就会明白,所有元素都会单起一个线程阻塞的等待获取。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
);
}
WorkStealingPool
在Executors中还会看到这种池,这个不是平常使用的线程池。而是一个ForkJoinPool。 ForkJoinPool是一个可以执行ForkJoinTask的ExcuteService,与ExcuteService不同的是它采用了work-stealing模式:所有在池中的线程尝试去执行其他线程创建的子任务(多线程并行执行一个任务),这样就很少有线程处于空闲状态,非常高效。 ForkJoinPool解决的不是并发问题,而是高效并行问题,在这里不做具体介绍。
SingleThreadScheduledExecutor
ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,用于多线程的执行定时任务,其内部使用的是与上文提到的DelayQueue相类似的队列,DelayedWorkQueue。此类线程池也不多做介绍。
总结
看到这里,想必对ThreadPoolExecutor的使用已经有了概念,接下来,只要根据需要使用响应的Executors即可。或者,现在就去写个demo吧 :)。