并行计算框架的Java实现--系列二
增加对结果的处理:1、修改Job,实现Callable接口
Java代码
public abstract class Job implements Callable<Object> {
@Override
public Object call() throws Exception {
Object result = this.execute();//执行子类具体任务
synchronized (Executer.LOCK) {
//处理完业务后,任务结束,递减线程数,同时唤醒主线程
Executer.THREAD_COUNT--;
Executer.LOCK.notifyAll();
}
return result;
}
/**
* 业务处理函数
*/
public abstract Object execute();
}
2、修改Executer,增加对结果的处理
Java代码
public class Executer {
//计算已经派发的任务数(条件谓词)
public static int THREAD_COUNT = 0;
//存储任务的执行结果
private List<Future<Object>> futres = new ArrayList<Future<Object>>();
//条件队列锁
public static final Object LOCK = new Object();
//线程池
private ExecutorService pool = null;
public Executer() {
this(1);
}
public Executer(int threadPoolSize) {
pool = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* 任务派发
* @param job
*/
public void fork(Job job){
//将任务派发给线程池去执行
futres.add(pool.submit(job));
//增加线程数
synchronized (LOCK) {
THREAD_COUNT++;
}
}
/**
* 统计任务结果
*/
public List<Object> join(){
synchronized (LOCK) {
while(THREAD_COUNT > 0){//检查线程数,如果为0,则表示所有任务处理完成
System.out.println("threadCount: "+THREAD_COUNT);
try {
LOCK.wait();//如果任务没有全部完成,则挂起。等待完成的任务给予通知
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
List<Object> list = new ArrayList<Object>();
//取出每个任务的处理结果,汇总后返回
for (Future<Object> future : futres) {
try {
Object result = future.get();//因为任务都已经完成,这里直接get
list.add(result);
} catch (Exception e) {
e.printStackTrace();
}
}
return list;
}
}
3、测试:
Java代码
public static void main(String[] args) {
//初始化任务池
Executer exe = new Executer(5);
//初始化任务
long time = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
MyJob job = new MyJob();
exe.fork(job);//派发任务
} www.zzzyk.com
//汇总任务结果
List<Object> list = exe.join();
System.out.println("Result: "+list);
System.out.println("time: "+(System.currentTimeMillis() - time));
}
4、执行结果:
Java代码
threadCount: 10
running thread id = 9
running thread id = 11
running thread id = 8
running thread id = 10
running thread id = 12
threadCount: 5
running thread id = 9
running thread id = 8
running thread id = 11
running thread id = 12
running thread id = 10
Result: [8, 9, 10, 11, 12, 8, 11, 12, 9, 10]
time: 2000
补充:软件开发 , Java ,