20211018-ThreadPoolExecutor

2022/5/22 23:05:38

本文主要是介绍20211018-ThreadPoolExecutor,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

成员变量

ctl变量

/**
    * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 32-3=29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 2的29次方-1,0001... 低29位表示线程数最大数,高3位表示executors状态
​
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
​
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }// 运行状态,即上面的 RUNNING等
private static int workerCountOf(int c)  { return c & CAPACITY; } // worker即工人数量
private static int ctlOf(int rs, int wc) { return rs | wc; }// runState 与 workerCount的和

mainLock+works

private final ReentrantLock mainLock = new ReentrantLock();// 访问works的锁
private final HashSet<Worker> workers = new HashSet<Worker>();

 

 

ThreadPoolExecutor

execute

1、worker<coreSize,新增worker

2、worker>=coreSize,queue未满,加入任务队列

3、worker>=coreSize,queue满了,但是worker<maxSize,新增worker

4、worker>=maxSize,queue满了,拒绝策略拒绝

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

 

addWorker

1、第一个for

自旋+CAS:增加 ctl内 worker数量

2、第二个for

new Worker,再加入 works

worker.start

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
​
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
​
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))// 数量限制与workers数量比较,决定能否新增worker
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
​
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
​
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

 

Worker

public void run() {
    runWorker(this);
}

 

runWorker

1、取任务,来自 firstTask 或者 getTask()

2、有任务,task.run(),进入下一个while

3、无任务,processWorkerExit

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;// 突然中断,如果while条件未满足则非突然的,其他都是突然的
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

 

getTask

1、是否淘汰(核心线程运行超时 或 worker数量大于corePoolSize)

2、是淘汰-超时时间内获取任务

3、不淘汰-不限时阻塞获取任务

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
​
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
​
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
​
        int wc = workerCountOf(c);
​
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 是否淘汰
​
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
​
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 需要淘汰,超时时间内获取任务,此处使用空闲时间
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

 

 

processWorkerExit

1、完成任务计数 更新

2、移除当前worker

3、非正常完成,新增worker

4、正常完成,worker数量满足最小要求,直接退出;不满足min要求,新增worker

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();// 是突然的,ctl的work计数未调整,此处调整
​
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;// 完成任务计数 更新
        workers.remove(w);// 移除当前worker
    } finally {
        mainLock.unlock();
    }
​
    tryTerminate();// 不太清楚有什么用
​
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {// 正常完成
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 允许核心线程空闲超时时死亡,则线程池最小线程数为0;否则最小线程数是corePoolSize
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

 

参数配置

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

1、需要配置哪些

int corePoolSize		核心线程数
int maximumPoolSize		最大线程数
queueSize				队列长度

 

2、如何配置

依照

每秒请求数(QPS,如 100~1000)+

每个请求耗时(COST,0.5s)+

系统最大响应时间(MAXRSP,2s)

corePoolSize = QPS/(1/COST) = QPS/2 = 50~500

其中 (1/COST) 可以理解为单个线程 1s内可以完成的请求数 n(0<n<无限大),此处为 2,即 1s内一个线程能完成 2个请求

哦,网上还说了个什么 8020原则,貌似希望核心线程数满足 80% 的最大请求数,那么此处应该就是 400

 

queueSize = (MAXRSP-COST) * (max(QPS)-corePoolSize*(1/COST)) = 1.5 * (1000-800) = 300

太大:接入了无法满足最大响应时间的请求

太小:能满足最大响应时间的请求又拒绝了

 

队列大小应该满足最大响应时间,目前看是队列满时,最后一个任务出队完成刚好满足最大响应时间

最大响应时间 2s - 请求耗时 0.5s = 最长待 1.5秒,即 1.5s内核心线程数可以堆积的任务数

 

maximumPoolSize = max(QPS)/(1/COST) = 500

太大,创建过多线程,OOM;应该大于corePoolSize=400 但是小于最大 QPS 所需线程数=500

原则上最大线程数与队列都满负荷运作,应该满足最大请求数,此处QPS=1000

3、问

进入队列的请求与下一秒新的请求,谁会先执行

队列内的请求由以往work完成

新的请求看情况是

1、入队-新请求后执行

2、新增worker-新请求应该会先执行

3、拒绝



这篇关于20211018-ThreadPoolExecutor的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程