import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.LineReader;

public static class Map extends Mapper<Object, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();
    private List<String> cacheList = new ArrayList<String>();

    @Override
    protected void setup(Mapper<Object, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        // Load the small matrix from HDFS into memory (map-side join).
        Path smallTablePath = new Path(context.getConfiguration().get("smallTableLocation"));
        FileSystem hdfs = smallTablePath.getFileSystem(context.getConfiguration());
        FSDataInputStream hdfsReader = hdfs.open(smallTablePath);
        Text line = new Text();
        LineReader lineReader = new LineReader(hdfsReader);
        while (lineReader.readLine(line) > 0) {
            cacheList.add(line.toString());
        }
        lineReader.close();
        hdfsReader.close();
        System.out.println("setup ok");
    }

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line of matrix 1: "row col_value,col_value,..." (space-separated).
        String row_matrix1 = value.toString().split(" ")[0];
        String[] column_value_array_matrix1 = value.toString().split(" ")[1].split(",");
        for (String line : cacheList) {
            // Cached line of matrix 2: "row<TAB>col_value,col_value,..."
            String row_matrix2 = line.split("\t")[0];
            String[] column_value_array_matrix2 = line.split("\t")[1].split(",");
            int result = 0;
            for (String column_value_matrix1 : column_value_array_matrix1) {
                String column_matrix1 = column_value_matrix1.split("_")[0];
                String value_matrix1 = column_value_matrix1.split("_")[1];
                for (String column_value_matrix2 : column_value_array_matrix2) {
                    if (column_value_matrix2.startsWith(column_matrix1 + "_")) {
                        String value_matrix2 = column_value_matrix2.split("_")[1];
                        result += Integer.parseInt(value_matrix1) * Integer.parseInt(value_matrix2);
                    }
                }
            }
            outKey.set(row_matrix1);
            outValue.set(row_matrix2 + "_" + result);
            System.out.println(outValue);
            context.write(outKey, outValue);
        }
    }
}
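For reference, the reducer that consumes this output would have to be declared as Reducer<Text, Text, ...>, since the mapper emits Text/Text. Below is a minimal sketch of such a reducer (my assumption, not the asker's actual code); it only concatenates the "row_matrix2_result" values for each row of matrix 1, and prints a line so you can see whether reduce is actually invoked:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a reducer matching the mapper's Text/Text output (assumed, for illustration).
public static class Reduce extends Reducer<Text, Text, Text, Text> {

    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("reduce called for key " + key); // trace that reduce runs at all
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(value.toString()); // each value is "row_matrix2_result"
        }
        outValue.set(sb.toString());
        context.write(key, outValue);    // one output line per row of matrix 1
    }
}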
If reduce is not executing (my take):
1. Check that the reducer class set on the job in the main method is the right one.
2. If (1) is correct, confirm that the map output key/value types match the reducer's input types.
Finally, add log output inside the reducer to see which stage it reaches (setup, reduce, cleanup), and then work from any error messages. Hope this helps.
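To illustrate points 1 and 2, a driver would look roughly like the sketch below (class names such as MatrixMultiply, Map and Reduce are placeholders; adapt them to your own code). The lines to double-check are setReducerClass and the map output key/value classes, which must agree with the reducer's generic parameters. Also note that if the reduce method signature does not exactly match the declared types, Hadoop silently falls back to the identity reducer, which looks just like "reduce not executing".

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MatrixMultiply { // placeholder driver class name

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("smallTableLocation", args[0]);   // path of the small matrix read in setup()

        Job job = Job.getInstance(conf, "matrix-multiply");
        job.setJarByClass(MatrixMultiply.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);         // point 1: is this really your reducer class?
        job.setMapOutputKeyClass(Text.class);      // point 2: must match Reducer<Text, Text, ...>
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}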