Let's look at the first version of the code:
package Inverse;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InverseIndex {
    private static class IndexMapper extends Mapper<Object, Text, Text, Text> {
        private Text word_filepath = new Text(); // composite key: word###fileName
        private Text one = new Text("1");        // a count of one occurrence
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Keep only letters and digits, then split on whitespace.
            String str = value.toString().replaceAll("[^0-9a-zA-Z]", " ");
            String[] ss = str.split("\\s+");
            // The input split tells us which file this line came from.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            System.out.println("InverseIndex.IndexMapper.map() " + value.toString() + " " + fileName);
            for (int i = 0; i < ss.length; i++) {
                if (ss[i].isEmpty()) { // split("\\s+") can yield a leading empty token
                    continue;
                }
                word_filepath.set(ss[i] + "###" + fileName);
                context.write(word_filepath, one);
            }
        }
    }
    private static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            // Split the composite key back into word and file name.
            String[] str = key.toString().split("###");
            int sum = 0;
            for (Text val : value) {
                sum += Integer.parseInt(val.toString());
            }
            // Re-key on the word alone; move the file name into the value.
            context.write(new Text(str[0]), new Text(str[1] + "###" + sum));
        }
    }
    // Stub partitioner: it is never registered with the job and always
    // returns partition 0, so it has no effect on this run.
    public static class Mypartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            return 0;
        }
    }
    public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            // Concatenate all fileName###count postings for this word,
            // separated by ";".
            StringBuilder sb = new StringBuilder();
            for (Iterator<Text> iterator = value.iterator(); iterator.hasNext();) {
                sb.append(iterator.next().toString());
                if (iterator.hasNext()) {
                    sb.append(";");
                }
            }
            context.write(key, new Text(sb.toString()));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new String[] { "/testinverse/", "/inverseout" };
        conf.set("mapred.job.tracker", "172.24.132.190:9001");
        Job job = new Job(conf, "inverted index");
        System.out.println(job.getJar());
        job.setJarByClass(InverseIndex.class);
        job.setMapperClass(IndexMapper.class);
        job.setCombinerClass(IndexCombiner.class); // combiner as a local optimization
        job.setReducerClass(IndexReducer.class);
        // job.setNumReduceTasks(2);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Delete the output directory if it already exists,
        // otherwise the job fails on startup.
        FileSystem fs = FileSystem.get(conf);
        Path temp = new Path(otherArgs[1]);
        if (fs.exists(temp)) {
            fs.delete(temp, true);
        }
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
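To make the data flow concrete, here is a small trace on made-up input (the file names and contents are illustrative, not from the original post). Suppose t1.html contains "hello world hello" and t2.html contains "hello world". Then:

map output:      (hello###t1.html, 1) (world###t1.html, 1) (hello###t1.html, 1)
                 (hello###t2.html, 1) (world###t2.html, 1)
combiner output: (hello, t1.html###2) (world, t1.html###1)
                 (hello, t2.html###1) (world, t2.html###1)
reducer output:  (hello, t1.html###2;t2.html###1)
                 (world, t1.html###1;t2.html###1)

With the default single reduce task every key reaches the same reducer, so the final output is exactly the inverted index: each word followed by the files it occurs in, with per-file counts.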
The code above works on a single node, but once the mapper runs on multiple machines the job breaks. Two Hadoop facts bite here: the partition is computed from the pre-combine key (word###fileName), so with more than one reduce task the postings of a single word can be scattered across different reducers; and the framework may apply the combiner zero or several times, so a second combiner pass would call Integer.parseInt on a value like t1.html###2 and throw a NumberFormatException. So below is an inverted index written for the multi-machine (cluster) environment:
package Inverse;
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
 * Inverted index.
 * Map stage output format: key=bjsxt###t1.html, value=1
 * A Partitioner sends keys with the same word part (everything before the
 * "###") to the same reducer, so all postings of one word are processed together.
 * @author Dingzhiwei
 */
public class InvertedIndex2 {
    private static Text oldkey = null;
    private static Vector<Text> vector = new Vector<Text>();
    public static class I
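The listing breaks off here in the source. As a hedged sketch of the partitioning idea the class comment describes, the essential trick is to hash only on the part of the key before "###"; the class name WordPartitioner below is an illustrative assumption, not the original author's code:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Sketch: route word###fileName keys by the word alone, so every posting
// for a word lands in the same reduce partition (class name is hypothetical).
public class WordPartitioner extends HashPartitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        String word = key.toString().split("###")[0]; // drop the file name part
        return super.getPartition(new Text(word), value, numReduceTasks);
    }
}

Registered with job.setPartitionerClass(WordPartitioner.class), such a partitioner guarantees that consecutive reducer keys sharing a word prefix belong to one word, which is presumably what the oldkey and vector fields above are meant to exploit.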