线程池

2022/4/18 6:14:40

本文主要是介绍线程池,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1、池化技术(思想)

(1)池化技术:程序运行的本质->占用系统的资源,优化资源的使用便是池化技术

(2)简单理解:实现准备好一些资源,有人要用,就来这个池拿,用完放回即可。线程池、数据库连接池、Http连接池等待

(3)好处:主要是为了减少每次获取资源的消耗,提高对资源的利用率。

2、线程池的好处

(1)降低资源消耗

   通过重复利用已创建的线程降低线程创建和销毁造成的消耗

(2)提高相应速度

   当任务到达时,任务可以不需要等待线程创建就能立即执行

(3)提高线程的可管理性

  线程是稀缺资源,无限制创建会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

 

3、Executor(一般创建线程池不建议使用)

(1)通过Executor来启动线程与Thread.start()方法相比:更好

==用线程池实现,节约开销,容易管理,效率更好

==有助于避免this逃逸问题

====this逃逸:指在构造函数返回之前其它线程就持有该对象的引用,调用未构造完全的对象的方法,可能引发错误。

(2)Executor框架,包括线程池的管理、线程工厂、队列、拒绝策略等,并发编程更加简单。

(3)结构:任务(Runnable\Callable)、任务的执行(Executor)、异步计算的结果(Future)(3大部分)

==【1】任务(Runnable\Callable)

====1)实现Runnable接口或Callable接口

====2)两个接口的实现类都可以被ThreadPoolExecutor(重要)或ScheduledThreadPoolExecutor执行。

==【2】任务的执行(Executor)

====1)Executor接口:任务执行机制的核心接口

====2)ExecutorService接口:继承自Executor接口

====3)ThreadPoolExecutor(重要)和ScheduledThreadPoolExecutor两个关键类实现了ExecutorService接口

//AbstractExecutorService实现了ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService

====4)ScheduledThreadPoolExecutor实际上是继承了ThreadPoolExecutor(重要)并实现了ScheduledExecutorSercice

//ScheduledExecutorService继承ExecutorService接口
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService

==【3】异步计算的结果(Future)

====1)Future接口以及Future接口的实现类FutureTask类都可以代表异步计算的结果

====2)把Runnable接口或Callable接口的实现类提交给ThreadPoolExecutor或ScheduledThreadPoolExecutor执行,调用submit()方法时会返回一个FutureTask对象

(4)Executor的使用

 

 

==[1]主线程创建实现Runnable接口或Callable接口的任务对象

==[2]任务对象直接交给ExecutorService接口执行

====ExecutorService.execute(Runnable command)

====ExecutorService.submit(Runnable task)

====ExecutorService.submit(Callable<T> task)

==[3]获取Future接口的对象——FutureTask(submit()和execute()的区别)

====如果执行submit(),ExecutorService会返回一个FutureTask对象(submit用于提交需要返回值的任务),通过FutureTask对象可以判断是否执行成功,同时,可以通过这个对象的get()方法来获取返回值

======FutureTask对象的get()方法,会阻塞当前线程直到任务完成,也可使用get(long time,TimeUnit timeunit)方法,阻塞一段时间后立即返回,但可能任务没有执行完,会抛出java.util.concurrent.TimeOutException。

====如果 执行execute(),ExecutorService不会有返回值(用于提交不需要返回值的任务),无法判断任务是否被线程池执行成功与否

====由于FutureTask实现了Runnable,可以创建FutureTask,直接交给ExecutorService执行。

==[4]主线程可以执行FutureTask.get()方法等待任务执行完成,也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)方法取消任务的执行。

 

4、ThreadPoolExecutor类(重要)

