手记

编写MapReduce程序

MapReduce阶段将整个运行过程分为两个阶段,Map阶段和Reduce阶段。

Map阶段由一定数量的Map Task组成
输入数据格式解析:InputFormat
输入的数据处理  :Mapper
输入数据分组    :Partitioner
数据的拷贝与按key排序
数据处理        :Reducer
数据的输出格式  :outputFormat

JAVA

import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount {     public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {            private final static IntWritable one = new IntWritable(1);            private Text word = new Text();            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {            private IntWritable result = new IntWritable();            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {                int sum = 0;                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key,result);
            }
        }        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path("input/"));
            FileOutputFormat.setOutputPath(job, new Path("output/"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

}

C++

mapper

#include <iostream>#include <string>using namespace std;int main() {  string key;  while(cin >> key) {    cout << key << "\t" << "1" << endl;
  }  return 0;
}

reducer

//reduce前是已经排序后的数据#include <iostream>#include <string>using namespace std;int main() {  string cur_key, last_key, value;  cin >> cur_key >> value;
  last_key = cur_key;  int n = 1;  while(cin >> cur_key) {    cin >> value;    if(last_key != cur_key) {      cout << last_key << "\t" << n << endl;
      last_key = cur_key;
      n = 1;
    } else {
      n++;
    }
  }  cout << last_key << "\t" << n << endl;  return 0;
}

shell

mapper

#! /bin/bashwhile read LINE; do
  for word in $LINE
  do
    echo "$word 1"
  donedone

reducer

#! /bin/bashcount=0
started=0
word=""while read LINE;do
  newword=`echo $LINE | cut -d ' '  -f 1`  if [ "$word" != "$newword" ];then
    [ $started -ne 0 ] && echo "$word\t$count"
    word=$newword
    count=1
    started=1  else
    count=$(( $count + 1 ))  fidoneecho "$word\t$count"



作者:张晓天a
链接:https://www.jianshu.com/p/a75ef8b6e7db


0人推荐
随时随地看视频
慕课网APP