数据指标统计作业
业务描述
(图:数据指标统计)
//对金币数量、总观看pv、粉丝数量、视频开播时长等指标进行统计
//自定义数据类型:用一条记录统一管理四个字段(注意:下方示例代码目前只包含金币、观看pv、粉丝数三个字段,未包含开播时长)
//以主播id为key,map阶段输出的<k2,v2>为<Text,自定义Writable>
//自定义数据类型
/**
 * Custom Hadoop value type that bundles several per-record metrics (gold,
 * watch page views, followers) into a single {@link Writable}, so that one
 * map/reduce value can carry all of them together.
 *
 * <p>NOTE(review): the notes around this class mention a fourth metric
 * (broadcast duration); it is not carried by this type — confirm whether it
 * should be added.
 */
public class VideoInfoWritable implements Writable {
    private long gold;
    private long watchnumpv;
    private long follower;

    /**
     * Sets all metrics at once.
     *
     * @param gold       amount of gold
     * @param watchnumpv watch page-view count
     * @param follower   follower count
     */
    public void set(long gold, long watchnumpv, long follower) {
        this.gold = gold;
        this.watchnumpv = watchnumpv;
        this.follower = follower;
    }

    /** @return the gold amount */
    public long getGold() {
        return gold;
    }

    /** @return the watch page-view count */
    public long getWatchnumpv() {
        return watchnumpv;
    }

    /** @return the follower count */
    public long getFollower() {
        return follower;
    }

    /**
     * Restores the fields from the serialized form.
     *
     * @param dataInput source to read from
     * @throws java.io.IOException if reading fails
     */
    @Override
    public void readFields(DataInput dataInput) throws java.io.IOException {
        // The read order must match the write order in write(DataOutput).
        this.gold = dataInput.readLong();
        this.watchnumpv = dataInput.readLong();
        this.follower = dataInput.readLong();
    }

    /**
     * Serializes the fields.
     *
     * @param dataOutput sink to write to
     * @throws java.io.IOException if writing fails
     */
    @Override
    public void write(DataOutput dataOutput) throws java.io.IOException {
        // The write order must match the read order in readFields(DataInput).
        dataOutput.writeLong(gold);
        dataOutput.writeLong(watchnumpv);
        dataOutput.writeLong(follower);
    }

    /**
     * Renders the metrics as tab-separated text; this is the form emitted
     * when the type is used as the final output value (v3).
     */
    @Override
    public String toString() {
        return gold + "\t" + watchnumpv + "\t" + follower;
    }
}
public class VideoInfoMap extend Mapper<LongWritable,Text,Text,VideoInfoWritable>{
@Override
protected void map(LongWritable k1, Text v1, Context context){
String line = v1.toString();
//用之前清洗后的数据
String[] fields = line.split("\t");
String id = fields[0];
long gold = Long.parseLong(fields[1]);
long watchnumpv= Long.parseLong(fields[2]);
long follower = Long.parseLong(fields[3]);
//组装k2,v2
Text k2 = new Text();
k2.set(id);
VideoInfoWritable v2 = new VideoInfoWritable();
v2.set(gold, watchnumpv, follower);
Context.write(k2, v2);
}
}
/**
 * Reducer that sums the metrics of all values sharing the same key and emits
 * one aggregated {@code <key, VideoInfoWritable>} pair per key.
 */
public class VideoInfoReduce extends Reducer<Text, VideoInfoWritable, Text, VideoInfoWritable> {

    /**
     * Aggregates all values of one key.
     *
     * @param k2      the grouping key
     * @param v2s     all values emitted for {@code k2}
     * @param context task context used to emit {@code <k3, v3>}
     * @throws java.io.IOException  if emitting the pair fails
     * @throws InterruptedException if the task is interrupted while emitting
     */
    @Override
    protected void reduce(Text k2, Iterable<VideoInfoWritable> v2s, Context context)
            throws java.io.IOException, InterruptedException {
        // Sum each metric over every value that shares this key.
        long goldsum = 0;
        long watchnumpvsum = 0;
        long followersum = 0;
        for (VideoInfoWritable v2 : v2s) {
            goldsum += v2.getGold();
            watchnumpvsum += v2.getWatchnumpv();
            followersum += v2.getFollower();
        }

        // Assemble k3 / v3: the output key is the input key unchanged.
        Text k3 = k2;
        VideoInfoWritable v3 = new VideoInfoWritable();
        v3.set(goldsum, watchnumpvsum, followersum);
        context.write(k3, v3);
    }
}
public class VideoInfoJob{
//执行任务job
//组装map reduce
public static void main(String[] args){
try{
if(args.length!=2){
}
Configuration conf = new Configuration;
Job job= job.getInstance(conf);
job.setJarByClass(VideoInfoJob.class);
//文件输入输出
FileInputFormat
FileOutputFormat
//map
job.setMapperClass
//k2类型
job.setMapOutputKeyClass
//v2类型
job.setMapOutpiyValueClass
//reduce
job.setReducerClass
//k3
job.setReducerClass
//
}
}
}
自定义Writable代码实现
1、由于原始数据中涉及到多个需要统计的字段,所以可以把这几个字段统一记录在一个自定义数据类型中,方便使用。
2、代码实现
数据统计代码实现
1、对数据中的金币数量、总观看pv、粉丝数量、视频总开播时长等指标进行统计
2、统计每天开播时长最长的前10名主播及对应的开播时长
3、代码实现