(1)有四个构造方法(解析最长的一个)

    /**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                               ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

(2)ThreadPoolExecutor七大参数(3个最重要)

 

 

==[1]corePoolSize:

==== 核心线程大小,核心线程数定义了最小可以同时运行的核心线程数量。

====线程池一直运行,核心线程就不会停止

==[2]maximumPoolSize:

====线程池最大线程数量

====非核心线程数量等于maximumPoolSize-corePoolSize

====当队列中存放的任务达到队列容量时,当前可以同时运行的线程数量变为最大线程数

==[3]workQueue:

====工作队列,包含阻塞队列,ArrayBlockingQueue,LinkedBlockingQueue等

====用来存放线程任务。当新任务来的时候,会先判断当前运行的线程数量是否达到核心线程数,如果达到,新任务就会被存放在队列中。

==[4]keepAliveTime:

====非核心线程的心跳时间,非核心线程在keepAliveTime时间内没有运行任务,且等待时间超过keepAliveTime,就会被回收销毁

====线程池中线程数量大于corePoolSize,超出的为非核心线程

==[5]unit:keepAliveTime参数的时间单位

==[6]threadFactory:

====线程工厂,新建线程的工厂

====创建新线程时会用到

==[7]handler:饱和(拒绝)策略(通过实现RejectedExecutionHandler接口)

====如果当前同时运行的线程数量达到最大线程数量且队列也已放满了任务,就要使用ThreadPoolExecutor定义的策略

====ThreadPoolExecutor.AbortPolicy(默认)

    线程任务拒绝,拒绝处理新任务,抛出RejectedExecutionException错误

====ThreadPoolExecutor.CallerRunsPolicy

(线程池外的线程直接调用run()方法执行)

    调用执行自己的线程运行任务,即直接在调用execute()方法的线程中,直接调用run()方法运行被拒绝的任务;

    如果执行的程序已经关闭,则丢弃该任务

    因此,这种策略会降低对于新任务的提交速度,影响程序的整体性能

====ThreadPoolExecutor.DiscardPolicy

   新任务直接丢弃,不报错,即不处理新任务

====ThreadPoolExecutor.DiscardOldestPolicy

   将workQueue队首的任务丢弃,将最新的任务重新加入队列等执行

   丢弃最早的未处理的任务(队列队首)的请求

 

5、实现线程池

(1)通过ThreadPoolExecutor构造函数实现(推荐)

(2)通过Executor框架的工具类Executors实现(不建议)

 

 

6、ThreadPoolExecutor的使用

(1)自定义(ExecutorService)

//自定义 线程池
public class dMyThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService myThreadPool = new ThreadPoolExecutor(
                //最大承载数 =  最大线程池大小 + 阻塞队列大小
                2,//核心线程池大小
                5,//最大线程池大小
                2,//超时等待时间
                TimeUnit.SECONDS,//超时等待时间单位
                new LinkedBlockingDeque<>(3),//阻塞队列为3  即有3个等待位置
                Executors.defaultThreadFactory(),//默认的线程创建工厂
//                new ThreadPoolExecutor.AbortPolicy()//第一种 拒绝策略  有超过最大承载数的线程时 不处理 会抛出异常  (银行满了,还有人进来不处理这个人 抛出异常)  java.util.concurrent.RejectedExecutionException
//                new ThreadPoolExecutor.CallerRunsPolicy()//第二种拒绝策略  超过承载数的 哪来的去哪里 即由哪里实现  这里是main线程来的 就由main线程实现
//                new ThreadPoolExecutor.DiscardPolicy() //第三种拒绝策略 超过承载数的 就丢掉 不抛出异常 也不实现
                new ThreadPoolExecutor.DiscardOldestPolicy()//第四种拒绝策略 超过承载数的 会尝试与最早的线程争取实现 争取成功就实现 失败就丢掉 不抛出异常
        );
​
        try {
            for (int i = 1; i <= 9; i++) {
                myThreadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" 666");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            myThreadPool.shutdown();
        }
​
    }
}

(2)CPU密集型写法(最大线程池=CPU核数)(ExecutorService)

//CPU密集型写法      最大线程池 定为 CPU的核数
public class eMyThreadPoolTestCPU {
    public static void main(String[] args) {
        //获取CPU的核数
        System.out.println(Runtime.getRuntime().availableProcessors());
        ExecutorService myThreadPool = new ThreadPoolExecutor(
                2,
                Runtime.getRuntime().availableProcessors(), //最大线程池为CPU的核数
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
                );
​
        try {
            for (int i = 0; i < 8; i++) {
                myThreadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" 666");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            myThreadPool.shutdown();
        }
    }
}

(3)实现Runnable接口

import java.util.Date;
​
/**
 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
 */
