Hadoop 自定义 Writable 没有产生预期的输出

我有一组由 Mapper 产生、作为 Reducer 输入的数据:


(1939, [121, 79, 83, 28]) 

(1980, [0, 211, -113])

我想得到如下输出:


1939 max:121 min:28 avg: 77.75

如果在 Reducer 类中不使用自定义 Writable(如下所示),我就能得到这样的输出:


public static class MaxTemperatureReducer

      extends Reducer<Text, IntWritable, Text, Text> {

          Text yearlyValue = new Text();

      @Override

      public void reduce(Text key, Iterable<IntWritable> values,

          Context context)

          throws IOException, InterruptedException {

            int sum = 0;

            int CounterForAvg = 0;

            int minValue = Integer.MAX_VALUE;

            int maxValue = Integer.MIN_VALUE;

            float avg;

            for (IntWritable val : values) {

                int currentValue = val.get();

                sum += currentValue;

                CounterForAvg++;

                minValue = Math.min(minValue, currentValue);

                maxValue = Math.max(maxValue, currentValue);

            }

            avg = sum / CounterForAvg;

            String requiredValue = "max temp:"+maxValue + "\t" +"avg temp: "+ avg + "\t"+ "min temp: " +minValue;

            yearlyValue.set(requiredValue);

            context.write(key, yearlyValue);

      }

    }

但是,改用自定义 Writable 类之后,得到的却是以下结果:


1939 121

1939 79

1939 83

1939 28

1980 0

1980 211

1980 -113

下面是我实现的自定义类和 Reducer。我把 Iterable 传给自定义类,并在其中完成计算,但我想不明白哪里出了问题。我完全没有 Java 经验。


繁花不似锦
浏览 140回答 1
1回答

慕容3067478

