当前位置:编程学习 > JAVA >>

mapreduce实现"浏览该商品的人大多数还浏览了"经典应用

输入:
日期    ...cookie id.        ...商品id..
xx            xx                        xx
输出:
商品id         商品id列表(按优先级排序,用逗号分隔)
xx                   xx
比如:
id1              id3,id0,id4,id2
id2             id0,id5
整个计算过程分为4步
1、提取原始日志日期,cookie id,商品id信息,按天计算,最后输出数据格式
商品id-0 商品id-1
xx           x x         
这一步做了次优化,商品id-0一定比商品id-1小,为了减少存储,在最后汇总数据转置下即可
reduce做局部排序及排重
 
2、基于上次的结果做汇总,按天计算
商品id-0 商品id-1  关联值(关联值即同时访问这两个商品的用户数)
xx             x x                xx
 
3、汇总最近三个月数据,同时考虑时间衰减,时间越久关联值的贡献越低,最后输出两两商品的关联值(包括转置后)
 
4、行列转换,生成最后要的推荐结果数据,按关联值排序生成
 
第一个MR
[java]  
import java.io.IOException;  
import java.util.ArrayList;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.WritableComparable;  
import org.apache.hadoop.io.WritableComparator;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Partitioner;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;  
import org.apache.log4j.Logger;  
  
  
/* 
 * 输入:原始数据,会有重复 
 *日期 cookie 楼盘id 
 *  
 * 输出: 
 * 日期 楼盘id1 楼盘id2  //楼盘id1一定小于楼盘id2 ,按日期 cookie进行分组 
 *  
 */  
  
public class HouseMergeAndSplit {  
      
    public static class Partitioner1 extends Partitioner<TextPair, Text> {  
          @Override  
          public int getPartition(TextPair key, Text value, int numParititon) {  
                      return Math.abs((new Text(key.getFirst().toString()+key.getSecond().toString())).hashCode() * 127) % numParititon;  
  
          }  
    }  
          public static class Comp1 extends WritableComparator {  
              public Comp1() {  
               super(TextPair.class, true);  
              }  
              @SuppressWarnings("unchecked")  
              public int compare(WritableComparable a, WritableComparable b) {  
               TextPair t1 = (TextPair) a;  
               TextPair t2 = (TextPair) b;  
               int comp= t1.getFirst().compareTo(t2.getFirst());  
               if (comp!=0)  
                   return comp;  
               return t1.getSecond().compareTo(t2.getSecond());  
              }  
            }  
      public static class TokenizerMapper   
           extends Mapper<LongWritable, Text, TextPair, Text>{  
                  Text val=new Text("test");  
        public void map(LongWritable key, Text value, Context context  
                        ) throws IOException, InterruptedException {  
                         String s[]=value.toString().split("\001");              
             TextPair tp=new TextPair(s[0],s[1],s[4]+s[3]); //thedate cookie city+houseid  
             context.write(tp, val);  
        }  
      }  
        
      public static class IntSumReducer   
           extends Reducer<TextPair,Text,Text,Text> {  
          private static String comparedColumn[] = new String[3];  
          ArrayList<String> houselist= new ArrayList<String>();  
          private static Text keyv = new Text();  
            
          private static Text valuev = new Text();  
          static Logger logger = Logger.getLogger(HouseMergeAndSplit.class.getName());  
            
        public void reduce(TextPair key, Iterable<Text> values,   
                           Context context  
                           ) throws IOException, InterruptedException {  
              
            houselist.clear();  
            String thedate=key.getFirst().toString();  
            String cookie=key.getSecond().toString();    
             
            for (int i=0;i<3;i++)  
                comparedColumn[i]="";  
      &n
补充:软件开发 , Java ,
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,