并行计算框架的Java实现--系列三
优化锁,之前的锁是采用一个static的Object实现的,这要就会有一个问题,如果我创建了多个Executer,那么所有Job都会持有一把锁,既影响性能,也容易出现死锁的情况。所以,改成每个Executer持有一把锁。Executer代码如下:
Java代码
public class Executer {
//计算已经派发的任务数(条件谓词)
public static int THREAD_COUNT = 0;
//存储任务的执行结果
private List<Future<Object>> futres = new ArrayList<Future<Object>>();
//条件队列锁
public final Object lock = new Object();
//线程池
private ExecutorService pool = null;
public Executer() {
this(1);
}
public Executer(int threadPoolSize) {
pool = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* 任务派发
* @param job
*/
public void fork(Job job){
//设置同步锁
job.setLock(lock);
//将任务派发给线程池去执行
futres.add(pool.submit(job));
//增加线程数
synchronized (lock) {
THREAD_COUNT++;
}
}
/**
* 统计任务结果
*/
public List<Object> join(){
synchronized (lock) {
while(THREAD_COUNT > 0){//检查线程数,如果为0,则表示所有任务处理完成
System.out.println("threadCount: "+THREAD_COUNT);
try {
lock.wait();//如果任务没有全部完成,则挂起。等待完成的任务给予通知
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
List<Object> list = new ArrayList<Object>();
//取出每个任务的处理结果,汇总后返回
for (Future<Object> future : futres) {
try {
Object result = future.get();//因为任务都已经完成,这里直接get
list.add(result);
} catch (Exception e) {
e.printStackTrace();
}
}
return list;
}
}
Job类:
Java代码
public abstract class Job implements Callable<Object> {
//锁
private Object lock = null;
void setLock(Object lock) {
this.lock = lock;
}
@Override
public Object call() throws Exception {
Object result = this.execute();//执行子类具体任务
synchronized (lock) {
//处理完业务后,任务结束,递减线程数,同时唤醒主线程
Executer.THREAD_COUNT--;
lock.notifyAll();
}
return result;
}
/**
* 业务处理函数
*/
public abstract Object execute();
}
测试结果:
Java代码
threadCount: 10
running thread id = 8
running thread id = 10
running thread id = 9
running thread id = 12
running thread id = 11
threadCount: 8
threadCount: 7
threadCount: 6
threadCount: 5
running thread id = 12
running thread id = 8
running thread id = 11
threadCount: 2
running thread id = 10
threadCount: 1
running thread id = 9
ResultSize: 10
time: 2001
补充:软件开发 , Java ,