首页 未命名正文

linux编程_ThreadPoolExecutor-线程池开发的使用

云返利网 未命名 2020-05-26 09:07:24 10 0

很久没有写过条记了,最近做的一个项目涉及打线程池和行列的开发,以为在这个项目中学习到的照样挺多的,对线程平安,并发的知识有加深认知;固然,现在用过的器械并不是代表以后还能娴熟的使用,做好条记非常主要;

1:必须明了为什么要使用线程池:(这点很主要)

  a:手上项目所需,由于项目主要的目的是实现多线程的数据推送;需要建立多线程的话,那就要处置好线程平安的问题;由于项目需要,还涉及到排队下载的功效,以是就选择了线程池来治理线程以及线程池内里的义务行列workQueue来实现项目所需的功效;

  b:在现实使用中,服务器在建立和销毁线程上破费的时间和消耗的系统资源都相当大,甚至可能要比在处置现实的用户请求的时间和资源要多的多。除了建立和销毁线程的开销之外,流动的线程也需要消耗系统资源。若是在一个jvm里建立太多的线程,可能会使系统由于过分消耗内存或“切换过分”而导致系统资源不足。为了防止资源不足,服务器应用程序需要接纳一些设施来限制任何给定时刻处置的请求数目,尽可能削减建立和销毁线程的次数,特别是一些资源花费对照大的线程的建立和销毁,只管行使已有工具来举行服务,这就是“池化资源”手艺发生的缘故原由。 线程池主要用来解决线程生命周期开销问题和资源不足问题(这段是摘自网络)

2:若何建立一个线程池:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

这里只是建立线程池其中的一个组织函数;实在其他的组织函数最终照样挪用的这个组织函数;

说明一下这些参数的作用:

corePoolSize:焦点池的巨细,在建立了线程池后,线程池中的线程数为0,当有义务来之后,就会建立一个线程去执行义务,当线程池中的线程数目到达corePoolSize后,就会把到达的义务放到缓存行列当中;

maximumPoolSize:线程池最大线程数,它示意在线程池中最多能建立多少个线程;这个参数是跟后面的壅闭行列联系慎密的;只有当壅闭行列满了,若是另有义务添加到线程池的话,会实验new 一个Thread的举行救急处置,立马执行对应的runnable义务;若是继续添加义务到线程池,且线程池中的线程数已经到达了maximumPoolSize,那么线程就会就会执行reject操作(这里后面会提及到)

keepAliveTime:示意线程没有义务执行时最多保持多久时间会终止;默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用;即当线程池中的线程数大于corePoolSize时,若是一个线程空闲的时间到达keepAliveTime,则会终止,直到线程池中的线程数不跨越corePoolSize。然则若是挪用了allowCoreThreadTimeOut(boolean)方式并设置了参数为true,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的壅闭行列巨细为0;(这部门通过查看ThreadPoolExecutor的源码剖析--getTask()部门);

unit:参数keepAliveTime的时间单元,有7种取值,在TimeUnit类中有7种静态属性(时间单元)

workQueue:一个壅闭行列,用来存储守候执行的义务,这个参数的选择也很主要,会对线程池的运行历程发生重大影响,一样平常来说,这里的壅闭行列有以下几种选择  

  ArrayBlockingQueue;

  LinkedBlockingQueue;

  SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用较少,一样平常使用LinkedBlockingQueue和Synchronous。线程池的排队计谋与BlockingQueue有关。

threadFactory:线程工厂,主要用来建立线程:默认值 DefaultThreadFactory;

handler:示意当拒绝处置义务时的计谋,就是上面提及的reject操作;有以下四种取值:

  ThreadPoolExecutor.AbortPolicy:甩掉义务并抛出RejectedExecutionException异常。(默认handle)

  ThreadPoolExecutor.DiscardPolicy:也是甩掉义务,然则不抛出异常。

  ThreadPoolExecutor.DiscardOldestPolicy:甩掉行列最前面的义务,然后重新实验执行义务(重复此历程)

  ThreadPoolExecutor.CallerRunsPolicy:由挪用线程处置该义务

3:对线程池的基本使用及其部门源码的剖析(注重:这里的源码剖析是基于jdk1.6;)

a:线程池的状态

volatile int runState;
 static final int RUNNING = 0; 运行状态
static final int SHUTDOWN = 1; 关闭状态;SHUTDOWN状态,此时线程池不能够接受新的义务,它会守候所有义务执行完毕
static final int STOP = 2;住手状态;此时线程池不能接受新的义务,而且会去实验终止正在执行的义务
static final int TERMINATED = 3;终止状态;当线程池处于SHUTDOWN或STOP状态,而且所有事情线程已经销毁,义务缓存行列已经清空或执行竣事后,线程池被设置为TERMINATED状态

b:参数再次说明。这是摘自网络的注释,我以为他比喻的很好,以是这里直接就用它的注释

  这里要重点注释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

  corePoolSize在许多地方被翻译成焦点池巨细,实在我的明白这个就是线程池的巨细。举个简朴的例子:

  假如有一个工厂,工厂内里有10个工人,每个工人同时只能做一件义务。

  因此只要当10个工人中有工人是空闲的,来了义务就分配给空闲的工人做;

  当10个工人都有义务在做时,若是还来了义务,就把义务举行排队守候;

  若是说新义务数目增进的速率远远大于工人做义务的速率,那么此时工厂主管可能会想补救措施,好比重新招4个暂且工人进来;

  然后就将义务也分配给这4个暂且工人做;

  若是说着14个工人做义务的速率照样不够,此时工厂主管可能就要思量不再吸收新的义务或者甩掉前面的一些义务了。

  当这14个工人当中有人空闲时,而新义务增进的速率又对照缓慢,工厂主管可能就思量辞掉4个暂且工了,只保持原来的10个工人,究竟请分外的工人是要花钱的。

  这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是说corePoolSize就是线程池巨细,maximumPoolSize在我看来是线程池的一种补救措施,即义务量突然过大时的一种补救措施。

  不外为了利便明白,在本文后面照样将corePoolSize翻译成焦点池巨细。

  largestPoolSize只是一个用来起纪录作用的变量,用来纪录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

