线程池实现例子
2022/3/4 23:45:01
本文主要是介绍线程池实现例子,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
ThreadPool接口
public interface ThreadPool { //提交任务到线程池 void execute(Runnable runnable); //关闭线程池 void shutdown(); //获取线程池的初始化大小 int getInitSize(); //获取线程池的核心线程数量 int getCoreSize(); //获取线程池的最大线程数量 int getMaxSize(); //获取线程池中用于缓存任务队列的大小 int getQueueSize(); //获取线程池中活跃的线程的数量 int getActiveCount(); //查看线程池是否已经被shutdown boolean isShutdown(); }
ThreadFactory接口
/** * 创建个性化线程 * * ThreadFactory提供创建线程的接口,以便个性化定制Thread,比如Thread应该被加入到哪个 * Thread Group中,优先级、线程名称,以及是否为守护线程等 **/ @FunctionalInterface public interface ThreadFactory { Thread createThread(Runnable runnable); }
RunnableQueue接口
/** * 线程队列基本操作 * * RunnableQueue主要用于存放提交的Runnable * 该Runnable是一个BlockedQueue,并且有limit限制 **/ public interface RunnableQueue { //当有新的任务进来时首先会offer到队列中 void offer(Runnable runnable); //工作线程通过take方法获取Runnable Runnable take() throws InterruptedException; //获取任务队列中任务的数量 int size(); }
DenyPolicy接口
/** * 线程池满时拒绝策略 **/ @FunctionalInterface public interface DenyPolicy { void reject(Runnable runnable,ThreadPool threadPool); //该拒绝策略会直接将任务丢弃 class DiscardDenyPolicy implements DenyPolicy { @Override public void reject(Runnable runnable,ThreadPool threadPool) { //do nothing } } //该拒绝策略向任务提交者抛出异常 class AbortDenyPolicy implements DenyPolicy { @Override public void reject(Runnable runnable,ThreadPool threadPool) { throw new RuntimeException("The runnable "+runnable+" will be abort."); } } //该拒绝策略会使任务在提交者所在的线程中执行任务 class RunnerDenyPolicy implements DenyPolicy { @Override public void reject(Runnable runnable,ThreadPool threadPool) { if(!threadPool.isShutdown()) { runnable.run(); } } } }
InternalTask
/** * 不断从runnableQueue中取出Runnable并执行任务 **/ public class InternalTask implements Runnable{ private final RunnableQueue runnableQueue; private volatile boolean running=true; public InternalTask(RunnableQueue runnableQueue){ this.runnableQueue=runnableQueue; } @Override public void run() { //如果当前任务为running且没有被中断,则将其不断地从queue中获取runnable,然后执行run while(running && !Thread.currentThread().isInterrupted()) { try { Runnable task=runnableQueue.take(); task.run(); }catch (InterruptedException e){ running=false; break; } } } //停止当前任务,主要会在线程池的shutdown方法中使用 public void stop() { this.running=false; } }
LinkedRunnableQueue
/** * 双向循环链表实现线程任务队列基本操作 **/ public class LinkedRunnableQueue implements RunnableQueue{ //任务队列的最大容量,在构造时传入 private final int limit; //若任务队列中的任务已经满了,则需要执行拒绝策略 private final DenyPolicy denyPolicy; //存放任务的队列 private final LinkedList<Runnable> runnableList = new LinkedList<>(); private final ThreadPool threadPool; public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) { this.limit = limit; this.denyPolicy = denyPolicy; this.threadPool = threadPool; } @Override public void offer(Runnable runnable) { synchronized (runnableList){ if (runnableList.size()>=limit){ //无法容纳新的任务时执行拒绝策略 denyPolicy.reject(runnable,threadPool); }else { //将任务加入到队尾,并且唤醒阻塞中的线程 runnableList.addLast(runnable); runnableList.notifyAll(); } } } @Override public Runnable take() throws InterruptedException { synchronized (runnableList){ while (runnableList.isEmpty()){ try { //如果任务队列没有可执行任务,则当前线程会挂起, //进入runnableList关联的monitor set中等待唤醒 runnableList.wait(); }catch (InterruptedException e){ //被中断时将异常抛出 throw e; } } return runnableList.removeFirst(); } } @Override public int size() { synchronized (runnableList){ //返回当前任务队列的任务数 return runnableList.size(); } } }
RunnableDenyException
/** * 错误抛出 * * RunnableDenyException是RuntimeException的子类,主要通知人物提交者,任务队列 * 无法再接收新的任务 **/ public class RunnableDenyException extends RuntimeException{ public RunnableDenyException(String message) { super(message); } }
BasicThreadPool
/** * 实现ThreadPool * * 线程池的初始化:数量控制属性、创建线程工厂、任务队列策略等功能 **/ public class BasicThreadPool extends Thread implements ThreadPool{ //初始化线程数量 private final int initSize; //线程池最大线程数量 private final int maxSize; //线程池核心线程数量 private final int coreSize; //当前活跃的线程数量 private int activeCount; //创建线程所需的工厂 private final ThreadFactory threadFactory; //任务队列 private final RunnableQueue runnableQueue; //线程池是否已经被shutdown private volatile boolean isShutdown = false; //工作线程队列 private final Queue<ThreadTask> threadQueue = new ArrayDeque<>(); private static final DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy(); private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory(); private final long keepAliveTime; private final TimeUnit timeUnit; //构造线程时传参 public BasicThreadPool(int initSize,int maxSize,int coreSize,int queueSize){ this(initSize,maxSize,coreSize,DEFAULT_THREAD_FACTORY,queueSize, DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS); } public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize,DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) { this.initSize = initSize; this.maxSize = maxSize; this.coreSize = coreSize; this.threadFactory = threadFactory; this.runnableQueue = new LinkedRunnableQueue(queueSize,denyPolicy,this); this.keepAliveTime = keepAliveTime; this.timeUnit = timeUnit; this.init(); } //初始化时,先创建initSize个线程 private void init(){ start(); for (int i = 0; i < initSize; i++){ newThread(); } } @Override public void execute(Runnable runnable) { if (this.isShutdown){ throw new IllegalStateException("The thread pool is destroy"); } //提交任务只是简单第往任务队列中插入Runnable this.runnableQueue.offer(runnable); } private void newThread(){ //创建任务线程并且启动 InternalTask internalTask=new InternalTask(runnableQueue); Thread thread=this.threadFactory.createThread(internalTask); ThreadTask threadTask=new ThreadTask(thread,internalTask); threadQueue.offer(threadTask); this.activeCount++; thread.start(); } private void removeThread(){ //从线程池移除某个线程 ThreadTask threadTask=threadQueue.remove(); threadTask.internalTask.stop(); this.activeCount--; } @Override public void run() { //run方法继承自Thread,主要用于维护线程数量,比如扩容,回收 while (!isShutdown && !isInterrupted()){ try { timeUnit.sleep(keepAliveTime); }catch (InterruptedException e){ isShutdown=true; break; } synchronized (this){ if (isShutdown){ break; } //当前队列中有任务尚未处理,并且activeCount<coreSize则继续扩容 if (runnableQueue.size()>0&&activeCount<coreSize){ for (int i=initSize;i<coreSize;i++){ newThread(); } //continue的目的在于不想让线程的扩容直接达到maxSize continue; } //当前队列中有任务尚未处理,并且activeCount<maxSize则继续扩容 if (runnableQueue.size()>0&&activeCount<maxSize){ for (int i=coreSize;i<maxSize;i++){ newThread(); } } //如果任务队列中没有任务,则需要回收,回收至coreSize即可 if (runnableQueue.size()==0&&activeCount>coreSize){ for (int i=coreSize;i<activeCount;i++){ removeThread(); } } } } } @Override public void shutdown() { synchronized (this){ if (isShutdown)return; isShutdown=true; threadQueue.forEach(threadTask -> { threadTask.internalTask.stop(); threadTask.thread.interrupt(); }); this.interrupt(); } } @Override public int getInitSize() { if (isShutdown) throw new IllegalStateException("The thread pool is destroy"); return this.initSize; } @Override public int getCoreSize() { if (isShutdown) throw new IllegalStateException("The thread pool is destroy"); return this.coreSize; } @Override public int getQueueSize() { if (isShutdown) throw new IllegalStateException("The thread pool is destroy"); return runnableQueue.size(); } @Override public int getMaxSize() { if (isShutdown) throw new IllegalStateException("The thread pool is destroy"); return this.maxSize; } @Override public int getActiveCount() { if (isShutdown) throw new IllegalStateException("The thread pool is destroy"); return this.activeCount; } @Override public boolean isShutdown() { return this.isShutdown; } private static class ThreadTask{ Thread thread; InternalTask internalTask; public ThreadTask(Thread thread, InternalTask internalTask) { this.thread = thread; this.internalTask = internalTask; } } private static class DefaultThreadFactory implements ThreadFactory{ private static final AtomicInteger group_counter=new AtomicInteger(1); private static final ThreadGroup group = new ThreadGroup("myGroup-"+group_counter.getAndDecrement()); public static final AtomicInteger COUNTER =new AtomicInteger(0); @Override public Thread createThread(Runnable runnable) { return new Thread(group,runnable,"thread-poll-"+COUNTER.getAndDecrement()); } } }
测试线程池
/** * 一个简单的程序分别测试线程池的任务提交、线程池线程数量的动态扩展,以及线程池的销毁功能 */ public class ThreadPoolTest { public static void main(String[] args) throws InterruptedException { //定义线程池,初始化程数为2,核心或程数为4,最大程数为6.任务队列最多允许1000个任务 final ThreadPool threadPool=new BasicThreadPool(2,6,4,1000); //定义20个任务并且提交蛤线程池 for (int i=0;i<20;i++){ threadPool.execute(()->{ try { TimeUnit.SECONDS.sleep(10); System.out.println(Thread.currentThread().getName()+" is" + " running and done."); }catch (InterruptedException e){ e.printStackTrace(); } }); } for (; ; ){ //不断输出线程池的信息 System.out.println("getActiveCount = "+threadPool.getActiveCount()); System.out.println("getQueueSize = "+threadPool.getQueueSize()); System.out.println("getCoreSize = "+threadPool.getCoreSize()); System.out.println("getMaxSize = "+threadPool.getMaxSize()); System.out.println("=========================================="); TimeUnit.SECONDS.sleep(5); } } }
这篇关于线程池实现例子的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-31全网首发第二弹!软考2024年5月《软件设计师》真题+解析+答案!(11-20题)
- 2024-05-31全网首发!软考2024年5月《软件设计师》真题+解析+答案!(21-30题)
- 2024-05-30【Java】百万数据excel导出功能如何实现
- 2024-05-30我们小公司,哪像华为一样,用得上IPD(集成产品开发)?
- 2024-05-30java excel上传--poi
- 2024-05-30安装笔记本应用商店的pycharm,再安排pandas等模块,说是没有打包工具?
- 2024-05-29java11新特性
- 2024-05-29哪些无用敏捷指标正在破坏敏捷转型?
- 2024-05-29鸿蒙原生应用再新丁!新华社 入局鸿蒙
- 2024-05-29设计模式 之 迭代器模式(Iterator)