背景
饿了么对时序数据库的需求主要来自各监控系统,主要用于存储监控指标。原来使用graphite,后来慢慢有对指标有多维的需求(主要体现在对一个指标加多个Tag, 来组成Series,然后对Tag进行Filter和Group进行计算),这时graphite基本很难满足需求。
业界现在用的比较多的主要有如下几类TSDB:
InfluxDB:很多公司都在用,包括饿了么有部分监控系统也是用InfluxDB。优点,支持多维和多字段,存储也根据TSDB的特点做了优化。但开源的部分不支持,很多公司自己做集群化, 但大多基于指标名来,这样会有单指的热点问题。现在饿了么也是类似的做法,但热点问题很严重,大的指标已经用了最好的服务器,但是查询性能还是不够理想, 如果做成按Series Sharding那成本还是有一点高;
Graphite:根据指标写入及查询,计算函数很多,但很难支持多维,包括机房或多集群的查询,原来饿了么把业务层的监控指标存储在Graphite中,并工作的很好, 但是多活之后基本已经很难满足一些需求了,由于其存储结构的特点,很占IO,根据目前线上的数据写放大差不多几十倍以上;
OpenTSDB: 基于HBase,优点存储层不用自己考虑,做好查询聚合就可以,也会存在HBase的热点问题等,在以前公司也弄基于HBase实现的TSDB,来解决OpenTSDB的一些问题, 如热点,部分查询聚合下放到HBase等,目的是优化其查询性能,但依赖HBase/HDFS还是很点重;
HiTSDB: 阿里提供的TSDB,存储也是用HBase,在数据结构及Index上面做了很多优化,具体没有研究,有兴趣的同学可以在阿里云上试一下;
Druid: Druid其实是一个OLAP系统,但也可以用来存储时间序列数据,但看到它的架构图时已经放弃了;
ES: 也有公司直接用ES来存储,没有实际测试,但总觉得ES不是一个真正的TSDB;
atlas: Netflix出品,全内存TSDB,最近几小时数据全在内存中,历史数据需要外部存储,具体没有详细研究;
beringei:facebook出品,全内存TSDB,跟atlas一样最近的数据在内存,目前应该还在孵化期;
3. 最终我们还是决定自己实现一套分布式时序数据库,具体需要解决如下问题:
整体设计
采用计算和存储分离的架构,分为计算层LinProxy和存储层LinStorage。
说明:
写入
整个写过程分为如下2部分组成:
WAL复制,这部分设计上参考了Kafka,用户的写入只要写入WAL成功,就认为成功(由于主要用于监控系统,所以对数据的一致性没有做太多的保证),这样就可以提供系统的写入吞吐;
本地写入,这个过程是把WAL的数据解析写入到自己的存储结构中,只有写入本地存储的数据才可以查到;
整个过程不像一些系统在每次写的过程中完成,我们是把这个过程分2步,并异步化了;
WAL复制
目前LinDB的replica复制协议采用多通道复制协议,主要基于WAL在多节点之间的复制,WAL在每个节点上的写入,有独立的写操作完成, 所以对于Client写入对应Leader的WAL成功就认为本次写操作是成功的,Leader所在的节点负责把相应的WAL复制到对应的follower, 同理写WAL成功认为复制成功,如下所示:
多通道复制协议
写入Leader副本成功就算成功以提高了写入速率,也带来了以下问题:
数据一致性的问题
数据的丢失问题
以上图Server1为Leader,3个Replication来复制1-WAL为举例来说:
当前Server1是该shard的Leader接受Client的写入,Server2和Server3都是Follower接受Server1的复制请求,此时1-wal通道作为当前的数据写入通道, Server2和Server3此时可能落后于Server1。
说明:
觉得有收获的话可以点个关注收藏转发一波哦!想要了解和学习更多知识进 java 架构交流群:725633148 里面有阿里Java高级大牛直播讲解知识点,分享知识,多年工作经验的梳理和总结 带着大家全面科学地建立自己的技术体系和技术认知!更有很多视频与干货分享!
Leader Replica Index > Follower Append Index,这时需要根据Follower Append Index重置Leader Replica Index,可能存在2种情况,具体情况在复制顺序性中描述;
Leader Replica Index < Follower Append Index,也同样存在2种情况,具体情况在复制顺序性中描述;
假如此时Server1挂了,从Server2和Server3中选出新的Leader,如此时选为Server2为Leader。
Server2就会开启2-wal复制通道,向server1和server3复制,由于当前server1挂了,所以暂时只往Server3复制,此时数据的写入通道为2-wal。
Server1启动恢复后,Server2会开启向Server1的2-wal复制通道,同时server1会将1-wal中剩余的还未向Server2和Server3复制的数据复制给他们。
对于异常情况,WAL中的数据不能正常由于ACK之后删除,导致WAL占用过多磁盘,所以对WAL需要有一个SIZE和TTL的清理过程,一旦因为WAL因为SIZE和TTL清理之后,会导致几个Index错乱,具体错乱情况如上所述。
多通道复制协议带来的问题:
每个通道都有对应的index序列,保存每个通道的last index。而单通道复制只需要保存1个last index即可。这个代价其实还好。
本地写入
背景
做到Shard级别的写入隔离,即每个Shard都会有独立的线程来负责写入,不会因为某个数据库或者某个Shard写入量具增而导致别的数据库的写入, 但可能会因为单机承载的Shard数过多,导致线程数过多,如果遇到这种情况,应该通过扩机器来解决,或者在新建数据库的时候,合理分配Shard数。
由于是单线程的写操作,所以在很多情况下,不需要考虑多线程写带来的锁竞争问题。
数据存储结构
说明,以单个数据库在单节点上的数据结构如例:
一个数据库在单节点上会存在多个Shard,所有Shard共享一个索引数据;
所有的数据根据数据库的Interval来计算按时间片来存储具体的数据包括数据文件和索引文件。
这样的设计主要为了方便处理TTL,数据如果过期,直接删除相应的目录就可以;
每个shard下面会存在segment,segment根据interval来存储相应时间片的数据;
为什么每个segment下面又按interval存储很多个data family?这个主要由于LinDB主要解决的问题是存储海量的监控数据,一般的监控数据基本是最新时间写入, 基本不会写历史数据,而整个LinDB的数据存储类似LSM方式,所以为了减少数据文件之间的合并操作,导致写放大,所以最终衡量下来,再对segment时间片进行分片。
下面以interval为10s为例说明:
segment按天来存储;
每个segment按小时来分data family,每个小时一个family,每个family中的文件再按列存储具体的数据。
写入流程
说明:
查询引擎
LinDB查询需要解决如下问题:
解决多个机房之间的查询;
高效的流式查询计算;
说明:
Node查询
每个Node接收LinConnect过来的请求,在内部查询计算成中间结果返回给LinConnect,详细的过程后面要介绍;
说明:
如果所示,Client过来的一个查询请求,会产生很多小的查询任务,每个任务所承担的职责很单一,只做它所自己的任务,然后把结果给下一个任务, 所以需要所有的查询计算任务都是异常无阻塞处理,IO/CPU任务分离;
整个服务端查询使用Actor模式来简化整个Pipeline的处理;
任何一个任务执行完成,如果没有结果产生,则不会生产下游的任务,所有下游的任务都是根据上游任务是否有结果来决定;
最终把底层结果,通过Reduce Aggregate聚合成最终的结果;
存储结构
倒排索引
倒排索引,分两部分,目前索引相关的数据还是存储在RocksDB中。
根据Time Series的Measurement+Tags生成对应的唯一ID(类似luence里面的doc id)。
根据Tags倒排索引,指向一个ID列表。TSID列表以BitMap的方式存储,以方便查询的时候通过BitMap操作来过滤出想要的数据。BitMap使用RoaringBitMap。
每一类数据都存储在独立的RocksDB Family中。
内存结构
文件存储结构
文件存储跟内存存储类似,同一个Measurement的数据以Block的方式存储在一起,查询时通过Measurement ID定位到该Measurement的数据存储在哪个Block中。
Measurement Block后存储一个Offset Block,即存储每个Measurement Block所在的Offset,每个Offset以4 bytes存储。
Offset Block存储一个Measurement Index Block,按顺序存储每个Measurement ID,以Bitmap的方式存储。
文件的尾存储一个Footer Block,主要存储Version(2 bytes) + Measurement Index Offset(4 bytes) + Measurement Index Length(4 bytes)。
Data数据块都是数值,所以使用xor压缩,参考facebook的gorilla论文;
Measurement Block:
每个Measurement Block类似Measurement的方式存储,只是把Measurement ID换成Measurement内的TSID。
TS Entry存储该TSID对应每一列的数据,一列数据对应存储一段时间的数据点。
查询逻辑:
DataFile在第一次加载的时候会把Measurement Index放在内存中,查询输入Measurement ID通过Measurement Index中的第几个位置,然后通过这个位置N,在Offset Block查询具体的Measurement Block的Offset,由于每个Offset都是4 bytes,所以offset position = (N-1) * 4,再读取4 bytes得到真正的Offset。
同样的道理可以通过TSID,找到具体的TS Entry,再根据条件过滤具体的列数据,最终得到需要读取的数据。
总结
LinDB从2年前正式慢慢服务于公司的监控系统,从1.0到2.0,已经稳定运行2年多,除了一次rocksdb的问题,几乎没出过什么问题,到现在的3.0性能的大幅提升,我们基本都是站在业界一些成熟方案的基础上,慢慢演进而来。
也有人问,LinDB为什么这么快,其实我们是参考了很多TSDB的作法,然后取其好的设计,再结果时序的特征再做一些优化。
6. 还有一点就是LinDB在新建数据库时指定完Interval之后,系统会自己Rollup,不像InfluxDB要写很多Continue Query,LinDB所有的这一切都是自动化的;
7.查询计算并行流式处理;
所以用一句话来总结的话就是一个高效的索引外加一堆数值,然后怎么玩好这堆数值。
自身监控
LinDB也自带了自身的一些监控功能
Overview
Dashboard
未来的展望
丰富查询函数;
优化内存使用率;
自身监控的提升;
如果有可能,计划开源;
对比测试
下面是与InfluxDB和LinDB2.0的一些查询性能对比。 由于InfluxDB集群化要商业版,所以都是单机默认配置下,无Cache的测试。 服务器配置阿里云机器:8 Core 16G Memory
大维度
Tags: host(40000),disk(4),partition(20),模拟服务器磁盘的监控,总的Series数为320W,每个Series写一个数据点
小维度的1天内的聚合测试
Tags: host(400),disk(2),partition(10),模拟服务器磁盘的监控,总的Series数为8K,每个Series写一天的数据 每个维度每2s写入1个点,每个维度一天内总共43200个点,所有维度总共43200 * 8000个点,共3 4560 0000即3亿多数据
小维度的7天内的聚合测试
Tags: host(400),disk(2),partition(10),模拟服务器磁盘的监控,总的Series数为8K,每个Series写7天的数据 每个维度每5s写入1个点,每个维度一天内总共17280个点,所有天数所有维度总共17280 8000 7 个点,即9 6768 0000,9亿多个点 这个测试要说明一下,得利于LinDB自动的Rollup,如果InfluxDB开Continue Query的话相信应该也还好。