手记

【大数据算法】Top N 算法的Hadoop实现

正文之前

感觉自己有一个世纪没写过文章了似的。不管了,今天看数据算法,那就拿这个开刀

另外,小生不才体真的超棒啊 !!

小生不才,
冒昧地喜欢了姑娘这么久,
甚是打扰,望多包涵,
今日愿以山上一草一木为誓,
今日你我二人就此别过,
若有重逢,我必待你眉眼如初,岁月如故,
若姑娘与有缘人终成眷属,
小生独羡其幸,
愿他宠你入骨,惜你如命,
好圆小生半事幽梦,
事已至此,缘尽,人散去了罢……

正文

算法梗概:

import scale.Tuple2;import java.util.List;import java.util.TreeMap;import java.util.SortedMap;import <your-package>.T; //此处是指你用到的自己定义的数据结构类型static SortedMap<Integer, T> topN(List<Tuple<T,Integer>> L, int N){    if((L == NULL) ||(L.isEmpty())){        return null;
    }
    SortedMap<Integer, T> topN = new TreeMap<Integer, T>();    for(Tuple2<T,Integer> element : L){        // element._1 's type is T;
        // element._2 's type is Integer;
        topN.put(element._2,element._1);        // only leave top N;
        if(topN.size() > N){
            topN.remove(topN.firstKey());
        }
    }    return topN;
}

这就是TopN算法的精华所在,对一个排序Map进行依次输入,当该Map的size等于我们需要的N时,再进来一个元素,那么就必须把整合该元素后的Map最小值删除掉。也就是上面的firstKey。

我昨天肝了一天的程序,结果最后卡在一个小BUG身上,真是要死要活。。。去他么的!!!!书里面也没说这个地方,要不是我瞎鸡儿机灵,还真的没法解决这个BUG。。他么原理压根看不懂好嚒!!

这是我整个的文件架构

// hadoopClear.java//import java.io.IOException;////import org.apache.hadoop.conf.Configuration;//import org.apache.hadoop.fs.Path;//import org.apache.hadoop.io.IntWritable;//import org.apache.hadoop.io.Text;//import org.apache.hadoop.mapreduce.Job;//import org.apache.hadoop.mapreduce.Mapper;//import org.apache.hadoop.mapreduce.Reducer;//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;//import org.apache.hadoop.util.GenericOptionsParser;////public class hadoopClear {//    //map将输入中的value复制到输出数据的key上,并直接输出//    public static class Map extends Mapper<Object,Text,Text,Text>{//        //每行数据//        private static Text line=new Text();//        //实现map函数//        public void map(Object key,Text value,Context context) throws IOException,InterruptedException{//            String line = value.toString();//            String[] values = line.split(" ");//            line = "";//            for(int i=0; i<values.length -1 ;++i)//            {//                line += values[i];//            }//            context.write(new Text(line), new Text(""));//        }//    }//    //reduce将输入中的key复制到输出数据的key上,并直接输出//    public static class Reduce extends Reducer<Text,Text,Text,Text>{//        //实现reduce函数//        public void reduce(Text key,Iterable<Text> values,Context context)//                throws IOException,InterruptedException{//            context.write(key, new Text(""));//        }//    }////    public static void main(String[] args) throws Exception{//        Configuration conf = new Configuration();//        //这句话很关键//        conf.set("mapred.job.tracker", "node61:9001");//        String[] ioArgs=new String[]{"dedup_in","dedup_out"};//        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();//        if (otherArgs.length != 2) {//            System.err.println("Usage: Data Deduplication <in> <out>");//            System.exit(2);//        }//        Job job = new Job(conf, "Data Deduplication");//        job.setJarByClass(hadoopClear.class);//        //设置Map、Combine和Reduce处理类//        job.setMapperClass(Map.class);//        job.setCombinerClass(Reduce.class);//        job.setReducerClass(Reduce.class);//        //设置输出类型//        job.setOutputKeyClass(Text.class);//        job.setOutputValueClass(Text.class);//        //设置输入和输出目录//        FileInputFormat.addInputPath(job, new Path(args[0]));//        FileOutputFormat.setOutputPath(job, new Path(args[1]));//        System.exit(job.waitForCompletion(true) ? 0 : 1);//    }//}////import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/**
 * Created by  ZZB on 2018/6/10.
 */public class hadoopClear {    public static void main(String[] args)throws Exception{        //创建配置对象
        Configuration conf = new Configuration();        //创建job对象
        Job job = Job.getInstance(conf,"hadoopClear");        //设置运行job的类
        job.setJarByClass(hadoopClear.class);        //设置mapper 类
        job.setMapperClass(ZZB_Mapper.class);        //设置reduce 类
        job.setReducerClass(ZZB_Reducer.class);        //设置map输出的key value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);        //设置reduce 输出的 key value
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);        //设置输入输出的路径
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));        //提交job
        boolean b = job.waitForCompletion(true);        if(!b){
            System.out.println("wordcount task fail!");
        }
    }
}

第二个Java代码:

//ZZB_Mapper.javaimport org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.util.TreeMap;import java.util.SortedMap;import java.io.IOException;/**
 * Created by  ZZB on 2018/6/10.
 */public class ZZB_Mapper extends Mapper<LongWritable, Text, IntWritable,Text>{    private SortedMap<Double, Text> top10cats = new TreeMap<>();    private int N = 10;    @Override
    protected void setup(Context context) throws IOException, InterruptedException {        super.setup(context);
    }    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{        //得到输入的每一行数据  
        String line = value.toString();        //通过空格分隔  
        String[] values = line.split(",");
        Double weight = Double.parseDouble(values[0]);
        Text x = new Text(line);
        top10cats.put(weight,x);        if (top10cats.size()>N){
            top10cats.remove(top10cats.firstKey());
        }
    }    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {        int s = 4;        for (Text catAttr : top10cats.values()){
            s++;
            context.write(new IntWritable(s),catAttr);
        }
    }
}

