数据指标统计作业
业务描述
!数据指标统计
//对金币数量、总观看pv、粉丝数量、视频开播时长等指标进行统计
//自定义数据类型 一个记录管理四个字段
//主播id为key,map节点<k2,v2>为<Text,自定义Writable> //自定义数据类型 public class VideoInfoWritable implements Writable{ private long gold; private long watchnumpv; private long follower; public void set(long gold, long watchnumpv, long follower){ this.gold= gold; this.watchnumpv= watchnumpv; this.follower= follower; } public long getGold(){ return gold; } @Override public void readFields(DataInput dataInput){ this.gold= dataInput.readLong(); this.watchnumpv= dataInput.readLong(); this.follower= dataInput.readLong(); } //读写数据顺序保持一致! @Override public void write(DataOutput dataOutput){ dataOutput.writeLog(gold); dataOutput.writeLog(watchnumpv); dataOutput.writeLog(follower); } //generate添加 //作为v3需要改下字段结构 @Override public String toString(){ return gold+"\t"+watchnumpv+"\t"+follower; } } public class VideoInfoMap extend Mapper<LongWritable,Text,Text,VideoInfoWritable>{ @Override protected void map(LongWritable k1, Text v1, Context context){ String line = v1.toString(); //用之前清洗后的数据 String[] fields = line.split("\t"); String id = fields[0]; long gold = Long.parseLong(fields[1]); long watchnumpv= Long.parseLong(fields[2]); long follower = Long.parseLong(fields[3]); //组装k2,v2 Text k2 = new Text(); k2.set(id); VideoInfoWritable v2 = new VideoInfoWritable(); v2.set(gold, watchnumpv, follower); Context.write(k2, v2); } } public class VideoInfoReduce extends Reducer<Text, VideoInfoWritable, Text, VideoWritable>{ @Override protected void reduce(Text k2, Iterable<VideoInfoWritable> v2s, Context context){ //从v2s把相同key的value取出, 求和 long goldsum=0; long watchnumpvsum=0; long followersum=0; for( VideoInfoWritable v2: v2s){ goldsum+= v2.getGold(); watchnumpvsum += v2.getWatchnumpv(); followersum += v2.getFollower(); } //组装 k3, v3 进行聚合 //Text k3 = k2; VideoInfoWritable v3 = new VideoInfoWritable(); v3.set(goldsum, watchnumpvsum, followersum); context.write(k3, v3); } } public class VideoInfoJob{ //执行任务job //组装map reduce public static void main(String[] args){ try{ if(args.length!=2){ } Configuration 
conf = new Configuration; Job job= job.getInstance(conf); job.setJarByClass(VideoInfoJob.class); //文件输入输出 FileInputFormat FileOutputFormat //map job.setMapperClass //k2类型 job.setMapOutputKeyClass //v2类型 job.setMapOutpiyValueClass //reduce job.setReducerClass //k3 job.setReducerClass // } } }
自定义Writable代码实现
1、由于原始数据中涉及到多个需要统计的字段,所以可以把这几个字段统一记录在一个自定义数据类型中,方便使用。
2、代码实现
数据统计代码实现
1、对数据中的金币数量,总观看pv,粉丝数量,视频总开播时长等指标进行统计
2、统计每天开播时长最长的前10名主播及对应的开播时长
3、代码实现