mahout关联规则之FPGrowthDriver源码分析part1
首先说明一点,前面的文章中的mahout关联规则源码分析part2 很多地方都理解错误了,现重新把理解的写下:
在命令行直接运行下面的命令就可以获得mahout关联规则FPGrowthDriver的用法:
[java]
bin/hadoop jar $mahout_home/core/target/mahout-core-0.7-job.jar org.apache.mahout.fpm.pfpgrowth.FPGrowthDriver -h
1. 打开FPGrowthDriver的源文件,可以看到主要的操作是调用PFPGrowth的runPFPGrowth方法,因为这里只考虑并行的情况,所以使用-method 参数使用mapreduce变量:
[java]
else if ("mapreduce".equalsIgnoreCase(classificationMethod)) {
Configuration conf = new Configuration();
HadoopUtil.delete(conf, outputDir);
PFPGrowth.runPFPGrowth(params);
}
这样操作就转移了,即 FPGrowthDriver--> PFPGrowth
2. 打开PFPGrowth源文件,查看runPFPGrowth方法,可以看到该类主要有下面四个操作:
[java]
2.1 startParallelCounting(params, conf);
2.2// save feature list to dcache
List<Pair<String,Long>> fList = readFList(params);
saveFList(fList, params, conf);
2.3 startParallelFPGrowth(params, conf);
2.4 startAggregating(params, conf);
其中2.1和2.2的操作在mahout关联规则源码分析 Part 1里面已经说明的很清楚了,这里不再说明;
2.3 这一步开启了一个Job,他的Mapper、Combiner、Reducer分别是:ParallelFPGrowthMapper、ParallelFPGrowthCombiner、ParallelFPGrowthReducer;
[java]
job.setMapperClass(ParallelFPGrowthMapper.class);
job.setCombinerClass(ParallelFPGrowthCombiner.class);
job.setReducerClass(ParallelFPGrowthReducer.class);
在说明具体步骤之前,先贴上原始数据,设置FPGrowthDriver的-g参数为2,即为2组:
表1
[html]
牛奶,鸡蛋,面包,薯片
鸡蛋,爆米花,薯片,啤酒
鸡蛋,面包,薯片
牛奶,鸡蛋,面包,爆米花,薯片,啤酒
牛奶,面包,啤酒
鸡蛋,面包,啤酒
牛奶,面包,薯片
牛奶,鸡蛋,面包,黄油,薯片
牛奶,鸡蛋,黄油,薯片
牛奶1,鸡蛋1,面包1,薯片1
鸡蛋1,爆米花1,薯片1,啤酒1
鸡蛋1,面包1,薯片1
牛奶1,鸡蛋1,面包1,爆米花1,薯片1,啤酒1
牛奶1,面包1,啤酒1
鸡蛋1,面包1,啤酒1
牛奶1,面包1,薯片1
牛奶1,鸡蛋1,面包1,黄油1,薯片1
牛奶1,鸡蛋1,黄油1,薯片1
2.3.1 ParallelFPGrowthMapper的主要操作:
2.3.1.1 ParallelFPGrowthMapper的setup函数,此函数主要是读取全局的fList(参考:mahout关联规则之FP树:Parallel FP-Growth for Query Recommendation),把其存入一个Map中:
[java]
int i = 0;
for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
fMap.put(e.getFirst(), i++);
}
针对原始数据,那么存储后的fMap为(项目名,编码,出现的次数),其中编码是根据项目出现的次数按降序从0开始向上编码,每次递增1:
表 2
[html]
薯片 0 7
薯片1 1 7
面包 2 7
面包1 3 7
鸡蛋 4 7
鸡蛋1 5 7
牛奶 6 6
牛奶1 7 6
啤酒 8 4
啤酒1 9 4
2.3.1.2 ParallelFPGrowthMapper的map函数:这个函数主要有两部分:第一部分:针对原始数据的一个事务,按照fMap中出现的顺序进行输出,并删除没有在fMap中出现的项目:比如针对
[html]
鸡蛋,面包,薯片
输出应为:[0,2,4];针对
[html]
牛奶1,鸡蛋1,面包1,爆米花1,薯片1,啤酒1
输出应为:[1,3,5,7,9];www.zzzyk.com
第二部分:针对上面的输出如何进行map的输出呢?上面设置的numGroups为2,即为两组(numGroups的参数设置主要是针对fList的,即把fList分为多组,这样可以达到并行的目的),那么0~4(编码后的项目名)为第一组,其相应的id为0,5~9为第二组,相应的id为1。若第一部分的所有项目都没有超过第一组的最后一个编码(本例中即为4)则此记录只输出一条记录,即本身;比如[0,2,4],那么map输出的记录的key为组的id,即0,value是[0,2,4];否则输出两条记录比如[1,3,5,7,9],其中一条为本身,即map输出key为id,1,value为[1,3,5,7,9];另一条记录为key为0,value为[1,3],即把第一部分的输出拆分为两部分,只取相应组的输出即可。又比如[0,2,4,6]的输出应为: 0 [0,2,4]; 1 [0,2,4,6] ;
那么针对原始数据map的全部输出为:
表3
这里输出TransactionTree,其中的构造方法,其中transactionSet为TransactionTree的一个属性(这里初始TransactionTree,下面详细介绍):
[java]
public TransactionTree(IntArrayList items, Long support) {
representedAsList = true;
transactionSet = Lists.newArrayList();
transactionSet.add(new Pair<IntArrayList,Long>(items, support));
}
2.3.2 ParallelFPGrowthCombiner的reduce函数:
[java]
TransactionTree cTree =new TransactionTree();
[java] view plaincopy
for (TransactionTree tr : values) {
for (Pair<IntArrayList,Long> p : tr) {
cTree.addPattern(p.getFirst(), p.getSecond());
}
}
context.write(key, cTree.getCompressedTree());
这里可以看到新建了一个TransactionTree,然后把同一组(groupid)的记录通过addPattern方法放入到一棵TransactionTree上,最后通过getCompressedTree方法返回一个压缩了的TransactionTree并输出此TransactionTree;
TransactionTree的属性有:
[java]
int[] attribute;
int[] childCount;
int[][] nodeChildren;
long[] nodeCount;
int nodes;
boolean representedAsList;
List<Pair<IntArrayList,Long>> transactioniSet;
比如[0,2,4,6],[0,2,4],[2,4,8]下面三条记录通过addPattern加入数中的效果如下:
第一、二条记录:
第三条记录:
通过addPattern方法加入记录,TransactionTree的representedAsList属性为false,transactionSet为null,其他属性则存储相应的值;
通过上面的方法即可以把每个id
补充:软件开发 , Java ,