手记

[Hadoop踩坑]MapReduce的一个小实验(wordcount)

正文

大数据下,要解决的两大问题:数据量大,一台机器存不下?怎么办?把数据分开存,存到多台机器里,分布式存储。这么多数据,计算速度慢?怎么办?没事,一台机器计算慢,那就多台机器协同起来算。这就是分布式计算。

Hadoop是什么?

Hadoop是一个开源分布式计算平台,为用户提供了系统底层细节透明的分布式基础架构。Hadoop的核心是分布式文件系统(Hadoop Distrubuted File System ,HDFS)和MapReduce。HDFS就是我前面说的分布式存储,MapReduce是分布式计算的一个模型。

HDFS是什么?

HDFS是分布式文件系统,能对数据进行分布式存储。一个大的文件,会被分成一个一个数据块,数据块大小是默认的。而且一个数据块一般会复制成3份,存到不同的机器上,每个数据块都是如此,由此数据就分布到各个机器上。实现了 文件的分布式存储。一个数据块的复制为多份,也称冗余存储,用来解决数据传输出错的问题。

MapReduce是什么?

由前面的介绍,MapReduce起到了分布式计算的作用。它是怎么运作的?可能这是我们关心的,而且下面我们要用它来搞事情啊。所以这里详细的讲讲MapReduce的运行
MapReduce分为map和reduce,一个map处理一个数据块,所以每个机器上会有多个map,用来处理存储在这个机器上的多个数据块,处理的结果形成(key,value)键值对的形式。map处理后的结果由reduce汇总,最后将最终结果进行输出。可能有点抽象,举个简单的例子,计算文本中单词出现的个数(wordcount)

MapReduce的工作模式


首先最左边是一个文件,分成3个数据块(当然这个文件就几个字,太小了,我这里只是做一下演示说明),每个map对应一个数据块,对它进行处理,这里就将每个单词出现次数先置为1。处理之后,就是shuffle(洗牌),sort(排序)。所谓的洗牌就是将key值(这里是单词)相同的放到一起,排序就是按照key进行排序,如四个键值就是按照(h,i,l,y)顺序排好的。可以看到从上到下,him排在前面,you在最后面……reduce就是将key值出现的次数进行汇总,把value值进行相加。这个结果就是这个单词的次数。最后再将总的结果进行输出到文件中。

这就是整个MapReduce的工作模式,讲的还详细不?
下面就动手编程来实现下。

Wordcount程序详解:
package org.apache.hadoop.examples;
 
import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCount { 
   
    public static class TokenizerMapper 
 
    extends Mapper<Object, Text, Text, IntWritable> { 
/**
 * map函数的输入键、输入值、输出键和输出值  Hadoop本身提供了一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型都在org.apache.hadoop.io包中
 * Text(相当于java的string类型) IntWritable(相当于java中的Integer)
 */
        private final static IntWritable one = new IntWritable(1); 
        private Text word = new Text(); 
        public void map(Object key, Text value, Context context)  //content用于输出内容的写入
        //map()方法的输入是一个键和一个值。我们首先将包含一行输入的Text值转换成java中的string类型
        throws IOException, InterruptedException { 
            StringTokenizer itr = new StringTokenizer(value.toString()); 
            //这是一个分割字符串的类,java中默认的分隔符是:"空格","\t"制表符,"\n"换行符,"\r"回车符
            while (itr.hasMoreTokens()) {  //判断是否还有分隔符
                word.set(itr.nextToken());  //下一个字符串转换为Text类型               
                //String nextToken():返回从当前位置到下一个分隔符的字符串。
                context.write(word, one);  
            } 
        } 
    } 
 
    public static class IntSumReducer 
  //同样的,reduce函数也有四个形式参数类型用于指定输入和输出类型.reduce函数的输入类型必须匹配map函数的输出类型:即Text类型和Intwritable
    //在这种情况下,reduce的输出也是Text和Intwritable
    extends Reducer<Text, IntWritable, Text, IntWritable> { 
 
        private IntWritable result = new IntWritable(); 
        public void reduce(Text key, Iterable<IntWritable> values, 
                Context context) 
        throws IOException, InterruptedException { 
            int sum = 0; 
            for (IntWritable val : values) { 
                sum += val.get(); 
            } 
            result.set(sum); 
            context.write(key, result); 
        } 
    } 
 
    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
 
        if (otherArgs.length != 2) { 
         System.out.println(otherArgs.length);
            System.err.println("Usage: wordcount <in> <out>"); 
            System.exit(2);
        } 
        //Job对象指定作业执行规范.用它控制整个作业的运行.在集群上运行这个作业时,要把代码打包成一个JAR文件
        //(Hadoop在集群上发布这个文件).不必明确指定JAR文件的名称.在Job对象的setJarByClass()方法中传递一个类即可
        Job job = new Job(conf, "word count"); 
        job.setJarByClass(WordCount.class); 
        job.setMapperClass(TokenizerMapper.class);  //指定要用的map类型
        job.setCombinerClass(IntSumReducer.class); 
        job.setReducerClass(IntSumReducer.class);  //指定要用的reduce类型
        job.setOutputKeyClass(Text.class);  //控制reduce函数的输出类型
        job.setOutputValueClass(IntWritable.class); 
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
        //定义输入数据的路径,可以是单个文件,也可以是一个目录(此时,将目录下所有文件当做输入)
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
        //定义输出路径,指定reduce函数输出文件的写入目录.在运行作业前该目录是不应该存在的,否则Hadoop会报错并拒绝运行作业.
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
        //waitForCompletion()方法提交作业并等待执行完成.该方法唯一的参数是一个标识,指示是否已生成详细输出.当标识为true(成功)时,作业会把其进度信息写到控制台
    } 
}

结果



作者:枫雪浅语
链接:https://www.jianshu.com/p/ed8141511b8b

0人推荐
随时随地看视频
慕课网APP