Storm test program produces no output!
Hi all, below is a simple Storm test program (adapted from an example in storm-starter).
The Spout class:
package storm.starter.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import storm.starter.hdfs.HdfsUtils;
public class RandomSentenceSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Random _rand;
    private FileReader fileReader;
    private boolean completed = false;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
        try {
            String logFilePath = conf.get("logFile").toString(); // path of the input data file
            System.out.println("log_file_path:" + logFilePath);
            this.fileReader = new FileReader(logFilePath);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                // Emit a new tuple for each line, with the line as its value
                System.out.println("Spout Emit:" + str);
                //this._collector.emit(new Values(str), str);
                this._collector.emit(new Values(str)); // read the file and emit each line
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
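One thing worth noting about the spout above: Storm calls nextTuple() repeatedly, and the while loop emits the entire file on the first call. A common alternative is to emit one line per invocation. The `LineEmitter` class below is a hypothetical helper (a sketch, deliberately independent of Storm so it can run stand-alone) that shows that one-line-per-call pattern:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

// Sketch of a one-line-per-call reader, mirroring how a spout's
// nextTuple() is invoked repeatedly by the framework.
public class LineEmitter {
    private final BufferedReader reader;
    private boolean completed = false;

    public LineEmitter(String path) throws IOException {
        this.reader = new BufferedReader(new FileReader(path));
    }

    /** Returns the next line, or null once the file is exhausted. */
    public String pollLine() throws IOException {
        if (completed) {
            return null;
        }
        String line = reader.readLine();
        if (line == null) {
            // End of file: remember we are done and release the handle.
            completed = true;
            reader.close();
        }
        return line;
    }
}
```

Inside a real spout, nextTuple() would call pollLine() once and emit the result if it is non-null.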
Below are the Bolt class and the main function:
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.hdfs.HdfsUtils;
import storm.starter.spout.RandomSentenceSpout;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This topology demonstrates Storm's stream groupings and multilang capabilities.
*/
public class WordCountTopology {
    //public static class SplitSentence extends ShellBolt implements IRichBolt {
    public static class SplitSentence implements IRichBolt {
        private OutputCollector collector;
        private FileWriter fileWriter;
        String redis_config_path = "";

        public SplitSentence() {
        }

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
            redis_config_path = stormConf.get("redisCfg").toString();
            try {
                this.fileWriter = new FileWriter(stormConf.get("localOutputPath").toString()); // path of the output file
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void execute(Tuple input) {
            String sentence = input.getString(0);
            String[] words = sentence.split("\t");
            for (String word : words) {
                word = word.trim();
                if (!word.isEmpty()) {
                    word = word.toLowerCase();
                    // Emit the word, anchored to the input tuple
                    List<Tuple> anchors = new ArrayList<Tuple>();
                    anchors.add(input);
                    System.out.println(redis_config_path + " - cur word:" + word);
                    try {
                        fileWriter.write(word + "\n"); // write to the output file
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    collector.emit(anchors, new Values(word));
                }
            }
            // Acknowledge the tuple
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

        @Override
        public void cleanup() {
            try {
                fileWriter.flush();
                fileWriter.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
        for (int i = 0; i < args.length; ++i) {
            System.out.println("arg " + i + ":" + args[i]);
        }
        String topName = args[0];
        String runMode = args[1];
        String logFilePath = args[2];
        String readRedisConfig = args[3];
        String localOutputPath = args[4];
        Config conf = new Config();
        conf.put("logFile", logFilePath);
        conf.put("redisCfg", readRedisConfig);
        conf.put("localOutputPath", localOutputPath);
        conf.setDebug(true);
        System.out.println("Before Start...");
        //if (args != null && args.length > 0) {
        if (runMode.equalsIgnoreCase("srv")) { // cluster mode
            System.out.println("Server Mode...");
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else if (runMode.equalsIgnoreCase("loc")) { // local mode
            System.out.println("Local Mode...");
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        } else {
            System.out.println("Unknown Mode:" + runMode);
        }
    }
}
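The tokenization inside execute() (split on tabs, trim, drop empties, lowercase) can be pulled out as a pure helper so it is testable outside Storm. The `SentenceSplitter` class below is a sketch of that refactoring; the name and method are my own, not part of the original code:

```java
import java.util.ArrayList;
import java.util.List;

public class SentenceSplitter {
    // Mirrors the bolt's per-tuple logic: split on tab characters,
    // trim each token, skip empty tokens, and lowercase the rest.
    public static List<String> split(String sentence) {
        List<String> words = new ArrayList<String>();
        for (String word : sentence.split("\t")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }
}
```

The bolt's execute() could then iterate over SentenceSplitter.split(input.getString(0)), keeping the Storm-specific emit/ack code separate from the string handling.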
After packaging the code above with Maven, we get the jar:
storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Command to run in local mode:
storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology topName loc /root/StormTests/logs.txt redis_path /root/StormTests/out.log
Command to run in cluster mode:
storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology topName srv /root/StormTests/logs.txt redis_path /root/StormTests/out.log
The problem now is:
Local mode runs successfully and produces the output file;
but in cluster mode there is no output file, and none of the debug messages (System.out.println(...)) are printed.
The storm list command shows these cluster-mode jobs in the "ACTIVE" state.
How can I get them to run properly in cluster mode?