并行计算框架的Java实现--系列一
最近的工作需要统计一些复杂的报表,为了提高效率,想用多线程去实现,但要在所有线程完成统计任务后,将结果汇总。所以在思考有没有什么办法解决,之所以是“系列一”是因为我想记录下我的思考过程。1、首先设计一个Executer,负责任务的执行和汇总:
Java代码
public class Executer {
//计算已经派发的任务数(条件谓词)
public static int THREAD_COUNT = 0;
//线程池
private Executor pool = null;
public Executer() {
this(1);
}
public Executer(int threadPoolSize) {
pool = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* 任务派发
* @param job
*/
public void fork(Job job){
//将任务派发给线程池去执行
pool.execute(job);
THREAD_COUNT++;
}
/**
* 统计任务结果
*/
public void join(){
while(THREAD_COUNT > 0){
System.out.println("threadCount: "+THREAD_COUNT);
try {
wait();//如果任务没有全部完成,则挂起
} catch (Exception e) {}//这里总是抛异常,不知道为什么,好吧!先不管它
}
}
}
2、写一个抽象的Job类,负责执行具体的任务
Java代码
public abstract class Job implements Runnable {
@Override
public void run() {
this.execute();//执行子类具体任务
Executer.THREAD_COUNT--;
try{
notifyAll();//这里总是抛异常,不知道为什么,好吧!先不管它
}catch(Exception e){}
}
/**
* 业务处理函数
*/
public abstract void execute();
}
3、测试,先来一个具体的任务实现。
Java代码
public class MyJob extends Job {
@Override
public void execute() {
//模拟业务需要处理1秒.
try {Thread.sleep(1000);} catch (InterruptedException e) {}
System.out.println("running thread id = "+Thread.currentThread().getId());
}
}
4、测试。
Java代码
public class Test {
public static void main(String[] args) {
//初始化任务池
Executer exe = new Executer(5);
//初始化任务
long time = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
MyJob job = new MyJob();
exe.fork(job);//派发任务
}
//汇总任务结果
exe.join();
System.out.println("time: "+(System.currentTimeMillis() - time));
}
}
5、好吧,看一下结果
Java代码
threadCount: 10
......(表示有N多个)
threadCount: 10
running thread id = 8
running thread id = 9
running thread id = 11
running thread id = 10
running thread id = 12
threadCount: 5
......(表示有N多个)
threadCount: 5
running thread id = 9
running thread id = 10
running thread id = 12
running thread id = 8
running thread id = 11
threadCount: 3
time: 2032
哈哈,看来是可以了,最后汇总任务的处理时间是2032毫秒,看来是比单个任务顺序执行来的快。但是有几个问题:
1)如果没有catch那个超级Exception的话,就会抛下面的异常:
Java代码
java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:485)
at com.one.Executer.join(Executer.java:38)
at com.test.Test.main(Test.java:21)
2)为啥会打印N多个同样值threadCount呢?
于是和同事(河东)沟通,他说wait要放在synchronized里面才行,好吧,试一下,改进一下Executer和Job
Java代码
public class Executer {
//计算已经派发的任务数(条件谓词)
public static int THREAD_COUNT = 0;
//条件队列锁
public static final Object LOCK = new Object();
//线程池
private Executor pool = null;
public Executer() {
this(1);
}
public Executer(int threadPoolSize) {
pool = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* 任务派发
* @param job
*/
public void fork(Job job){
//将任务派发给线程池去执行
pool.execute(job);
//增加线程数
synchronized (LOCK) {
THREAD_COUNT++;
}
}
/**
* 统计任务结果
*/
public void join(){
synchronized (LOC
补充:软件开发 , Java ,