第三个Java代码:

//ZZB_Reducer.javaimport org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.util.*;/**
 * Created by  ZZB on 2018/6/10.
 */public class ZZB_Reducer extends Reducer<IntWritable, Text,IntWritable, Text> {    private int N = 10;    private int s = 0;    private SortedMap<Double, Text> finaltop10 = new TreeMap<>();    protected void reduce(IntWritable key, Text catRecord, Context context) throws IOException, InterruptedException {

            String cat = catRecord.toString();
            String[] tokens = cat.split(",");
            finaltop10.put(1.0, catRecord);            if (finaltop10.size() > N) {
                finaltop10.remove(finaltop10.firstKey());
            }        for (Text text : finaltop10.values()){
            s++;
            context.write(new IntWritable(s),text);
        }
    }
}

其中的一个Bug就是,如下图所示的这样,如果我不给一个X做新的序列化字符串载体,那么直接put原本的value进入的话,就会显示是空字符串,报错是empty String。。

我他么也很绝望啊!!!为毛啊!!难道splits方法会要干掉原本的对象中的这个内容吗???我日哦!!

不过不管如何,最后我反正是成功了!!很欣慰!!

下面是过程:

[zbzhang@node61 ~]$ ./test.sh18/08/17 16:31:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted /output/result/_SUCCESS
Deleted /output/result/part-r-0000018/08/17 16:31:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable18/08/17 16:31:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable18/08/17 16:31:56 INFO client.RMProxy: Connecting to ResourceManager at node61/11.11.0.61:803218/08/17 16:31:56 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/08/17 16:31:56 INFO input.FileInputFormat: Total input files to process : 1
18/08/17 16:31:57 INFO mapreduce.JobSubmitter: number of splits:1
18/08/17 16:31:57 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled18/08/17 16:31:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1534317717839_003218/08/17 16:31:57 INFO impl.YarnClientImpl: Submitted application application_1534317717839_003218/08/17 16:31:57 INFO mapreduce.Job: The url to track the job: http://node61:8088/proxy/application_1534317717839_0032/
18/08/17 16:31:57 INFO mapreduce.Job: Running job: job_1534317717839_003218/08/17 16:32:05 INFO mapreduce.Job: Job job_1534317717839_0032 running in uber mode : false18/08/17 16:32:05 INFO mapreduce.Job:  map 0% reduce 0%
18/08/17 16:32:10 INFO mapreduce.Job:  map 100% reduce 0%
18/08/17 16:32:15 INFO mapreduce.Job:  map 100% reduce 100%
18/08/17 16:32:15 INFO mapreduce.Job: Job job_1534317717839_0032 completed successfully18/08/17 16:32:15 INFO mapreduce.Job: Counters: 49        File System Counters
                FILE: Number of bytes read=226
                FILE: Number of bytes written=395195
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=1669
                HDFS: Number of bytes written=185
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=2784
                Total time spent by all reduces in occupied slots (ms)=3008
                Total time spent by all map tasks (ms)=2784
                Total time spent by all reduce tasks (ms)=3008
                Total vcore-milliseconds taken by all map tasks=2784
                Total vcore-milliseconds taken by all reduce tasks=3008
                Total megabyte-milliseconds taken by all map tasks=2850816
                Total megabyte-milliseconds taken by all reduce tasks=3080192
        Map-Reduce Framework
                Map input records=100
                Map output records=10
                Map output bytes=200
                Map output materialized bytes=226
                Input split bytes=97
                Combine input records=0
                Combine output records=0
                Reduce input groups=10
                Reduce shuffle bytes=226
                Reduce input records=10
                Reduce output records=10
                Spilled Records=20
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=120
                CPU time spent (ms)=1760
                Physical memory (bytes) snapshot=505204736
                Virtual memory (bytes) snapshot=5761363968
                Total committed heap usage (bytes)=343932928
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=1572
        File Output Format Counters
                Bytes Written=18518/08/17 16:32:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

下面是结果:

5       919,cat17,cat176       926,cat21,cat217       947,cat61,cat618       950,cat10,cat109       952,cat13,cat1310      958,cat52,cat5211      976,cat92,cat9212      977,cat83,cat8313      987,cat23,cat2314      993,cat39,cat39

可见还是很靠谱的。。。我下面把我的测试脚本以及测试数据都贴上来给大家伙瞧一瞧哈!!

[zbzhang@node61 ~]$ cat test.sh
hdfs dfs -rm /output/result/*
hdfs dfs -rmdir /output/result
hadoop jar hadoopClear.jar hadoopClear /input/cat.txt /output/result
hdfs dfs -cat /output/result/*
rm result.txt
hdfs dfs -get /output/result/part* result.txt

下面是一百只猫的数据,大概就是根据这一百只猫的随机体重进行排序。在数据量小的情况下当然可以直接读取,但是如果是过亿条数据??我估计你的电脑直接会卡死。但是Hadoop就不会了对吧?!!肯定不会的撒!

说的我都想试试了!!



作者:HustWolf
链接:https://www.jianshu.com/p/4e46c6a45076


1人推荐
随时随地看视频
慕课网APP