public class MyRunnable implements Runnable {
​
    private String command;
​
    public MyRunnable(String s) {
        this.command = s;
    }
​
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }
​
    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
​
    @Override
    public String toString() {
        return this.command;
    }
}
//使用ThreadPoolExecutor构造
​
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
​
public class ThreadPoolExecutorDemo {
​
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());
​
        for (int i = 0; i < 10; i++) {
            //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}
//corePoolSize: 核心线程数为 5。
//maximumPoolSize :最大线程数 10
//keepAliveTime : 等待时间为 1L。
//unit: 等待时间的单位为 TimeUnit.SECONDS。
//workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
//handler:饱和策略为 CallerRunsPolicy。

 

7、线程池原理分析

 

 

(1)execute()\submit()方法:提交一个任务到线程池中

execute()底层:

addWorker():用来创建新的工作线程,返回true即创建和启动线程成功

(2)配置核心线程数量N、配置等待队列容量M、模拟N+M+S个任务

====每次只能存在N个任务同时执行,剩下的M个任务会被放到等待队列中,S个任务会放到线程池中

====当前N个任务有被执行完成的,队列中M个任务一些放入执行,S线程池会去拿新的任务

(3)1、线程池执行execute()|submit()方法向线程池中添加任务,当任务小于核心线程数corePoolSize,线程池中可以创建新的线程

2、当任务大于核心线程池corePoolSize,就向队列workQueue添加任务

3、当队列workQueue已经满了,需要比较maximumPoolSize,若小于maximumPoolSize,则在线程池创建新的线程,若大于maximumPoolSize,则说明当前设置线程池中线程已经处理不了了,就会执行饱和(拒绝)策略。

 

8、对比

(1)Runnable和Callable

==Runnable接口不会返回结果或抛出检查异常

@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}

==Callable接口会返回结果或抛出检查异常

@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

(2)execute()和submit()

==execute():用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功与否

==submit():用于提交需要返回值的任务,返回FutureTask对象

   FutureTask对象可以判断任务是否执行成功,并且可以通过get()方法获取返回值

   get()方法会阻塞当前线程直到任务完成

   get(Long timeout,TimeUnit unit)会阻塞,但在timeout时间内任务没有执行完会抛出java.util.concurrent.TimeOutException

(3)shutdown()和shutdownNow()

==shutdown():关闭线程池,线程池的状态变为SHUTDOWN,不再接受新的任务,但队列里的任务得执行完成

==shutdownNow():关闭线程池,线程池的状态变为STOP,线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的List.

(4)Executor和Executors

==Executor是接口对象,能执行线程任务,ExecutorService接口继承了Executor接口并进行了扩展,提供了获得任务执行状态和任务返回值的方法;使用ThreadPoolExecutor可以创建自定义线程池,Future表示异步计算结果,提供了检查计算是否完成的方法,以等待计算的完成,使用get()可以获取计算的结果

==Executors是工具类,可使用其中不同的方法来实现我们的需求来创建线程池

 

9、常见的线程池

