ThreadPoolExecutor 的三种提交任务方式

ThreadPoolExecutor 的三种提交任务方式

学习内容:

ExecutorService线程池的应用…

1.如何创建线程池…

2.调用线程池的方法,获取线程执行完毕后的结果…

3.关闭线程…

 

  首先我们先了解一下到底什么是线程池,只有了解了其中的道理,我们才能够进行应用…java.util.concurrent.ExecutorService表述了异步执行的机制

  首先我们简单的举一个例子…

复制代码
复制代码
package executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Executor {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub

        System.out.println("cc");
        ExecutorService executorService=Executors.newFixedThreadPool(10);
        executorService.execute(new Runnable() {
            
            @Override
            public void run() {
                // TODO Auto-generated method stub
                while(true){
                    
                    System.out.println("aa");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        System.out.println("bb");
    }

}
复制代码
复制代码

  这里我们指定了十个线程处于一个线程池内部,线程池的原理其实就是对多线程的一个管理,为了实现异步机制的一种方法,其实就是多个线程执行多个任务,*终这些线程通过线程池进行管理…不用手动去维护…一次可以处理多个任务,这样就可以迅速的进行相应…比如说一个网站成为了热点网站,那么对于大量的点击量,就必须要对每一次的点击做出迅速的处理,这样才能达到更好的交互效果…这样就需要多个线程去处理这些请求,以便能够更好的提供服务…

1. 简单的说一下如何创建线程池进行初始化….创建线程有几种常用方式…这里都是使用了Executors工厂来实例化对象,同时我们也可以根据需求自己去写一个ExecutorService…这几种常用的方法有一定的区别…

ExecutorService executorService1 = Executors.newSingleThreadExecutor();

ExecutorService executorService2 = Executors.newFixedThreadPool(10);

ExecutorService executorService3 = Executors.newScheduledThreadPool(10);
ExecutorService executorService4 = Executors.newCacheThreadPool();
Executors.newSingleThreadExecutor()
单例线程,表示在任意的时间段内,线程池中只有一个线程在工作…
Executors.newCacheThreadPool()
缓存线程池,先查看线程池中是否有当前执行线程的缓存,如果有就resue(复用),如果没有,那么需要创建一个线程来完成当前的调用.并且这类线程池只能完成一些生存期很短的一些任务.并且这类线程池内部规定能resue(复用)的线程,空闲的时间不能超过60s,一旦超过了60s,就会被移出线程池.
 Executors.newFixedThreadPool(10) 固定型线程池,和newCacheThreadPool()差不多,也能够实现resue(复用),但是这个池子规定了线程的*大数量,也就是说当池子有空闲时,那么新的任务将会在空闲线程中被执行,一旦线程池内的线程都在进行工作,那么新的任务就必须等待线程池有空闲的时候才能够进入线程池,其他的任务继续排队等待.这类池子没有规定其空闲的时间到底有多长.这一类的池子更适用于服务器.
 Executors.newScheduledThreadPool(10) 调度型线程池,调度型线程池会根据Scheduled(任务列表)进行延迟执行,或者是进行周期性的执行.适用于一些周期性的工作.

 

 

 

 

 

 

 

 

 

 

这就是线程池创建的几种方式…我们需要根据不同的需求来适当的选择到底使用哪种线程池…

2.那么创建了线程池以后就需要对线程池进行调用..将任务加载到其中…

i.ExecutorService.execute(Runnable);

  *种调用方式…通过这种方式将线程任务加载到线程池当中…我们可以添加多个任务…贴上一个完整的代码…大家看一下代码的解释就明白到底是怎么回事了..不难理解…

复制代码
复制代码
package executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Executor {

    /**
     * @param args
     * 
     */
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService executorService=Executors.newFixedThreadPool(2);//定义了线程池中*大存在的线程数目...
        
        //添加了*个任务...这个任务会一直被执行...
        executorService.execute(new Runnable() {
            
            @Override
            public void run() {
                // TODO Auto-generated method stub
                while(true){
                    
                    System.out.println("aa");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        
        //添加第二个任务,被执行三次停止...
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                // TODO Auto-generated method stub
                int i=0;
                while(true){
                    i++;
                    System.out.println("bb");
                    if(i==3){
                        break;
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }    
                }
            }
        });
        
        /*
         * @param
         * 第三个任务...只有当第二个任务被执行三次之后才能被执行...
         * 由于三次前,线程池已经满了,这个任务是轮不到被执行的..只能排队进行等待. 
         * 三次之后,第二个任务被终止,也就是线程池中出现了空闲的状态,所以这个任务将被放入到线程池中执行...
         * */
        executorService.execute(new Runnable() {
            
            @Override
            public void run() {
                // TODO Auto-generated method stub
                while(true){
                    
                    System.out.println("cc");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
    }

}
复制代码
复制代码

  ii.executorService.submit(Runnable) 第二种调用方式…这种方式与*种的区别在于可以使用一个Future对象来判断当前的线程是否执行完毕…但是这种方法只能判断当前的线程是否执行完毕,无法返回数据信息…

复制代码
复制代码
Future future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});
//如果任务结束执行则返回 null
System.out.println("future.get()=" + future.get());
复制代码
复制代码

  iii.executorService.submit(Callable)…  第三种调用方式…这种调用方式与前一种有所不同,传递的参数为Callable对象,Callable与Runnbale很相似,但是Callable的call()方法可以返回数据信息…通过Future就能够获取到其中的信息..而Runnbale.run()方法时无法获取数据信息的….Future应用于多线程…可以获取call()方法返回的数据信息…其实他是一种模式,是为了性能优化而提供的一种思想…这里我就不说Future…

复制代码
复制代码
uture future = executorService.submit(new Callable(){
    public Object call() throws Exception {
        System.out.println("Asynchronous Callable");
        return "Callable Result";
    }
});

System.out.println("future.get() = " + future.get());

//上述样例代码会输出如下结果: 
//Asynchronous Callable
//future.get() = Callable Result
复制代码
复制代码

 iv.inVokeAny()…第四种调用方式…方法 invokeAny() 接收一个包含 Callable 对象的集合作为参数。调用该方法不会返回 Future 对象,而是返回集合中某一个 Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个Callable,只知道它是这些 Callable 中一个执行结束的 Callable 对象…说实话这个方法我不知道它创建的目的到底是什么…这里执行后的结果是随机的…也就是输出是不固定的….

复制代码
复制代码
ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

String result = executorService.invokeAny(callables);

System.out.println("result = " + result);
复制代码
复制代码

  v.inVokeAll()这个方法和上面不同的地方就在于它可以返回所有Callable的执行结果…获取到所有的执行结果,我们可以对其进行管理…相对而言,我觉得这个方法比上一个更实用吧…

复制代码
复制代码
ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
    System.out.println("future.get = " + future.get());
复制代码
复制代码

3.线程池的关闭…

当我们不需要使用线程池的时候,我们需要对其进行关闭…有两种方法可以关闭掉线程池…

i.shutdown()…

  shutdown并不是直接关闭线程池,而是不再接受新的任务…如果线程池内有任务,那么把这些任务执行完毕后,关闭线程池….

ii.shutdownNow()

  这个方法表示不再接受新的任务,并把任务队列中的任务直接移出掉,如果有正在执行的,尝试进行停止…

大家自己试着运行下面的代码就了解其中到底是怎么回事了…

 

复制代码
复制代码
package executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Executor {

    /**
     * @param args
     * 
     */
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService executorService=Executors.newFixedThreadPool(1);//定义了线程池中*大存在的线程数目...
        
        //添加了*个任务...这个执行三次停止...
        executorService.execute(new Runnable() {
            
            @Override
            public void run() {
                // TODO Auto-generated method stub
                int j=0;
                while(true){
                    j++;
                    System.out.println("aa");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    if(j==3){
                        break;
                    }
                }
            }
        });
        
        //添加第二个任务,由于使用executorService.shutdown(),由于它的加入是在这个方法调用之前的,因此这个任务也会被执行...
        //如果我们使用了executorService.shutdownNow();方法,就算是他在之前加入的,由于调用了executorService.shutdownNow()方法
        //那么这个任务将直接被移出队列并且不会被执行...
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                // TODO Auto-generated method stub
                int i=0;
                while(true){
                    i++;
                    System.out.println("bb");
                    if(i==3){
                        break;
                    }
                }
            }
        });
        executorService.shutdown();//这里无论使用了那种方法,都会抛出一个异常...
        /*
         * @param
         * 第三个任务...只有当第二个任务被执行三次之后才能被执行...
         * 由于三次前,线程池已经满了,这个任务是轮不到被执行的..只能排队进行等待. 
         * 三次之后,第二个任务被终止,也就是线程池中出现了空闲的状态,所以这个任务将被放入到线程池中执行...
         * */
        executorService.execute(new Runnable() {
            
            @Override
            public void run() {
                // TODO Auto-generated method stub
                while(true){
                    
                    System.out.println("cc");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
    }

}
复制代码
复制代码

Android中的线程池 ThreadPoolExecutor

Android中的线程池 ThreadPoolExecutor

线程池的优点:

  1. 重用线程池中的线程,避免因为线程的创建和销毁带来的性能消耗
  2. 能有效的控制线程的*大并发数,避免大量的线程之间因抢占系统资源而导致的阻塞现象
  3. 能够对线程进行简单的管理,并提供定时执行以及指定间隔循环执行等功能

ThreadPoolExecutor:

Android中,用ThreadPoolExecutor来实现线程池的配置。
ThreadPoolExecutor文档中文版
ThreadPoolExecutor文档英文版

%title插图%num
QQ截图20160711141312.png

ThreadPoolExecutor的构造方法

ThreadPoolExecutor的构造方法有四个,其实现如下:
“`
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                          TimeUnit unit, BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                          TimeUnit unit, BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
                          TimeUnit unit,BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
```
构造方法的参数
  • corePoolSize
    程池中的核心线程数,也就是是线程池中的*小线程数;
    核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出;
  • maximumPoolSize
    *大线程池大小,当活动线程数达到这个值,后续任务会被阻塞
  • keepAliveTime
    线程池中超过corePoolSize数目的非核心线程*大存活时间;闲置时的超时时长,超过这个值后,闲置线程就会被回收
  • unit
    keepAliveTime 参数的时间单位。这是一个枚举,详情请参考TimeUnit
  • workQueue
    执行前用于保持任务的队列,也就是线程池的缓存队列。此队列仅保持由 execute 方法提交的 Runnable 任务
    关于三种提交策略这篇文章不错
  • threadFactory
    线程工厂,为线程池提供创建新线程的功能,它是一个接口,只有一个方法:Thread newThread(Runnable r)
  • RejectedExecutionHandler
    线程池对拒*任务的处理策略。一般是队列已满或者无法成功执行任务,这时ThreadPoolExecutor会调用handler的rejectedExecution方法来通知调用者
    ThreadPoolExecutor默认有四个拒*策略:

      1、ThreadPoolExecutor.AbortPolicy()   直接抛出异常RejectedExecutionException
      2、ThreadPoolExecutor.CallerRunsPolicy()    直接调用run方法并且阻塞执行
      3、ThreadPoolExecutor.DiscardPolicy()   直接丢弃后来的任务
      4、ThreadPoolExecutor.DiscardOldestPolicy()  丢弃在队列中队首的任务
    

也可以自己继承RejectedExecutionHandler来写拒*策略.

ThreadPoolExecutor的执行过程:

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。

  1. 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程
  2. 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
  3. 当提交任务数超过【maximumPoolSize+阻塞队列大小】时,新提交任务由RejectedExecutionHandler处理 (关于这里,网上90%以上的人说当任务数>=maximumPoolSize时就会被拒*,我不知道依据在哪里,也不知道代码验证过没,经过我的验证这种说法是不成立的,具体的看下边日志分析)
  4. 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
  5. 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
定制自己的线程池:
public class ThreadTestActivity extends AppCompatActivity {
    private final int CORE_POOL_SIZE = 1;//核心线程数
    private final int MAX_POOL_SIZE = 3;//*大线程数
    private final int BLOCK_SIZE = 2;//阻塞队列大小
    private final long KEEP_ALIVE_TIME = 2;//空闲线程超时时间
    private ThreadPoolExecutor executorPool;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_thread_test);
        //创建线程池
        // 创建一个核心线程数为3、*大线程数为8,缓存队列大小为5的线程池
        executorPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(BLOCK_SIZE),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        executorPool.allowCoreThreadTimeOut(true);
    }

    public void begin(View view) {
        for (int num = 0; num < 6; num++) {//每个500ms添加一个任务到队列中
            try {
                Li("execute");// 监听相关数据
                executorPool.execute(new WorkerThread("thread-" + num));
            } catch (Exception e) {
                Log.e("threadtest", "AbortPolicy...");
            }
        }

        // 20s后,所有任务已经执行完毕,我们在监听一下相关数据
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(20 * 1000);
                } catch (Exception e) {

                }
                Li("monitor after");
            }
        }).start();
    }

    private void Li(String mess) {
        Log.i("threadtest", "monitor " + mess
                        + " CorePoolSize:" + executorPool.getCorePoolSize()
                        + " PoolSize:" + executorPool.getPoolSize()
                        + " MaximumPoolSize:" + executorPool.getMaximumPoolSize()
                        + " ActiveCount:" + executorPool.getActiveCount()
                        + " TaskCount:" + executorPool.getTaskCount()

        );
    }

}