合并的调用不应该帮助我连接值吗当然可以,但你没有正确使用它。out从未初始化。&nbsp; CompositeWritable out; // null here&nbsp; Text textYearlyValue = new Text();&nbsp; public void reduce(Text key, Iterable<IntWritable> values,&nbsp; &nbsp; &nbsp; Context context)&nbsp; &nbsp; &nbsp; throws IOException, InterruptedException {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;out.merge(values); // still null, should throw an exception如果你想输出一行字符串,你可以只使用一个Text对象。你的merge(Iterable<IntWritable> values)方法可以去任何地方,它不必在一个完全独立的类中来返回一个 Writable 对象。但无论如何,如果练习是为了学习如何实现自定义可写,那么就可以了。注意事项:如果你想“组合”多个字段,那么你应该声明它们readFields并且write需要按照相同的顺序toString确定您在使用TextOutputFormat(默认)时在减速器输出中看到的内容equals并且hashCode为了完整性而添加(理想情况下你会实现WritableComparable,但这真的只对键重要,而不是那么多值)为了与其他 Writables 类似,我将您的merge方法重命名为set.你可以期待下面的输出看起来像1939&nbsp; &nbsp; MinMaxAvgWritable{min=28, max=121, avg=77.75}1980&nbsp; &nbsp; MinMaxAvgWritable{min=-113, max=211, avg=32.67}public class MinMaxAvgWritable implements Writable {&nbsp; &nbsp; private int min, max;&nbsp; &nbsp; private double avg;&nbsp; &nbsp; private DecimalFormat df = new DecimalFormat("#.00");&nbsp; &nbsp; @Override&nbsp; &nbsp; public String toString() {&nbsp; &nbsp; &nbsp; &nbsp; return "MinMaxAvgWritable{" +&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "min=" + min +&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ", max=" + max +&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ", avg=" + df.format(avg) +&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; '}';&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public boolean equals(Object o) {&nbsp; &nbsp; &nbsp; &nbsp; if (this == o) return true;&nbsp; &nbsp; &nbsp; &nbsp; if (o == null || getClass() != o.getClass()) return false;&nbsp; &nbsp; &nbsp; &nbsp; MinMaxAvgWritable that = (MinMaxAvgWritable) o;&nbsp; &nbsp; &nbsp; &nbsp; return min == that.min &&&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; max == that.max &&&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; avg == that.avg;&nbsp; &nbsp; }&nbsp; &nbsp; 
@Override&nbsp; &nbsp; public int hashCode() {&nbsp; &nbsp; &nbsp; &nbsp; return Objects.hash(min, max, avg);&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public void write(DataOutput dataOutput) throws IOException {&nbsp; &nbsp; &nbsp; &nbsp; dataOutput.writeInt(min);&nbsp; &nbsp; &nbsp; &nbsp; dataOutput.writeInt(max);&nbsp; &nbsp; &nbsp; &nbsp; dataOutput.writeDouble(avg);&nbsp; &nbsp; }&nbsp; &nbsp; @Override&nbsp; &nbsp; public void readFields(DataInput dataInput) throws IOException {&nbsp; &nbsp; &nbsp; &nbsp; this.min = dataInput.readInt();&nbsp; &nbsp; &nbsp; &nbsp; this.max = dataInput.readInt();&nbsp; &nbsp; &nbsp; &nbsp; this.avg = dataInput.readDouble();&nbsp; &nbsp; }&nbsp; &nbsp; public void set(int min, int max, double avg) {&nbsp; &nbsp; &nbsp; &nbsp; this.min = min;&nbsp; &nbsp; &nbsp; &nbsp; this.max = max;&nbsp; &nbsp; &nbsp; &nbsp; this.avg = avg;&nbsp; &nbsp; }&nbsp; &nbsp; public void set(Iterable<IntWritable> values) {&nbsp; &nbsp; &nbsp; &nbsp; this.min = Integer.MAX_VALUE;&nbsp; &nbsp; &nbsp; &nbsp; this.max = Integer.MIN_VALUE;&nbsp; &nbsp; &nbsp; &nbsp; int sum = 0;&nbsp; &nbsp; &nbsp; &nbsp; int count = 0;&nbsp; &nbsp; &nbsp; &nbsp; for (IntWritable iw : values) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; int i = iw.get();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (i < this.min) this.min = i;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (i > max) this.max = i;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; sum += i;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; count++;&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; this.avg = count < 1 ? 
sum : (sum / (1.0*count));&nbsp; &nbsp; }}有了这个,reducer就很简单了public class CompositeReducer extends Reducer<Text, IntWritable, Text, MinMaxAvgWritable> {&nbsp; &nbsp; private final MinMaxAvgWritable output = new MinMaxAvgWritable();&nbsp; &nbsp; @Override&nbsp; &nbsp; protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {&nbsp; &nbsp; &nbsp; &nbsp; // This 'set/merge' method could just as easily be defined here, and return a String to be set on a Text object&nbsp; &nbsp; &nbsp; &nbsp; output.set(values);&nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; context.write(key, output);&nbsp; &nbsp; }}工作是这样设置的&nbsp; &nbsp; // outputs for mapper and reducer&nbsp; &nbsp; job.setOutputKeyClass(Text.class);&nbsp; &nbsp; // setup mapper&nbsp; &nbsp; job.setMapperClass(TokenizerMapper.class);&nbsp; // Replace with your mapper&nbsp; &nbsp; job.setMapOutputValueClass(IntWritable.class);&nbsp; &nbsp; // setup reducer&nbsp; &nbsp; job.setReducerClass(CompositeReducer.class);&nbsp; &nbsp; job.setOutputValueClass(MinMaxAvgWritable.class); // notice custom writable&nbsp; &nbsp; FileInputFormat.addInputPath(job, new Path(args[0]));&nbsp; &nbsp; FileOutputFormat.setOutputPath(job, new Path(args[1]));&nbsp; &nbsp; return job.waitForCompletion(true) ? 0 : 1;
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java