当前位置:编程学习 > JAVA >>

java多线程处理数据集

怎么用多线程来处理一个数据集。
需要实现的功能是读取500万条记录,对这些记录做一些处理之后将新数据插入到另外一个表中。
因为数据量太大了,处理的时候又要调函数会浪费一些时间,累积起来就是几十分钟
所以我想问问大家有没有什么办法用上多线程。
我做了两个尝试:
1、在run里面建立连接,并处理,最后插入新表都一步完成
出现这样的错误:com.microsoft.sqlserver.jdbc.SQLServerException: 无法打开登录所请求的数据库 "flickr"。登录失败。
也就是访问冲突了,结果就是不管开多少个线程实际也就只有一个在跑
2、把数据集读取出来,传给一个新的实现线程的类中处理
但是,一到这个地方遍历数据集的时候while(this.rs.next())就会提示数据集对象rs处空指针
希望大家能讨论下我的问题所在,还有如何使用多线程操作数据集
多线程 java 数据库 --------------------编程问答-------------------- 1.你的获取连接的方式看下,如果你每条数据都拿连接的话,最终会导致连接数耗尽,最终无法连接数据库。
2.没有代码没法给你看空指针的地方,但是肯定是你的参数传递有问题。既然你已经都取出数据了,为什么还要用rs.next。明显你只是拿到了连接,还没有取到数据。
我觉得你的思路有点问题,既然你要用多线程处理这批数据,那么你的每一批数据处理是否独立,换句话说,你处理的500w条数据,业务上是不是每次处理1w条也可以。如果是这样,那么你做个数据读取的分页控制,就能搞定。
如果你的数据是500w一同处理,那么你不得不分多次全部加载到内存中,再业务处理入库。
--------------------编程问答-------------------- 第一种方案的代码
//初始化det和conn Flickr
public static boolean isInit=false;
class TestImportx3 implements Runnable{
long starttime=0;
long endtime=0;
long rowNum=0;//总行数
int  columnNum=0;//总列数
//content表中各字段
int id=0;//记录到word表中的每条记录的ID值
String title = null;
String Tag= null;
String Author= null;
String Time= null;
String Introduction= null;
//word表中的个字段
int RecordId = 0;
String Word;
String Language;
int wordLen = 0;
String PageUrl= null;
PreparedStatement ppst = null ;//用来构建content插入语句
PreparedStatement ppstwd = null ;//用来构建word插入语句
int yinhaoerror = 0;
double fishpesnt;//运行时完成百分比
String tags="";
LangDetectSample det = new LangDetectSample();
String selall="";
ConnectSQLserver mssq=null;
     public void run(){
    
      if(isInit==false){//没初始化才进行初始化操作
      try {
     //SQL server
    mssq=new ConnectSQLserver(true);
det.init("lib/profiles");
isInit=true;
} catch (LangDetectException e1) {
e1.printStackTrace();
}
      }
      
      String inwdSQL="";
inwdSQL="insert into Word(RecordId,Word,Language,wordLen)values(?,?,?,?);";
int startId=0,endId=1;
 
//MYSQL
ConnectMySQL mysq=new ConnectMySQL(true);
ConnectMySQL mysqwd=new ConnectMySQL(true);//insert word
ResultSet rs = null;

try {
String inSQL="insert into Content(" +
"id,engnum,chnum,twnum,janum" +
",title,tag,author,time" +
",introduction,views," +
"PageUrl,H,lannum,tagnum)" +
"values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
starttime=System.currentTimeMillis();//导入开始时间

rowNum=2000000;
System.out.println("共有"+rowNum+"行");  
//初始化ppst
ppst=mysq.conn.prepareStatement(inSQL);
ppstwd=mysqwd.conn.prepareStatement(inwdSQL);
//取消自动提交,此时批处理生效 
mysq.conn.setAutoCommit(false); // 取消自动提交  
//每条记录的tag
starttime=System.currentTimeMillis();
//没到最后就按一定增量
ResultSet res = null; 
while((100000*startId)<rowNum){//不能一次读出所有记录数,所以循环多次,每次读10W条
  //读完后,重新得到数据集
//建立新 的RS
selall="select * from Content where id>"+100000*startId+++ " and id <"+100000*endId++;
//每次读MSSQL的SQL
rs=mssq.getSta().executeQuery(selall);
//XXX 不行啊 ???2013年8月24日10:40:20
//在这个地方调用多线程处理数据集
res=rs;
Control.run(mysq, rs, starttime, rowNum, ppst, det);//用上面的多线程就不用这个方法了
}//分量读取循环
endtime=System.currentTimeMillis();
fishpesnt=test/rowNum;
System.out.println("总读取条数:"+line+"条");
System.out.println("总插入条数:"+test+"条"+fishpesnt);
System.out.println("总用时:"+(endtime-starttime)/1000+"s");
ConnectMySQL mysq2=new ConnectMySQL(true);
mysq2.sta.execute("insert into report (topic,runtime,readline,insertline)values(" +
"\'导入SQLserver中的flickr\'"+"," +
(endtime-starttime)/1000+"," +
line+"," +
test+")");
mssq.closeStatement();
mssq.closeConnection();
} catch (SQLException e) {
e.printStackTrace();
}

}
}
--------------------编程问答--------------------
引用 1 楼 zhuweisyyc 的回复:
1.你的获取连接的方式看下,如果你每条数据都拿连接的话,最终会导致连接数耗尽,最终无法连接数据库。
2.没有代码没法给你看空指针的地方,但是肯定是你的参数传递有问题。既然你已经都取出数据了,为什么还要用rs.next。明显你只是拿到了连接,还没有取到数据。
我觉得你的思路有点问题,既然你要用多线程处理这批数据,那么你的每一批数据处理是否独立,换句话说,你处理的500w条数据,业务上是不是每次处理1w条也可以。如果是这样,那么你做个数据读取的分页控制,就能搞定。
如果你的数据是500w一同处理,那么你不得不分多次全部加载到内存中,再业务处理入库。

