当前位置:编程学习 > JAVA >>

Hadoop Grep 类源代码注释

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tdxx.hadoop.example;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * 从input中提取与表达式相符的单词并计算词频
 * 
 * 继承自配置基类Configured,并扩展接口Tool
 * Configured类中有一个变量conf用于存储配置文件
 * Tool中只有一个方法需要实现
 * int run(String [] args)用于运行输入参数
 * hadoop jar Grep.jar /user/hadoop/20130704/grep.txt /user/hadoop/output/ 'aaa.*'
 * hadoop jar Grep.jar /user/hadoop/20130704/grep.txt /user/hadoop/output/ '[a-z.]+'
 */
public class Grep extends Configured implements Tool {
	// singleton
	private Grep() {
	} 

	public int run(String[] args) throws Exception {
		if (args.length < 3) {
			System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
			ToolRunner.printGenericCommandUsage(System.out);
			return -1;
		}

		Path tempDir = new Path("grep-temp-"
				+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

		//创建job
		JobConf grepJob = new JobConf(getConf(), Grep.class);

		try {
			//job命名
			grepJob.setJobName("grep-search");

			//设置job的输入路径
			FileInputFormat.setInputPaths(grepJob, args[0]);
			
			//设置Mapper类
			grepJob.setMapperClass(RegexMapper.class);
			
			grepJob.set("mapred.mapper.regex", args[2]);
			if (args.length == 4)
				grepJob.set("mapred.mapper.regex.group", args[3]);

			//设置Combiner类
			grepJob.setCombinerClass(LongSumReducer.class);
			
			//设置Reducer类
			grepJob.setReducerClass(LongSumReducer.class);

			//设置输出路径
			FileOutputFormat.setOutputPath(grepJob, tempDir);
			
			//设置输出格式
			grepJob.setOutputFormat(SequenceFileOutputFormat.class);
			
			//设置输出键的类
			grepJob.setOutputKeyClass(Text.class);
			
			//设置输出值的类
			grepJob.setOutputValueClass(LongWritable.class);

			//运行
			JobClient.runJob(grepJob);

			JobConf sortJob = new JobConf(getConf(), Grep.class);
			sortJob.setJobName("grep-sort");

			FileInputFormat.setInputPaths(sortJob, tempDir);
			sortJob.setInputFormat(SequenceFileInputFormat.class);

			sortJob.setMapperClass(InverseMapper.class);

			sortJob.setNumReduceTasks(1); // write a single file
			FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
			sortJob.setOutputKeyComparatorClass // sort by decreasing freq
					(LongWritable.DecreasingComparator.class);

			JobClient.runJob(sortJob);
		} finally {
			FileSystem.get(grepJob).delete(tempDir, true);
		}
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new Grep(), args);
		System.exit(res);
	}

}

 

补充:软件开发 , Java ,
CopyRight © 2022 站长资源库 编程知识问答 zzzyk.com All Rights Reserved
部分文章来自网络,