当前位置:编程学习 > JAVA >>

并行计算框架的Java实现--系列二

增加对结果的处理:
1、修改Job,实现Callable接口
Java代码 
public abstract class Job implements Callable<Object> { 
 
    @Override 
    public Object call() throws Exception { 
        Object result = this.execute();//执行子类具体任务 
        synchronized (Executer.LOCK) { 
            //处理完业务后,任务结束,递减线程数,同时唤醒主线程 
            Executer.THREAD_COUNT--; 
            Executer.LOCK.notifyAll(); 
        } 
        return result; 
    } 
    /**
     * 业务处理函数
     */ 
    public abstract Object execute(); 
 

 
2、修改Executer,增加对结果的处理
Java代码 
public class Executer { 
    //计算已经派发的任务数(条件谓词) 
    public static int THREAD_COUNT = 0; 
    //存储任务的执行结果 
    private List<Future<Object>> futres = new ArrayList<Future<Object>>();  
    //条件队列锁 
    public static final Object LOCK = new Object(); 
    //线程池 
    private ExecutorService pool = null; 
    public Executer() { 
        this(1); 
    } 
    public Executer(int threadPoolSize) { 
        pool = Executors.newFixedThreadPool(threadPoolSize); 
    } 
    /**
     * 任务派发
     * @param job
     */ 
    public void fork(Job job){ 
        //将任务派发给线程池去执行 
        futres.add(pool.submit(job)); 
        //增加线程数 
        synchronized (LOCK) { 
            THREAD_COUNT++; 
        } 
    } 
    /**
     * 统计任务结果
     */ 
    public List<Object> join(){ 
        synchronized (LOCK) { 
            while(THREAD_COUNT > 0){//检查线程数,如果为0,则表示所有任务处理完成 
                System.out.println("threadCount: "+THREAD_COUNT); 
                try { 
                    LOCK.wait();//如果任务没有全部完成,则挂起。等待完成的任务给予通知 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
            } 
        } 
        List<Object> list = new ArrayList<Object>(); 
        //取出每个任务的处理结果,汇总后返回 
        for (Future<Object> future : futres) { 
            try { 
                Object result = future.get();//因为任务都已经完成,这里直接get 
                list.add(result); 
            } catch (Exception e) { 
                e.printStackTrace(); 
            }  
        } 
        return list; 
    } 

 
 3、测试:
Java代码 
public static void main(String[] args) { 
        //初始化任务池 
        Executer exe = new Executer(5); 
        //初始化任务 
        long time = System.currentTimeMillis(); 
        for (int i = 0; i < 10; i++) { 
            MyJob job = new MyJob(); 
            exe.fork(job);//派发任务 
        }   www.zzzyk.com
        //汇总任务结果 
        List<Object> list = exe.join(); 
        System.out.println("Result: "+list); 
        System.out.println("time: "+(System.currentTimeMillis() - time)); 
    } 
 
4、执行结果:
Java代码 
threadCount: 10 
running thread id = 9 
running thread id = 11 
running thread id = 8 
running thread id = 10 
running thread id = 12 
threadCount: 5 
running thread id = 9 
running thread id = 8 
running thread id = 11 
running thread id = 12 
running thread id = 10 
Result: [8, 9, 10, 11, 12, 8, 11, 12, 9, 10] 
time: 2000 
补充:软件开发 , Java ,
CopyRight © 2022 站长资源库 编程知识问答 zzzyk.com All Rights Reserved
部分文章来自网络,