
MapReduce inverted index

Let's look at the first piece of code:
 
 
 
package Inverse;
 
import java.io.IOException;
import java.util.Iterator;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class InverseIndex {
 
 private static class IndexMapper extends Mapper<Object, Text, Text, Text>{
 
  private Text word_filepath = new Text(); // composite key: word###filename
  
  private Text one = new Text("1"); // one occurrence of the word
  @Override
  protected void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String str = value.toString().replaceAll("[^0-9a-zA-Z]", " ");
   String[] ss = str.split("\\s+");
   
   FileSplit fileSplit = (FileSplit) context.getInputSplit();
   String fileName = fileSplit.getPath().getName();
   
   System.out.println("InverseIndex.IndexMapper.map()"+value.toString()+"  "+fileName);
   
   for (int i = 0; i < ss.length; i++) {
    if (ss[i].isEmpty()) continue; // the split can yield an empty leading token
    word_filepath.set(ss[i] + "###" + fileName);
    context.write(word_filepath, one);
   }
  }
  
 }
 
 // Combiner: sums the counts for each word###filename key and rewrites the
 // key to the bare word, moving the filename into the value (filename###sum).
 private static class IndexCombiner extends Reducer<Text, Text, Text, Text>{

  @Override
  protected void reduce(Text key, Iterable<Text> value, Context context)
    throws IOException, InterruptedException {
   String[] str = key.toString().split("###");
   int sum = 0;
   for (Text val : value) {
    sum += Integer.parseInt(val.toString());
   }
   context.write(new Text(str[0]), new Text(str[1] + "###" + sum));
  }
  
 }
 
 // Intended to route keys by the word before the ###; it is never registered
 // with the job below, so the default HashPartitioner is what actually runs.
 public static class Mypartitioner extends Partitioner<Text, Text>{

  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
   String word = key.toString().split("###")[0];
   return (word.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
  
 }
 
 public static class IndexReducer extends Reducer<Text, Text, Text, Text>{

  @Override
  protected void reduce(Text key, Iterable<Text> value, Context context)
    throws IOException, InterruptedException {
   // Join all filename###count postings for this word with ";"
   StringBuilder sb = new StringBuilder();
   for (Iterator<Text> iterator = value.iterator(); iterator.hasNext();) {
    sb.append(iterator.next().toString());
    if (iterator.hasNext()) {
     sb.append(";");
    }
   }
   context.write(key, new Text(sb.toString()));
  }
  
 }
 
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new String[]{"/testinverse/", "/inverseout"};
  conf.set("mapred.job.tracker", "172.24.132.190:9001");
  Job job = new Job(conf, "inverted index");
  System.out.println(job.getJar());

  job.setJarByClass(InverseIndex.class);

  job.setMapperClass(IndexMapper.class);
  job.setCombinerClass(IndexCombiner.class); // optimization: pre-aggregate on the map side
  job.setReducerClass(IndexReducer.class);
//  job.setNumReduceTasks(2);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  // Delete a stale output directory so the job can be re-run
  FileSystem fs = FileSystem.get(conf);
  Path temp = new Path(otherArgs[1]);
  if (fs.exists(temp)) {
   fs.delete(temp, true);
  }

  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
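
To see what each stage produces, here is a small worked trace of this first version (file names and contents are invented, and a single reduce task is assumed):

Input t1.html: hello world hello
Input t2.html: hello hadoop

Map output:      hello###t1.html 1, hello###t1.html 1, world###t1.html 1,
                 hello###t2.html 1, hadoop###t2.html 1
Combiner output: hello -> t1.html###2, world -> t1.html###1,
                 hello -> t2.html###1, hadoop -> t2.html###1
Reducer output:  hadoop  t2.html###1
                 hello   t1.html###2;t2.html###1
                 world   t1.html###1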
 
 
 
The code above works on a single node. The weak point is the combiner: it rewrites the key from word###filename to the bare word, but Hadoop chooses the partition from the original composite key before the combiner runs (and may run the combiner zero or more times). Once map tasks are spread across several machines and more than one reduce task is used, the postings for one word can therefore end up at different reducers. A quick demonstration follows, and after it the inverted index rewritten for a multi-machine environment.
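
A minimal, hypothetical snippet (the class name PartitionDemo is invented; the rest is the stock Hadoop API) showing how the default HashPartitioner can send two composite keys for the same word to different reduce tasks:

package Inverse;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitionDemo {
    public static void main(String[] args) {
        HashPartitioner<Text, Text> p = new HashPartitioner<Text, Text>();
        // Both keys carry the word "hello", but they are hashed as whole
        // strings, so with 2 reduce tasks they may land in different partitions.
        int p1 = p.getPartition(new Text("hello###t1.html"), null, 2);
        int p2 = p.getPartition(new Text("hello###t2.html"), null, 2);
        System.out.println(p1 + " vs " + p2);
    }
}

Here is the second version, which handles the multi-machine case by partitioning on the word portion of the key only: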
 
package Inverse;
 
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 
/**
 * Inverted index.
 * Map-stage output format: key=bjsxt###t1.html value=1
 * A Partitioner sends keys to reducers so that keys sharing the same word
 * (the part before the ###) are handled by the same reduce task.
 * @author Dingzhiwei
 *
 */
public class InvertedIndex2 {
    
    private static Text oldkey = null; // the previous word seen by the reducer
    private static Vector<Text> vector = new Vector<Text>(); // buffered filename###count postings for oldkey
    
    public static class I
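
The original listing breaks off at this point. Purely as a sketch of where it was headed, the fragment below completes InvertedIndex2 along the lines the Javadoc describes; apart from the oldkey and vector fields above, every name in it is an invented placeholder, not the original author's code:

    public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final IntWritable one = new IntWritable(1);
        private Text wordAndFile = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            StringTokenizer tokens =
                    new StringTokenizer(value.toString().replaceAll("[^0-9a-zA-Z]", " "));
            while (tokens.hasMoreTokens()) {
                // same composite key as the first version: word###filename
                wordAndFile.set(tokens.nextToken() + "###" + fileName);
                context.write(wordAndFile, one);
            }
        }
    }

    // Route by the word alone, so every word###filename key for a given word
    // reaches the same reduce task no matter which mapper emitted it.
    public static class WordPartitioner extends HashPartitioner<Text, IntWritable> {

        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String word = key.toString().split("###")[0];
            return super.getPartition(new Text(word), value, numReduceTasks);
        }
    }

    public static class InvertedIndexReducer extends Reducer<Text, IntWritable, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            String[] parts = key.toString().split("###");
            Text word = new Text(parts[0]);
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            // Keys arrive sorted, so all composite keys for one word are
            // consecutive; flush the buffered postings when the word changes.
            if (oldkey != null && !oldkey.equals(word)) {
                flush(context);
            }
            oldkey = word;
            vector.add(new Text(parts[1] + "###" + sum));
        }

        private void flush(Context context) throws IOException, InterruptedException {
            if (oldkey == null || vector.isEmpty()) {
                return;
            }
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < vector.size(); i++) {
                sb.append(vector.get(i).toString());
                if (i < vector.size() - 1) {
                    sb.append(";");
                }
            }
            context.write(oldkey, new Text(sb.toString()));
            vector.clear();
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            flush(context); // emit the postings for the last word
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "inverted index 2");
        job.setJarByClass(InvertedIndex2.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setPartitionerClass(WordPartitioner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setNumReduceTasks(2); // with more than one reducer the partitioner matters
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/testinverse/"));
        FileOutputFormat.setOutputPath(job, new Path("/inverseout2"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The design point is that the partitioner extracts the word before routing, so even with several reduce tasks all postings for one word still meet in a single reducer; the reducer then relies on the sorted key order to know when one word's postings are complete.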