当前位置:编程学习 > JAVA >>

Storm测试程序,没有输出!

hi,各位,
下面是一个简单的storm的测试程序(改写自storm-starter上的例子程序)。

Spout类:

package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.Random;

import storm.starter.hdfs.HdfsUtils;

public class RandomSentenceSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;

  private FileReader fileReader;
  private boolean completed = false;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
    try {
     String logFilePath = conf.get("logFile").toString();  // 获取数据源文件路径
     System.out.println("log_file_path:" + logFilePath);
this.fileReader = new FileReader(logFilePath);

} catch (FileNotFoundException e) {
e.printStackTrace();
}  
  }

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    
    if(completed){  
     try {  
     Thread.sleep(1000);  
     } catch (InterruptedException e) {  
     //Do nothing  
     }  
     return;  
     }  
     String str;  
     //Open the reader  
     BufferedReader reader = new BufferedReader(fileReader);  
     try{  
     //Read all lines  
     while((str = reader.readLine())!= null){  
     /** 
     * By each line emmit a new value with the line as a their 
     */  
    
     System.out.println("Spout Emit:" + str);
     //this._collector.emit(new Values(str),str);  
     this._collector.emit(new Values(str)); // 读文件,并发送
     }  
    } catch(Exception e){  
     throw new RuntimeException("Errorreading tuple",e);  
    }finally{  
     completed = true;  
     }
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

}


下面是Bolt类,和main函数

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.hdfs.HdfsUtils;
import storm.starter.spout.RandomSentenceSpout;

import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities.
 */
public class WordCountTopology {

  //public static class SplitSentence extends ShellBolt implements IRichBolt {
  public static class SplitSentence implements IRichBolt {

  private OutputCollector collector;
  private FileWriter fileWriter;
  
    String redis_config_path = "";
    
    public SplitSentence() {
    }
    
    public void prepare(Map stormConf, TopologyContext context,  
     OutputCollector collector) {  
     this.collector = collector;  
    
     redis_config_path = stormConf.get("redisCfg").toString();
     try {
this.fileWriter = new FileWriter(stormConf.get("localOutputPath").toString()); // 获取输出文件路径
} catch (IOException e) {
e.printStackTrace();
}
    }
    
    public void execute(Tuple input) {  
     String sentence = input.getString(0);  
     String[] words = sentence.split("\t");  
     for(String word : words){  
     word = word.trim();  
     if(!word.isEmpty()){  
     word = word.toLowerCase();  
     //Emit the word  
     List a = new ArrayList();  
     a.add(input);  
    
     System.out.println(redis_config_path + " - cur word:" + word);
     try {
fileWriter.write(word + "\n"); // 写入输出文件
} catch (IOException e) {
e.printStackTrace();
}
    
     collector.emit(a,new Values(word));  
     }  
     }  
     // Acknowledge the tuple  
     collector.ack(input);  
    }  

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}

@Override
public void cleanup() {
// TODO Auto-generated method stub
try {
fileWriter.flush();
fileWriter.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
  }

  public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 1);

    builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");

    
    for (int i = 0; i < args.length; ++i) {
     System.out.println("arg " + i + ":" + args[i]);
    }
    
    
    String topName = args[0];
    String runMode = args[1];
    String logFilePath = args[2];
    String readRedisConfig = args[3];
    String localOutputPath = args[4];
    
    
    Config conf = new Config();
    conf.put("logFile", logFilePath);
    conf.put("redisCfg", readRedisConfig);
    conf.put("localOutputPath", localOutputPath);
    conf.setDebug(true);
    

    System.out.println("Before Start...");

    //if (args != null && args.length > 0) {
    if (runMode.equalsIgnoreCase("srv")) { // 集群运行模式
    
     System.out.println("Server Mode...");
    
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else if (runMode.equalsIgnoreCase("loc")) { // 单机运行模式
    
     System.out.println("Local Mode...");
    
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());

      Thread.sleep(10000);

      cluster.shutdown();
    } else {
     System.out.println("Unknown Mode:" + runMode);
    }
  }
}


将上述代码,进行maven打包后,得到jar包:
storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar

本地模式运行命令:
storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology topName loc /root/StormTests/logs.txt redis_path /root/StormTests/out.log

集群模式运行命令:
storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology topName srv /root/StormTests/logs.txt redis_path /root/StormTests/out.log



现在的问题是:
本地模式运行成功,并得到输出文件;
但是集群模式运行后,得不到输出文件,也没有打印相关的调试信息(System.out.println(...))。

通过storm list命令,查看这些集群模式的job处于"ACTIVE"状态。

请教,该如何让它们正常的进行集群模式运行啊? --------------------编程问答-------------------- QTP   --------------------编程问答-------------------- 什么意思啊?
补充:Java ,  Java EE
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,