// 模拟耗时任务
public class WorkerThread implements Runnable {
    private String threadName;
    public WorkerThread(String threadName) {
        this.threadName = threadName;
    }
    @Override
    public synchronized void run() {

        int i = 0;
        boolean flag = true;
        try {
            while (flag) {
                Thread.sleep(1000);
                i++;
                Log.e("threadtest", "WorkerThread " + threadName + "  " + i);
                if (i >2) flag = false;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public String getThreadName() {
        return threadName;
    }
}

日志信息:

%title插图%num
QQ截图20160712161004.png

下面就来对日志信息进行分析:

  1. 上边粉色部分(1~6行),可以看到poolsize逐渐累加,一直加到*大线程数后不再增加,这呼应了上述“执行过程1”
  2. 接下来绿色部分()7行,验证了上边我们说的“执行过程3”,缓存队列数为2,*大线程数为3,共有6条任务,所以会有【 6-(2+3)】条任务被拒*,这里拒*策略我们用的是ThreadPoolExecutor.AbortPolicy()也就是直接抛出异常,也就是我们日志的第7行
  3. 然后8~22行是任务的执行过程,
  4. 其中蓝色部分(8~16)行,我们可以看到有3条任务在同时执行,也就是*大线程数
  5. 接下来的绿色(17~22行),在三条任务执行完成后,剩余的排队任务才开始执行
  6. *后,23行,20s后,线程都处于空闲状态,所以非核心线程会被回收,但是因为代码中我们设置了executorPool.allowCoreThreadTimeOut(true),所以这时处于空闲状态的核心线程也会被回收,这时池中的线程数为0
关于线程池的一些建议
  • *大线程数一般设为2N+1*好,N是CPU核数

官方定义的四种线程池

其实,本应该先说官方定义的这四种线程池,然后再说自定义线程池,但是考虑到里边的一些配置参数,所以本帖先利用自定义线程池把各个配置参数理一下,然后再讲官方定义的四种线程池,这样也便于理解官方定义的这四种线程池
这四种线程池都是通过Executors的工厂方法来实现

1、FixedThreadPool

他是一种数量固定的线程池,且任务队列也没有大小限制;
它只有核心线程,且这里的核心线程也没有超时限制,所以即使线程处于空闲状态也不会被回收,除非线程池关闭;
当所有的任务处于活动状态,新任务都处于等待状态,知道所有线程空闲出来;
因为它不会被回收,所以它能更快的响应;
源码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

实现:

    ExecutorService service = Executors.newFixedThreadPool(3);
    service.execute(new WorkerThread("thread-" + num));
2、CachedThreadPool

无界线程池,可以进行自动线程回收
他是一种线程数量不固定的线程池;
它只有非核心线程,且*大线程数为Integer.MAX_VALUE,也就是说线程数可以任意大;
当池中的线程都处于活动状态时,会创建新的线程来处理任务,否则会利用空闲线程来处理任务;所以,任何添加进来的任务都会被立即执行;
池中的空闲线程都有超时限制,为60s,超过这个限制就会被回收,当池中的所有线程都处于闲置状态时,都会因超时而被回收,这个时候,她几乎不占用任何系统资源;
适合做大量的耗时较少的任务;
源码:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

实现:

    ExecutorService service = Executors.newCachedThreadPool();
    service.execute(new WorkerThread("thread-"));
3、SingleThreadExecutor

只有一个核心线程,所有任务都在同一线程中按序执行,这样也就不需要处理线程同步的问题;
源码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

实现:

    ExecutorService service = Executors.newSingleThreadExecutor();
    service.execute(new WorkerThread("thread-"));
4、ScheduledThreadPool

它的核心线程数量是固定的,而非核心线程是没有限制的,且非核心线程空闲时会被回收;
适合执行定时任务和具有固定周期的任务
源码:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

实现:

    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    或
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

    threadPool.schedule(runnable, 20, TimeUnit.SECONDS);// 20秒后执行任务
    或
    threadPool.scheduleAtFixedRate(runnable,10,20,TimeUnit.SECONDS);//延迟10s,每20s执行一次任务

由于本人技术有限,避免不了出现一些错误或者理解有偏差描述不清楚的地方,请大家谅解并提醒我:)