一个产品或系统,会产生很多数据,这些数据主要分为两类:
1、业务数据:如用户信息、订单、流水等,这类数据会被存到数据库中,也是最为核心的数据。
2、日志数据:日志是应用在运行过程中源源不断产生的,用户的请求,方法的调用等,都会产生日志,日志以文件的形式存在磁盘上。
尽管日志数据看上去没有业务数据那么重要,但是也别小瞧了它,在遇到问题的时候,通过日志来排查问题是很有效的,除此之外,利用日志还能做许多有趣的事情。
日志能干啥
用户行为分析
用户的点击、登录、浏览、购买等所有操作,都会以日志的形式记录下来,通过user_id或者session_id串联起来分析,可以分析用户在当时的状态下,作出相应操作的原因,并根据这些数据,来改善产品
系统功能监测
一次http请求,都会有相应的返回信息;一次API的调用,也会有相应返回结果,通过日志将这些信息记录下来,就能检测系统功能是否出现异常,比如注册成功率、投资成功率等指标的监测统计
非业务数据获取
像用户来源渠道,用户的设备,操作系统,APP版本,浏览器版本等非业务数据,都可以通过日志获取到
实时报表展现
一般的报表,都是通过从数据库写查询来展现的,如果要展现实时数据,比如实时注册人数、实时投资金额的展示,包括更加炫酷的投资地图等报表,还是得靠日志
产生日志
日志的产生,无非就是将需要记录到日志中的信息,在代码中合适的地方,用像slf4j,log4j这样的日志工具打出来。通过实现Servlet的拦截器,如spring中的HandlerInterceptor,就可以不侵入业务代码实现对每一次请求的日志记录,HandlerInterceptor接口如下:
我们可以在preHandle方法中,从request对象中获取到本次请求的url,ip,client信息,headers,cookies,http请求参数,登录用户user_id,session_id等信息,封装到一个对象,将这个对象放在request作用域中(Servlet的作用域:请求、会话和上下文作用域);然后,在afterCompletion方法中,我们取出之前保存在请求作用域中的对象,记录下请求耗时,返回结果等信息,最后把这个对象打到日志中就可以了,这样就实现了一个“埋点”的逻辑。
收集日志
日志源源不断地写入文件,尽管我们可以通过一些命令(如grep)来查看文件中的日志,但是想要进一步处理和分析,日志必须进入我们的数据仓库(DW)中,有很多开源的日志收集工具,Logstash就是其中一种。
Logstash是一款开源的实时数据收集引擎,它的插件机制让它能轻松应对各种不同的输入源和输出汇,常用的input插件包括file,jdbc,kafka,redis,rabbitmq等,常用的输出插件包括:csv,elasticsearch,file,http,kafka,redis,mongodb等,filter插件可以过滤输入的数据,包括drop(直接丢掉),grok(把非结构化的事件数据解析成各种字段),下面是一个简单的logstash配置实例:
比如请求日志:
55.3.244.1 GET /index.html 15824 0.043
配置文件:
input {
file {
path => "/var/log/http.log"
}
}
filter {
grok {
match => {
"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
}
}
}
通过上面的解析,可以得到如下结果:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
一般来说,我们需要收集不止一台server上的日志,那么多个不同的logstash实例收集的日志应该输出到什么地方呢?
我们将所有这些logstash的output都输出到同一redis(集群),在redis中缓存,然后再通过一个总的logstash实例,从redis中取日志(redis input),通过http(s)传输到我们的数据中心。
接收日志
日志通过网络写往数据中心,数据中心有一个logstash实例负责接收日志(http input),过滤掉不需要的日志类型,将日志写入kafka。
Apache Kafka是一个高吞吐的分布式消息系统,最初由LinkedIn开发,并于2011年初开源,它的优点包括:
快速:支持每秒数百兆的读写
可扩展:支持分区,无需停机,集群方式水平扩展
持久化:数据持久化到磁盘,支持副本存储
相关术语:
Topic(主题):按类区分的消息,这些类就称为主题
Producer(生产者):向主题发布消息的进程
Consumer(消费者):订阅主题并消费消息的进程
Broker:一个kafka集群由一台或多台server组成,每台就叫做broker
对于每个topic,kafka维护多个分区(partition),如图所示:
每个分区是一个有序的、不可变的消息序列,日志被追加在分区后面。在每个分区的每条消息都有一个有序的id,称为偏移(offset),可以唯一确定每条消息在分区中的位置。分区分布在集群的多台机器上,每个分区都有备份,有一台作为该分区的leader,0或多台作为follower。
不管消息有没有被消费,kafka集群会储存发布的消息一段时间,这段时间可以配置。每个消费者只需要保存它消费到日志里的哪一个位置了,一般消费者都是按照顺序一条条消费,但是如果需要,也可以重新设置它开始读的位置。有关kafka更多的介绍可以阅读官方文档。
Logstash的kafka output plugin可以很容易把logstash收到的日志发布到kafka的某个topic里,如下配置:
output {
kafka {
bootstrap_servers => “localhost:9092”
topic_id => “logs”
}
}
指定kafka(zookeeper)的host地址和端口,再指定写入的topic就行了
进入数据仓库和实时计算
当日志进到kafka了之后,我们分两路消费日志,也对应了大数据处理的两种粒度:
进数据仓库(批处理)
日志会进入到数据仓库(Hive,HBase等),以便能和数据仓库中的业务数据(通过同步任务)关联,并通过批处理任务(MapReduce、SQL)计算需要的数据。这一路我们采用了Apache的Flume,flume是水槽的意思,Flume可以通过配置的方式,从许多不同的地方收集、聚合和移动数据到某个地方(HDFS、Hive、HBase等)
如图,Flume Agent是一个承载多个Flume组件的JVM进程,包括Source、Sink、Channel,Source和Sink顾名思义,Channel是Flume数据的临时驻留管道,常见的有memory管道和file管道,若是memory管道,数据缓存在内存中,因此速度快,但是当进程意外退出或者断电时,会造成数据的丢失,file管道中,数据保存在磁盘上,因此数据不会丢失,但是速度会慢一些,我们采用的是file channel,实际上memory channel问题也不大,因为极少会出现意外。
Flume中数据从Source到Channel以及从Channel到Sink的操作是两个独立的事务,而且为了提高效率,数据通常是多条一个批次操作的(可设置的Batch size,是个trade-off),这样就导致当处理到一半的时候如果Sink不可用了,会导致此次操作失败,下次Flume会重新执行这次操作,因此会造成数据的重复,即”at least once”语义,所以通常还需要定时运行某个去重的任务,比如用MapReduce或者Hive实现,但实际应用中常常对于重复也不是那么在意。
实时计算
Kafka还有一路,可以接入一些流式计算框架,如Storm,Samza,Spark等,实时计算的数据可以落入数据库,以便报表或API获取数据,也可以进入Redis这种内存数据库,实时展现应用获取Redis中的数据并通过WebSocket等技术”Push”给前端页面实现无刷新实时报表的展现。
总结
下图是一个较为完整的架构:
其中,中间部分的两个Logstash,是为了解决跨网络的传输,如果在同一个数据中心,保留一个就可以了。
最终的效果就是,应用产生的日志,可以在几秒钟之内近实时地(NRT)进入我们的数据仓库,然后就可以用SQL来查询日志了。
其实,多亏了开源世界,这个图中的几乎每一部分,都可以用其他不止一种开源框架和工具代替,那为什么我们这么选择?我觉得合适的就是最好的,根据自己的实际数据量,性能要求来选择相应的方案就可以了,这其中肯定会遇到各种问题,试着去解决,并随着数据量的增长作出相应改进和反思。
本文作者:顾寒阳coldcutter(点融黑帮),来自点融Data组。本科毕业于复旦大学计算机系,参加过ACM、百度之星等程序设计比赛,对算法与数据结构有浓厚兴趣,目前兴趣包括Web开发和大数据应用,爱玩桌球、羽毛球、网球、乒乓球、滑雪等运动。
作者:点融黑帮
链接:https://www.jianshu.com/p/68ef57af8571