Secondary sort means sorting a file first by the first field and, when first fields are equal, sorting those records by the second field (in other words: sort by key first, then sort the values that share the same key). A quick look at the most basic result makes this clear:
(figure: sample input and the expected secondary-sorted output)
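Before bringing in MapReduce, the ordering itself can be sketched in plain Java. This is a minimal standalone sketch of the comparison rule; the sample pairs are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SecondarySortSketch {
    // Sort (first, second) pairs: by the first field, ties broken
    // by the second field compared numerically
    static List<String[]> secondarySort(List<String[]> records) {
        List<String[]> sorted = new ArrayList<>(records);
        sorted.sort(Comparator
                .comparing((String[] r) -> r[0])
                .thenComparingInt(r -> Integer.parseInt(r[1])));
        return sorted;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
                new String[]{"b", "2"}, new String[]{"a", "3"},
                new String[]{"a", "1"}, new String[]{"b", "1"});
        for (String[] r : secondarySort(input)) {
            System.out.println(r[0] + " " + r[1]);  // a 1, a 3, b 1, b 2
        }
    }
}
```

This is exactly the contract the custom `compareTo` below implements; MapReduce's shuffle then applies it across the whole dataset.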
1. Define a custom type implementing the WritableComparable interface, used to sort the data:
```java
public class SortWritable implements WritableComparable<SortWritable> {

    // The first and second fields of the composite key
    private String first;
    private int second;

    public SortWritable() {
    }

    public SortWritable(String first, int second) {
        this.set(first, second);
    }

    // Convenience method for setting both fields at once
    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    // Compare by the first field; if equal, compare by the second field
    @Override
    public int compareTo(SortWritable o) {
        int comp = this.getFirst().compareTo(o.getFirst());
        if (0 == comp) {
            return Integer.compare(this.getSecond(), o.getSecond());
        }
        return comp;
    }

    // Serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(first);
        dataOutput.writeInt(second);
    }

    // Deserialization
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.first = dataInput.readUTF();
        this.second = dataInput.readInt();
    }

    // The following three methods are IDE-generated
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SortWritable that = (SortWritable) o;
        if (second != that.second) return false;
        return first != null ? first.equals(that.first) : that.first == null;
    }

    @Override
    public int hashCode() {
        int result = first != null ? first.hashCode() : 0;
        result = 31 * result + second;
        return result;
    }

    @Override
    public String toString() {
        return "SortWritable{" +
                "first='" + first + '\'' +
                ", second=" + second +
                '}';
    }
}
```
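The `write`/`readFields` pair uses only `java.io`, so the byte layout can be checked with a quick round-trip sketch outside Hadoop. This is a standalone approximation of the serialization path, not the Hadoop runtime itself; the sample values are hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTripSketch {
    // Mirrors SortWritable.write(): writeUTF for first, writeInt for second
    static byte[] serialize(String first, int second) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeUTF(first);
        out.writeInt(second);
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize("hadoop", 42);
        // writeUTF prepends a 2-byte length; writeInt is always 4 bytes:
        // 2 + "hadoop".length() + 4 = 12
        System.out.println(bytes.length);  // 12

        // Mirrors readFields(): read back in the same order as written
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        System.out.println(in.readUTF());  // hadoop
        System.out.println(in.readInt());  // 42
    }
}
```

The fixed 4-byte tail left by `writeInt` is what the raw grouping comparator in step 7 relies on.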
2. Create the basic skeleton of the driver class:
```java
public class MySecondSortMR extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // 1) Get the Configuration
        Configuration configuration = this.getConf();

        // 2) Create the job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1) Input
        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        // 3.2) Configure the map phase
        job.setMapperClass(SecondMapper.class);
        job.setMapOutputKeyClass(SortWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 1. Partitioning
        //job.setPartitionerClass(FirstPartitioner.class);
        // 2. Map-output compression
        //configuration.set("mapreduce.map.output.compress", "true");
        //configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        // 3. Grouping
        //job.setGroupingComparatorClass(FirstGrouping.class);

        // 3.3) Configure the reduce phase
        job.setReducerClass(SecondReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Number of reduce tasks
        //job.setNumReduceTasks(2);

        // 3.4) Output
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // 4) Submit
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) {
        // Remember to comment this out before packaging into a jar
        args = new String[]{
                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/secondsort.txt",
                "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output9"
        };
        Configuration configuration = new Configuration();
        try {
            // Delete the output directory first if it already exists
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)) {
                fileSystem.delete(fileOutPath, true);
            }
            int status = ToolRunner.run(configuration, new MySecondSortMR(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
3. Create a Mapper subclass that splits each record and applies the map-side logic (note that the output key is our custom SortWritable type):
```java
public static class SecondMapper extends Mapper<LongWritable, Text, SortWritable, IntWritable> {

    private SortWritable outputKey = new SortWritable();
    private IntWritable outputValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on a single space; skip lines without exactly two fields
        String[] values = value.toString().split(" ");
        if (2 != values.length) {
            return;
        }
        outputKey.set(values[0], Integer.valueOf(values[1]));
        outputValue.set(Integer.valueOf(values[1]));
        context.write(outputKey, outputValue);
    }
}
```
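The parsing inside `map()` can be exercised on its own without a cluster. A small standalone sketch of that logic, with hypothetical sample lines:

```java
import java.util.Arrays;

public class MapParseSketch {
    // Mirrors the map() parsing: split on a single space and
    // reject lines that don't have exactly two fields
    static String[] parse(String line) {
        String[] values = line.split(" ");
        return values.length == 2 ? values : null;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parse("a 100")));  // [a, 100]
        System.out.println(parse("one two three"));           // null (skipped)
    }
}
```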
4. Create the Reducer subclass (its input types must match the mapper's output types):
```java
public static class SecondReduce extends Reducer<SortWritable, IntWritable, Text, IntWritable> {

    private Text outputKey = new Text();

    @Override
    protected void reduce(SortWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Emit the first field as the output key, once per value in the group
        for (IntWritable value : values) {
            outputKey.set(key.getFirst());
            context.write(outputKey, value);
        }
    }
}
```
5. Run the program. If everything works, you can browse the generated data at /user/hdfs/output9 in the web UI. Next, inspect the sorted result from the command line:
```shell
bin/hdfs dfs -text /user/hdfs/output9/part*
```
The result looks like this:
(figure: sorted output as shown by the command above)
6. To optimize the job, we can apply the following settings:
(figure: optimization settings)
Because the key is now a composite of both fields, we need custom partitioner and grouping classes to preserve the original partitioning and grouping behavior: all records sharing the same first field should still land in the same partition and the same reduce group.
7. Define a custom grouping class implementing the RawComparator interface:
```java
public class FirstGrouping implements RawComparator<SortWritable> {

    // Compare serialized keys directly as bytes.
    // The trailing int occupies 4 bytes, so comparing only the first
    // (length - 4) bytes of each key compares just the serialized first field.
    // Note: the compare must start at the given offsets s1/s2, not at 0.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }

    // Compare deserialized objects by the first field only
    @Override
    public int compare(SortWritable o1, SortWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }
}
```
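The length arithmetic in the raw compare can be verified with plain `java.io`: `writeUTF` stores a 2-byte length prefix plus the string bytes, and the trailing `writeInt` always occupies the last 4 bytes, so dropping 4 bytes from each key compares only the first field. A standalone sketch (Hadoop's `WritableComparator.compareBytes` is replaced here with a hand-written unsigned byte compare):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FirstFieldBytesSketch {
    // Serialize (first, second) the way SortWritable.write() does
    static byte[] serialize(String first, int second) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeUTF(first);
        out.writeInt(second);
        return buffer.toByteArray();
    }

    // Unsigned lexicographic compare of the first field only:
    // ignore the last 4 bytes (the serialized int) of each record
    static int compareFirstField(byte[] b1, byte[] b2) {
        int l1 = b1.length - 4, l2 = b2.length - 4;
        for (int i = 0; i < Math.min(l1, l2); i++) {
            int cmp = Integer.compare(b1[i] & 0xff, b2[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(l1, l2);
    }

    public static void main(String[] args) throws IOException {
        // Same first field, different second: grouped together (compare == 0)
        System.out.println(compareFirstField(serialize("a", 1), serialize("a", 99)));       // 0
        // Different first fields: ordered by the first field alone
        System.out.println(compareFirstField(serialize("a", 5), serialize("b", 1)) < 0);    // true
    }
}
```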
8. Define a custom partitioning class extending Partitioner:
```java
public class FirstPartitioner extends Partitioner<SortWritable, IntWritable> {

    @Override
    public int getPartition(SortWritable key, IntWritable value, int numPartitions) {
        // Hash on the first field only; mask the sign bit so the index is non-negative
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```
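The `& Integer.MAX_VALUE` mask clears the sign bit, so the partition index stays in range even when `hashCode()` is negative (a plain `%` on a negative hash would yield a negative index). A quick standalone check, using a string well known to have a negative hash code:

```java
public class PartitionSketch {
    // Mirrors FirstPartitioner.getPartition(): hash of the first field,
    // masked non-negative, modulo the number of reduce tasks
    static int partition(String first, int numPartitions) {
        return (first.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // "polygenelubricants" hashes to Integer.MIN_VALUE;
        // the mask still produces a valid partition index
        System.out.println("polygenelubricants".hashCode() < 0);  // true
        int p = partition("polygenelubricants", 2);
        System.out.println(p >= 0 && p < 2);                      // true
    }
}
```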
Now uncomment the grouping and partitioning lines in the driver's run() method and run the program again; we get the same desired result.
(figure: output after enabling the custom partitioner and grouping comparator)
Thanks to my teacher and everyone who offered guidance.
Author: 小飞牛_666
Link: https://www.jianshu.com/p/7faa112f9af4