我在看一些优秀的开源库的时候看到一个有意思的缓存库 fastcache,在它的介绍主要有以下几点特点:
读写数据要快,即使在并发下;即使在数 GB 的缓存中,也要保持很好的性能,以及尽可能减少 GC 次数;设计尽可能简单;
本文会通过模仿它写一个简单的缓存库,从而研究其内核是如何实现这样的目标的。希望各位能有所收获。
在项目中,我们经常会用到 Go 缓存库比如说 patrickmn/go-cache库。但很多缓存库其实都是用一个简单的 Map 来存放数据,这些库在使用的时候,当并发低,数据量少的时候是没有问题的,但是在数据量比较大并发比较高的时候会延长 GC 时间,增加内存分配次数。
比如,我们使用一个简单的例子:
func main() {
a :=make(map[string]string, 1e9)
for i :=0; i < 10; i++ {
runtime.GC()
}
runtime.KeepAlive(a)
}
在这个例子中,预分配了大小是10亿(1e9) 的 map,然后我们通过 gctrace 输出一下 GC 情况:
做实验的环境是 Linux,机器配置是 16C 8G ,想要更深入理解 GC,可以看这篇:《 Go 语言 GC 实现原理及源码分析 luozhiyun/archives/475 》
[root@localhost gotest]# GODEBUG=gctrace=1 go run main.go
…
gc 6 @13.736s 17%: 0.010+1815+0.004 ms clock, 0.17+0/7254/21744+0.067 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 7 @15.551s 18%: 0.012+1796+0.005 ms clock, 0.20+0/7184/21537+0.082 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 8 @17.348s 19%: 0.008+1794+0.004 ms clock, 0.14+0/7176/21512+0.070 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 9 @19.143s 19%: 0.010+1819+0.005 ms clock, 0.16+0/7275/21745+0.085 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 10 @20.963s 19%: 0.011+1844+0.004 ms clock, 0.18+0/7373/22057+0.076 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
上面展示了最后 5 次 GC 的情况,下面我们看看具体的含义是什么:
gc 1 @0.004s 4%: 0.22+1.4+0.021 ms clock, 1.7+0.009/0.40/0.073+0.16 ms cpu, 4->5->1 MB, 5 MB goal, 8 P
gc 10 @20.963s 19%: 0.011+1844+0.004 ms clock, 0.18+0/7373/22057+0.076 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 10 :程序启动以来第10次GC
@20.963s:距离程序启动到现在的时间
19%:当目前为止,GC 的标记工作所用的CPU时间占总CPU的百分比
垃圾回收的时间
0.011 ms:标记开始 STW 时间
1844 ms:并发标记时间
0.004 ms:标记终止 STW 时间
垃圾回收占用cpu时间
0.18 ms:标记开始 STW 时间
0 ms:mutator assists占用的时间
7373 ms:标记线程占用的时间
22057 ms:idle mark workers占用的时间
0.076 ms:标记终止 STW 时间
内存
73984 MB:标记开始前堆占用大小
73984 MB:标记结束后堆占用大小
73984 MB:标记完成后存活堆的大小
147968 MB goal:标记完成后正在使用的堆内存的目标大小
16 P:使用了多少处理器
可以从上面的输出看到每次 GC 处理的时间非常的长,占用的 CPU 资源也非常多。那么造成这样的原因是什么呢?
string 实际上底层数据结构是由两部分组成,其中包含指向字节数组的指针和数组的大小:
type StringHeader struct {
Data uintptr
Len int
}
由于 StringHeader中包含指针,所以每次 GC 的时候都会扫描每个指针,那么在这个巨大的 map中是包含了非常多的指针的,所以造成了巨大的资源消耗。
在上面的例子 map a 中数据大概是这样存储:
一个 map 中里面有多个 bucket ,bucket 里面有一个 bmap 数组用来存放数据,但是由于 key 和 value 都是 string 类型的,所以在 GC 的时候还需要根据 StringHeader中的 Data指针扫描 string 数据。
对于这种情况,如果所有的 string 字节都在一个单一内存片段中,我们就可以通过偏移来追踪某个字符串在这段内存中的开始和结束位置。通过追踪偏移,我们不在需要在我们大数组中存储指针,GC 也不在会被困扰。如下:
如同上面所示,如果我们将字符串中的字节数据拷贝到一个连续的字节数组 chunks 中,并为这个字节数组提前分配好内存,并且仅存储字符串在数组中的偏移而不是指针。
除了上面所说的本科证书内容以外,还有其他的方法吗?
其实我们还可以直接从系统 OS 中调用 mmap syscall 进行内存分配,这样 GC 就永远不会对这块内存进行内存管理,因此也就不会扫描到它。如下:
func main() {
test :=“hello syscall”
data, _ :=syscall.Mmap(-1, 0, 13, syscallT_READ|syscallT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
p :=(*[13]byte)(unsafe.Pointer(&data[0]))
for i :=0; i < 13; i++ {
p[i]=test[i]
}
fmt.Println(string(p[:]))
}
通过系统调用直接向 OS 申请了 13bytes 的内存,然后将一个字符串写入到申请的内存数组中。
所以我们也可以通过提前向 OS 申请一块内存,而不是用的时候才申请内存,减少频繁的内存分配从而达到提高性能的目的。
我们在开发前先把这个库的 API 定义一下:
func New(maxBytes int) *Cache
创建一个 Cache 结构体,传入预设的缓存大小,单位是字节。
func (c *Cache) Get(k []byte) []byte
获取 Cache 中的值,传入的参数是 byte 数组。
func (c *Cache) Set(k, v []byte)
设置键值对到缓存中,k 是键,v 是值,参数都是 byte 数组。
const bucketsCount=512
type Cache struct {
buckets [bucketsCount]bucket
}
type bucket struct {
// 读写锁
mu sync.RWMutex
// 二维数组,存放数据的地方,是一个环形链表
chunks [][]byte
// 索引字典
m map[uint64]uint64
// 索引值
idx uint64
// chunks 被重写的次数,用来校验环形链表中数据有效性
gen uint64
}
通过我们上面的分析,可以看到,实际上真正存放数据的地方是 chunks 二维数组,在实现上是通过 m 字段来映射索引路径,根据 chunks 和 gen 两个字段来构建一个环形链表,环形链表每转一圈 gen 就会加一。
func New(maxBytes int) *Cache {
if maxBytes <=0 {
panic(fmt.Errorf(“maxBytes must be greater than 0; got %d”, maxBytes))
}
var c Cache
// 算出每个桶的大小
maxBucketBytes :=uint64((maxBytes + bucketsCount - 1) / bucketsCount)
for i :=range c.buckets[:] {
// 对桶进行初始化
c.buckets[i].Init(maxBucketBytes)
}
return &c
}
我们会设置一个 New 函数来初始化我们 Cache 结构体,在 Cache 结构体中会将缓存的数据大小平均分配到每个桶中,然后对每个桶进行初始化。
const bucketSizeBits=40
const maxBucketSize uint64=1 << bucketSizeBits
const chunkSize=64 * 1024
func (b *bucket) Init(maxBytes uint64) {
if maxBytes==0 {
panic(fmt.Errorf(“maxBytes cannot be zero”))
}
// 我们这里限制每个桶最大的大小是 1024 GB
if maxBytes >=maxBucketSize {
panic(fmt.Errorf(“too big maxBytes=%d; should be smaller than %d”, maxBytes, maxBucketSize))
}
// 初始化 Chunks 中每个 Chunk 大小为 64 KB,计算 chunk 数量
maxChunks :=(maxBytes + chunkSize - 1) / chunkSize
b.chunks=make([][]byte, maxChunks)
b.m=make(map[uint64]uint64)
// 初始化 bucket 结构体
b.Reset()
}
在这里会将桶里面的内存按 chunk 进行分配,每个 chunk 占用内存约为 64 KB。在最后会调用 bucket 的 Reset 方法对 bucket 结构体进行初始化。
func (b *bucket) Reset() {
b.mu.Lock()
chunks :=b.chunks
// 遍历 chunks
for i :=range chunks {
// 将 chunk 中的内存归还到缓存中
putChunk(chunks[i])
chunks[i]=nil
}
// 删除索引字典中所有的数据
bm :=b.m
for k :=range bm {
delete(bm, k)
}
b.idx=0
b.gen=1
b.mu.Unlock()
}
Reset 方法十分简单,主要就是清空 chunks 数组、删除索引字典中所有的数据以及重置索引 idx 和 gen 的值。
在上面这个方法中有一个 putChunk ,其实这个就是直接操作我们提前向 OS 申请好的内存,相应的还有一个 getChunk 方法。下面我们具体看看 Chunk 的操作。
const chunksPerAlloc=1024
const chunkSize=64 * 1024
var (
freeChunks []*[chunkSize]byte
freeChunksLock sync.Mutex
)
func getChunk() []byte {
freeChunksLock.Lock()
if len(freeChunks)==0 {
// 分配 64 * 1024 * 1024=64 MB 内存
data, err :=syscall.Mmap(-1, 0, chunkSize*chunksPerAlloc, syscallT_READ|syscallT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
if err !=nil {
panic(fmt.Errorf(“cannot allocate %d bytes via mmap: %s”, chunkSize*chunksPerAlloc, err))
}
// 循环遍历 data 数据
for len(data) > 0 {
//将从系统分配的内存分为 64 * 1024=64 KB 大小,存放到 freeChunks中
p :=(*[chunkSize]byte)(unsafe.Pointer(&data[0]))
freeChunks=append(freeChunks, p)
data=data[chunkSize:]
}
}
//从 freeChunks 获取最后一个元素
n :=len(freeChunks) - 1
p :=freeChunks[n]
freeChunks[n]=nil
freeChunks=freeChunks[:n]
freeChunksLock.Unlock()
return p[:]
}
初次调用 getChunk 函数时会使用系统调用分配 64MB 的内存,然后循环将内存切成 1024 份,每份 64KB 放入到 freeChunks 空闲列表中。然后获取每次都获取 freeChunks 空闲列表最后一个元素 64KB 内存返回。需要注意的是 getChunk 会下下面将要介绍到的 Cache 的 set 方法中使用到,所以需要考虑到并发问题,所以在这里加了锁。
func putChunk(chunk []byte) {
if chunk==nil {
return
}
chunk=chunk[:chunkSize]
p :=(*[chunkSize]byte)(unsafe.Pointer(&chunk[0]))
freeChunksLock.Lock()
freeChunks=append(freeChunks, p)
freeChunksLock.Unlock()
}
putChunk 函数就是将内存数据还回到 freeChunks 空闲列表中,会在 bucket 的 Reset 方法中被调用。
const bucketsCount=512
func (c *Cache) Set(k, v []byte) {
h :=xxhash.Sum64(k)
idx :=h % bucketsCount
c.buckets[idx].Set(k, v, h)
}
Set 方法里面会根据 k 的值做一个 hash,然后取模映射到 buckets 桶中,这里用的 hash 库是 cespare/xxhash。
最主要的还是 buckets 里面的 Set 方法:
func (b *bucket) Set(k, v []byte, h uint64) {
// 限定 k v 大小不能超过 2bytes
if len(k) >=(1<<16) || len(v) >=(1<<16) {
return
}
// 4个byte 设置每条数据的数据头
var kvLenBuf [4]byte
kvLenBuf[0]=byte(uint16(len(k)) >> 8)
kvLenBuf[1]=byte(len(k))
kvLenBuf[2]=byte(uint16(len(v)) >> 8)
kvLenBuf[3]=byte(len(v))
kvLen :=uint64(len(kvLenBuf) + len(k) + len(v))
// 校验一下大小
if kvLen >=chunkSize {
return
}
b.mu.Lock()
// 当前索引位置
idx :=b.idx
// 存放完数据后索引的位置
idxNew :=idx + kvLen
// 根据索引找到在 chunks 的位置
chunkIdx :=idx / chunkSize
chunkIdxNew :=idxNew / chunkSize
// 新的索引是否超过当前索引
// 因为还有chunkIdx等于chunkIdxNew情况,所以需要先判断一下
if chunkIdxNew > chunkIdx {
// 校验是否新索引已到chunks数组的边界
// 已到边界,那么循环链表从头开始
if chunkIdxNew >=uint64(len(b.chunks)) {
idx=0
idxNew=kvLen
chunkIdx=0
b.gen++
// 当 gen 等于 1<