hadoop 怎么定义分组函数?

hadoop 怎么定义分组函数


神不在的星期二
浏览 835回答 1
1回答

ABOUTYOU

1.二次排序概念:首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果 。如: 输入文件:20 2150 5150 5250 5350 5460 5160 5360 5260 5660 5770 5860 6170 5470 5570 5670 5770 581 23 45 67 82203 2150 51250 52250 53530 5440 51120 5320 52260 5660 57740 5863 61730 5471 5571 5673 5774 5812 21131 4250 627 8输出(需要分割线):------------------------------------------------1 2------------------------------------------------3 4------------------------------------------------5 6------------------------------------------------7 87 82------------------------------------------------12 211------------------------------------------------20 2120 5320 522------------------------------------------------31 42------------------------------------------------40 511------------------------------------------------50 5150 5250 5350 5350 5450 6250 51250 522------------------------------------------------60 5160 5260 5360 5660 5660 5760 5760 61------------------------------------------------63 61------------------------------------------------70 5470 5570 5670 5770 5870 58------------------------------------------------71 5571 56------------------------------------------------73 57------------------------------------------------74 58------------------------------------------------203 21------------------------------------------------530 54------------------------------------------------730 54------------------------------------------------740 582.工作原理使用如下map和reduce:(特别注意输入输出类型, 其中IntPair为自定义类型)public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable>在map阶段,使用job.setInputFormatClass(TextInputFormat)做为输入格式。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在随后的例子中,第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。3,具体步骤(1)自定义key在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法:12345678910//反序列化,从流中的二进制转换成IntPairpublic void readFields(DataInput in) throws IOException//序列化,将IntPair转化成使用流传送的二进制public void write(DataOutput out)//key的比较public int compareTo(IntPair o)//另外新定义的类应该重写的两个方法//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)public int hashCode()public boolean equals(Object right)(2)由于key是自定义的,所以还需要自定义一下类:(2.1)分区函数类。这是key的第一次比较。1public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>在job中使用setPartitionerClasss设置Partitioner。(2.2)key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator(也就是实现RawComprator接口)。(这个就是前面说的第二种方法,但是在第三部分的代码中并没有实现此函数,而是直接使用compareTo方法进行比较,所以也就不许下面一行的设置)在job中使用setSortComparatorClass设置key比较函数类。1public static class KeyComparator extends WritableComparator2.3)分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。1public static class GroupingComparator extends WritableComparator分组函数类也必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)分组函数类的另一种方法是实现接口RawComparator。在job中使用setGroupingComparatorClass设置分组函数类。另外注意的是,如果reduce的输入与输出不是同一种类型,则不要定义Combiner也使用reduce,因为Combiner的输出是reduce的输入。除非重新定义一个Combiner。
打开App,查看更多内容
随时随地看视频慕课网APP