自己实现一个线程池

小感触 2020年03月10日 275次浏览

了解线程池的具体原理之后,可以自己试着实现一个来加深自己的印象,不能眼高手低。能多看,也要多练。

package com.bestbigkk.code.thread;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * 自己实现一个线程池
 */
public class ThreadPool implements Executor {

    /**
     * 核心线程数量:当池中的线程小于该值的时候,如果有新任务进来,可以直接新建线程进行处理,提升效率
     * <p>
     * 当池中线程已经大于了coreSize之后,就不可以仍然新建线程了,
     * 需要先考虑使用另一种方式去处理。比如如果当前的任务过多,就先把这些积压的任务放到队列中去。然后有空闲的线程时,就取出来任务执行。
     * <p>
     * 如果此时任务非常非常多,仅仅只是靠着一个队列来一个个的执行,还是会有些力不从心,所以在队列任务很多的情况下,我们可以再次运行线程池
     * 新建线程,来加速队列中任务的处理速度。但是这个新建的过程也必须是可控的,不能因为队列任务的问题吗,就毫无限制的新建下去,因此需要使用一个
     * maxSize来约束最大新建的数量。
     * <p>
     * 如果采取了上面的做法,线程已经等于了maxSize但是还是无法缓解任务太多的问题,那么就需要采用拒绝策略,来避免太多太多的任务进来。
     */
    private final Integer coreSize;
    private final Integer maxSize;
    private final BlockingQueue<Runnable> taskQueue;
    private final RejectPolicy rejectPolicy;

    /**
     * 记录当前正在执行任务的线程数量
     */
    private AtomicInteger runningThreadCount = new AtomicInteger();
    private AtomicInteger threadIndex = new AtomicInteger();
    private static final String THREAD_NAME_PREFIX = "线程 - ";

    public ThreadPool(Integer coreSize, Integer maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        if (coreSize <= 0 || maxSize <= 0 || coreSize > maxSize) {
            throw new IllegalArgumentException();
        }
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }


    /**
     * 线程池执行流程:
     * 首先,如果运行的线程数小于核心线程数,直接创建一个新的核心线程来运行新的任务。
     * 其次,如果运行的线程数达到了核心线程数,则把新任务入队列。
     * 然后,如果队列也满了,则创建新的非核心线程来运行新的任务。
     * 最后,如果非核心线程数也达到最大了,那就执行拒绝策略。
     */
    @Override
    public void  execute(Runnable runnable) {
        final int running = runningThreadCount.get();
        // 正在运行线程小于核心线程,则直接创建新的线程进行运行。
        if (running < coreSize) {
            if (addThreadTask(runnable, true)) {
                return;
            }
        }
        //如果大于coreSize了,先尝试加入队列, offer()方法会直接返回添加的结果而不会阻塞。
        if (taskQueue.offer(runnable)) {
            return;
        }
        //如果加入队列失败,说明队列已满,此时尝试新建一个非核心线程
        final boolean addThreadResult = addThreadTask(runnable,true);
        if (addThreadResult) {
            return;
        }
        //添加非核心线程失败,此时直接执行拒绝策略。
        if (Objects.nonNull(rejectPolicy)) {
            rejectPolicy.reject(runnable);
        }
    }

    /**
     * 在线程池新建一个线程
     * @param runnable 任务
     * @param isCore  是否属于核心线程
     * @return 是否新建成功
     */
    private boolean addThreadTask(Runnable runnable, boolean isCore) {
        while (true) {
            //根据新建策略,获取对应的最大值。
            int max = isCore ? coreSize : maxSize;
            //获取当前正在运行的线程数量。
            int running = runningThreadCount.get();
            //当前正在运行的线程已经是最大的了,不满足条件,创建失败
            if (running >= max) {
                return false;
            }
            //通过上面判断,表示可以进行新线程的创建。

            /*
             * 首先尝试增加当前运行线程数量,如果增加成功,就可以新建一个线程,
             * 一定要先尝试增加runningThreadCount,因为这个过程在并发环境下,可能回失败,
             * 如果失败的话,就不再创建线程。
             */
            if (runningThreadCount.compareAndSet(running, running + 1)) {
                //新建线程
                new Thread(() -> {
                    //该线程建立之后,首先尝试执行创建时给定的任务,后续就循环尝试从队列取任务来执行。
                    Runnable task = runnable;
                    while (task != null || (task = getTaskFromQueue()) != null) {
                        //注意执行的时候一定要使用final保证无论当前任务执行如何,最终task都会置为null,否则将始终通过while的第一层判断,循环执行这个任务。
                        try {
                            System.out.println(Thread.currentThread().getName()+"执行任务");
                            task.run();
                        }finally {
                            task = null;
                        }
                    }
                }, THREAD_NAME_PREFIX + threadIndex.incrementAndGet()).start();
                return true;
            }
        }
    }

    private Runnable getTaskFromQueue() {
        try {
            //使用take方法从队列取任务,取不到的话会一直阻塞。
            return taskQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
            /*
            将null返回到外部,意味着外部线程的task是为null的,否针也不会执行到这个方法。
            而如果这个方法也返回null,那么就表示无法获取任务了。这个线程就会结束运行。
            因此需要控制运行中线程数量-1。
            同样的,在修改runningThreadCount时候,可能会失败,这里还是使用自旋。
            */
            while (true) {
                final int running = runningThreadCount.get();
                if (runningThreadCount.compareAndSet(running, running - 1)) {
                    return null;
                }
            }
        }
    }
}

/**
 * 拒绝策略。
 */
abstract class RejectPolicy{
    /**
     * 在线程池无法完成该任务的时候,采用的拒绝策略。
     * @param runnable
     */
    public abstract void  reject(Runnable runnable);
}

/**
 * 拒绝策略实现类:丢弃当前任务
 */
class DiscardRejectPolicy extends RejectPolicy {

    @Override
    public void reject(Runnable runnable) {
        System.out.println("丢弃任务");
    }
}

测试:

public class Test {
    @org.junit.Test
    public void testThreadPool() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(3);
        ThreadPool threadPool = new ThreadPool(5, 6, queue, new DiscardRejectPolicy());
        IntStream.rangeClosed(1, 10).forEach((v) -> threadPool.execute(()->System.out.println(v)));
    }
}