1、我这个地方用全局变量控制只连接一次数据库
 if(isInit==false){//没初始化才进行初始化操作
      try {
     //SQL server
    mssq=new ConnectSQLserver(true);
det.init("lib/profiles");
isInit=true;
} catch (LangDetectException e1) {
e1.printStackTrace();
}
      }

2、这里用了分页,每页10万条
 while((100000*startId)<rowNum){//不能一次读出所有记录数,所以循环多次,每次读10W条
                          //读完后,重新得到数据集
            //建立新 的RS
            selall="select * from Content where id>"+100000*startId+++ " and id <"+100000*endId++;
            //每次读MSSQL的SQL
            rs=mssq.getSta().executeQuery(selall);
            //XXX 不行啊 ???2013年8月24日10:40:20
            //在这个地方调用多线程处理数据集
            res=rs;
            Control.run(mysq, rs, starttime, rowNum, ppst, det);//用上面的多线程就不用这个方法了
        }//分量读取循环

3、创建连接的地方我封装了具体是这样的
 mssq=new ConnectSQLserver(true);

/**构造时是否初始化*/
public ConnectSQLserver(boolean init) {
if(init)
init();
}
//驱动程序名
private String driver="com.microsoft.sqlserver.jdbc.SQLServerDriver";
//数据库用户名
private String user="sa";
//密码
private String password="123";
//数据库名
private String databaseName="flickr";
//联结字符串
String urlStr ="jdbc:sqlserver://localhost:1433;DatabaseName="; 
private Connection conn =null;
private Statement sta = null;  
/**初始化函数2--默认链接flickr:仅仅注册+连接+声明默认数据库*/
public void init(){
forName();
connection();
statement();
}
/**注册驱动*/
public void forName()
{
try{           
    Class.forName(driver).newInstance(); 
    System.out.println("注册驱动成功");
    }catch(Exception e){  
       e.printStackTrace();  
   }  
}
/**建立连接1--默认flickr*/
public Connection connection()
{
urlStr =urlStr +getDatabaseName(); 
try {
conn = DriverManager.getConnection(urlStr,user,password);
 System.out.println("数据库" +getDatabaseName()+"连接成功");
} catch (SQLException e) {
e.printStackTrace();
}
return conn;  
}
/**建立声明1*/
public Statement statement()
{
try {
setSta(conn.createStatement());//发送要执行的SQL语句到数据库
System.out.println("建立声明成功");
} catch (SQLException e) {
e.printStackTrace();
  }
return getSta();  
}
--------------------编程问答-------------------- 你的startid在while中没有变化,那不是跳不出while循环了,无限连接数据库。
while((100000*startId)<rowNum){  --------------------编程问答-------------------- 这么多数据读到中间层处理太慢了。 直接用存储过程吧。 

光读取数据到中间层的时间都够存储过程执行4,5遍了。 --------------------编程问答--------------------
引用 4 楼 zhuweisyyc 的回复:
你的startid在while中没有变化,那不是跳不出while循环了,无限连接数据库。
while((100000*startId)<rowNum){ 

selall="select * from Content where id>"+100000*startId+++ " and id <"+100000*endId++;
变了啊。
这个不是问题,分页我只是可以用的,现在就想加上多线程。希望给我点多线程这方面的帮助
            --------------------编程问答--------------------
引用 5 楼 rainbowsix 的回复:
这么多数据读到中间层处理太慢了。 直接用存储过程吧。 

光读取数据到中间层的时间都够存储过程执行4,5遍了。

用存储过程的话,我就不能对数据进行语种判断的处理了。我读出的词语要进行语种判断,在把判断结果写到另外的表中。用存储过程应该不可以可以调用中间层的语言判断的方法吧 --------------------编程问答-------------------- 我没有明白为什么限制只有一个数据库链接?
用线程没错,你写一个数据库连接池,比如10个,那你就用10个线程去做业务 --------------------编程问答--------------------
引用 7 楼 zhang__tianxu 的回复:
Quote: 引用 5 楼 rainbowsix 的回复:

这么多数据读到中间层处理太慢了。 直接用存储过程吧。 

光读取数据到中间层的时间都够存储过程执行4,5遍了。

用存储过程的话,我就不能对数据进行语种判断的处理了。我读出的词语要进行语种判断,在把判断结果写到另外的表中。用存储过程应该不可以可以调用中间层的语言判断的方法吧


语种? 听起来貌似很复杂,怎么不能对语种判断? 你们现在是怎么对语种判断的?
补充:Java ,  Java相关
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,