c:添加线程池义务的入口就是execute();

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();//义务为空时抛出异常
    //若是线程池线程巨细小于焦点线程,就新建一个线程加入义务并启动线程
    //若是线程池线程巨细大于焦点线且且添加义务到线程失败,就把义务添加到壅闭行列
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//新建线程并启动
        if (runState == RUNNING && workQueue.offer(command)) {//添加义务到行列
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);//添加到行列失败或已满,做拒接义务处置计谋
        }
        //若壅闭行列失败或已满;这里新建一个线程并启动做应急处置(这里就是用到了maximumPoolSize参数)
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // 若线程池的线程跨越了maximumPoolSize;就做拒绝处置义务计谋
    }
}

-->>继续跟踪代码到addIfUnderCorePoolSize(Runnable firstTask):函数名称就可以看出来这个函数要执行的什么;若是线程池的线程小于焦点线程数corePoolSize就新建线程加入义务并启动线程【在往后的开发中只管把需要做的功效在函数名体现出来】

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;//获取当前线程池的锁
        mainLock.lock();//加锁
        try {
            /*
            这里线程池线程巨细还需要判断一次;前面的判断历程中并没有加锁,因此可能在execute方式判断的时刻poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了义务,就可能导致poolSize不小于corePoolSize了,以是需要在这个地方继续判断
            */
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);//新建线程
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();//若建立线程跨越,就启动线程池的线程
        return true;
    }
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);//worker:ThreadPoolExecutor的内部类;
        Thread t = threadFactory.newThread(w);//使用线程工厂建立一个线程
        if (t != null) {
            w.thread = t;
            workers.add(w);//保留线程池正在运行的线程
            int nt = ++poolSize;//线程池的线程数加1
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

-->>接下来定位worker类,看看线程池里的线程是若何执行的

上面的addIfUnderCorePoolSize(..)已经把线程启动了;现在就直接查看worker 的run()方式了

public void run() {
    try {
        Runnable task = firstTask;//该线程的第一个义务,执行完后就从壅闭行列取义务执行
        firstTask = null;
        while (task != null || (task = getTask()) != null) {//getTask()从行列去义务执行
            runTask(task);//线程执行义务
            task = null;
        }
    } finally {
        workerDone(this);//若义务所有执行完,就最先实验去住手线程池;这部门代码就不再追踪下去,有兴趣的读者可以自己打开源码剖析,不必畏惧,学习大神们的编码方式,看源码能让你学习到许多
    }
}
 private void runTask(Runnable task) {
    final ReentrantLock runLock = this.runLock;
    runLock.lock();
    try {
        //多次检查线程池有没有关闭
        if (runState < STOP &&
            Thread.interrupted() &&
            runState >= STOP)
            thread.interrupt();
           
        boolean ran = false;
        //这里就可以继续ThreadPoolExecutor,并笼罩beforeExecute(...)该方式,来做一些执行义务之前的统计事情或者用来保留正在执行的义务
        beforeExecute(thread, task);
        try {
            task.run();
            ran = true;
            //这里就可以继续ThreadPoolExecutor,并笼罩beforeExecute(...)该方式,来做一些执行义务完成之后的统计事情或者用来保留正在执行的义务
            afterExecute(task, null);
            ++completedTasks;//统计总共执行的义务数
        } catch (RuntimeException ex) {
            if (!ran)
                afterExecute(task, ex);
            throw ex;
        }
    } finally {
        runLock.unlock();
    }
}

至此线程池基本的流程完了;

再说说我在项目中的使用:
MyExtendThreadPoolExecutor 继续了 ThreadPoolExecutor,并笼罩了其中的一些方式

public class MyExtendThreadPoolExecutor extends ThreadPoolExecutor{
    public static Logger logger=LoggerFactory.getLogger(MyExtendThreadPoolExecutor.class);
    /**
    * 纪录运行中义务
    */
    private LinkedBlockingQueue<Runnable> workBlockingQueue=new  LinkedBlockingQueue<Runnable>();
   
    public MyExtendThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        workBlockingQueue.add((GtdataBreakpointResumeDownloadThread)r);//保留在运行的义务
        logger.info("Before the task execution");
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        workBlockingQueue.remove((GtdataBreakpointResumeDownloadThread)r);//移除关闭的义务
        logger.info("After the task execution");
    }
    /**
    *
    * Description: 正在运行的义务
    * @return LinkedBlockingQueue<Runnable><br>
    * @author lishun
    */
    public LinkedBlockingQueue<Runnable> getWorkBlockingQueue() {
        return workBlockingQueue;
    }
}

MyExtendThreadPoolExecutor pool = new MyExtendThreadPoolExecutor(3, 3,60L,TimeUnit.SECONDS,new LinkedBlockingQueue <Runnable>()); //建立线程池

【关于云返利网】

云返利网是阿里云、腾讯云、华为云产品推广返利平台,在各个品牌云产品官网优惠活动之外,云返利网还提供返利。您可以无门槛获得阿里云、华为云、腾讯云所有产品返利,在官网下单后就可以领取,无论是自己用、公司用还是帮客户采购,您个人都可以获得返利。云返利网的目标是让返利更多、更快、更简单!详情咨询13121395187(微信同号)