(1)FixedThreadPool:可重用的指定线程数的线程池

        corePoolSize和maximumPoolSize是我们自己指定的

   /**
     * 创建一个可重用固定数量线程的线程池
     corePoolSize和maximumPoolSize是我们自己指定的
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

==执行过程:

====若当前运行的线程数小于corePoolSize,如果有新任务,就创建新的线程来执行任务

====若当前运行的线程等于corePoolSize,有新任务时,会将任务加入到LinkedBlockingQueue

====线程池中的线程执行完后,会循环从LinkedBlockingQueue中获取任务来执行

==不建议使用FixedThreadPool

====使用了无界的LinkedBlockingQueue(队列容量为Integer.Max_Value,可认为是无界的)

====线程池中线程数达到corePoolSize后,新任务会在无界队列中等待,因此线程池中的线程数不会超过corePoolSize

====不存在队列满的情况,可以设定corePoolSize和maximumPoolSize同值,maximumPoolSize无效

====无界队列,keepAliveTime将无效

====运行中FixedThreadPool不会拒绝任务,任务多的时候可能会造成内存溢出(OOM)

 

(2)SingleThreadExecutor:只有一个线程的线程池

         corePoolSize和maximumPoolSize被设置为1

   /**
     *返回只有一个线程的线程池
     corePoolSize和maximumPoolSize被设置为1
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

==执行过程:

====只会用唯一的工作线程来执行任务,保证所有的任务按照指定的顺序(FIFO,LIFO,优先级)执行

====若当前运行线程数少于corePoolSize,则创建一个新的线程执行任务

====当前线程池中有一个运行的线程时,将任务加入LinkedBlockingQueue

====线程执行完当前的任务后,会从LinkedBlockingQueue中获取任务来执行

==特定:保证顺序地执行每个任务,且在任意给定时间内不会有多个线程是活动的

==不建议使用SingleThreadExecutor

====使用无界队列LinkedBlockingQueue作为工作队列

====SingleThreadExecutor使用无界队列,不会拒绝,会给线程池造成OOM(同FixedThreadPool)

 

(3)CachedThreadPool:可缓存的,会根据需要创建新线程的线程池

         corePoolSize为0 maximumPoolSize为Integer.Max_Value(无界)

    /**
     * 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
     corePoolSize为0
     maximumPoolSize为Integer.Max_Value(无界)
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

==执行过程

====线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,就新建线程

====首先执行SynchronousQueue.offer(Runnable task)提交任务到任务队列

        若当前maximunPool中有空闲线程正在执行,SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),主线程执行offer操作与空闲线程执行poll()操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成否则执行以下步骤

====当初始maximumPool为空,或maximumPoo中没有空闲线程,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),此时,CachedThreadPool会创建新线程执行任务,execute方法执行完成。

==不建议使用

    CachedThreadPool允许创建的线程数量为Integer.Max_Value,当主线程提交任务的速度高于maximumPool中的线程处理任务的速度,可能会不断创建大量的线程,造成OOM

(4)ScheduledThreadPoolExecutor:定长的,支持定时的以及周期性任务执行

         主要用来在给定的延迟后运行任务,或者定期执行任务

==使用任务队列:DelayWorkQueue

====封装了PriorityQueue,会对队列中的任务进行排序,执行所需时间短的放在前面先执行(ScheduledFutureTask的time变量小的先执行),执行所需时间相同则先提交的任务先执行(ScheduledFutureTask的squenceNumber变量小的先执行)

====Timer对系统时钟变化敏感,只有一个执行线程,因此,长时间运行的任务可以延迟其它任务

====ScheduledThreadPoolExecutor可以配置任意数量的线程,可以完全控制创建的线程

====TimerTask中抛出的运行时异常会kill一个线程,导致Timer死机

        即当前抛出异常的任务不再允许,允许在需要处理(通过重写afterExecute()),其它任务将继续运行。

==执行过程:

====当调用SchedulledThreadPoolExecutor.scheduleAtFixedRate()方法或scheduleWithFixedDelay()方法,会向DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask。

====线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务

 

10、线程池大小的确定(如何合理配置线程池的参数)

上下文切换: 当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换

(1)CPU密集型

==该任务需要最大的运算,而没有阻塞,CPU一直全速运行。

==任务消耗主要是CPU资源

==可将线程数设置为N(CPU核数)+1

== +1的目的:防止线程可能发生的缺页中断或导致任务暂停带来的影响

(2)IO密集型

==该任务需要大量的IO,即大量的阻塞,系统大部分时间在处理IO交互

==处理IO时间段内不会占用太多的CPU,可以将CPU交给其它线程使用

==可将线程数设置为2N



这篇关于线程池的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程