tagKeyValueEntry
定义:
type tagKeyValueEntry struct {
m map[uint64]struct{} // series id set
a seriesIDs // lazily sorted list of series.这两个字段存储的是相同的series id的集合}获取series id集合
func (e *tagKeyValueEntry) ids() seriesIDs { if e == nil { return nil
}
//首先调用`ids`方法时才给a赋值
if len(e.a) == len(e.m) { return e.a
}
a := make(seriesIDs, 0, len(e.m)) for id := range e.m {
a = append(a, id)
}
//基数排序
radix.SortUint64s(a)
e.a = a return e.a
}tagKeyValue
tag value到tagKeyValueEntry的映射,也就是记录了tag value到对应的所有series id的映射
定义:
type tagKeyValue struct {
mu sync.RWMutex
entries map[string]*tagKeyValueEntry
}series
表示一个series, 包括其tagset, series key, 属于哪一个measurement
type series struct {
mu sync.RWMutex
deleted bool// immutable
ID uint64
Measurement *measurement
Key string // 这个就是 series key
Tags models.Tags //当前series包括的tag key 和 tag value集合
}
measurement
包含了一个measurement对应的所有series信息
定义:
type measurement struct {
Database string
Name string `json:"name,omitempty"`
NameBytes []byte // cached version as []byte
mu sync.RWMutex
fieldNames map[string]struct{} //存储了当前measurement的所有字段
// in-memory index fields
seriesByID map[uint64]*series // series id到series的映射
seriesByTagKeyValue map[string]*tagKeyValue // tagkey -> tag value -> series id set
// lazyily created sorted series IDs
sortedSeriesIDs seriesIDs // sorted list of series IDs in this measurement
// Indicates whether the seriesByTagKeyValueMap needs to be rebuilt as it contains deleted series
// that waste memory.
dirty bool}我们下面分析这几重点的方法
获取当前measurement包含的所有tag key,且是按string排序的
func (m *measurement) TagKeys() []string {
m.mu.RLock()
keys := make([]string, 0, len(m.seriesByTagKeyValue)) for k := range m.seriesByTagKeyValue {
keys = append(keys, k)
}
m.mu.RUnlock()
sort.Strings(keys) return keys
}获取当前measurement包含的所有tag value
func (m *measurement) TagValues(auth query.Authorizer, key string) []string {
m.mu.RLock()
defer m.mu.RUnlock()
values := make([]string, 0, m.seriesByTagKeyValue[key].Cardinality()) //遍历所有的tagKeyValue
m.seriesByTagKeyValue[key].RangeAll(func(k string, a seriesIDs) { if query.AuthorizerIsOpen(auth) {
values = append(values, k)
} else { for _, sid := range a {
s := m.seriesByID[sid] if s == nil { continue
} if auth.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) {
values = append(values, k) return
}
}
}
}) return values
}获取tag key对应的所有series id
func (m *measurement) SeriesIDsByTagKey(key []byte) seriesIDs {
tagVals := m.seriesByTagKeyValue[string(key)] if tagVals == nil { return nil
}
var ids seriesIDs
tagVals.RangeAll(func(_ string, a seriesIDs) {
ids = append(ids, a...)
})
sort.Sort(ids) return ids
}*获取tag value对应的所有 series id
func (m *measurement) SeriesIDsByTagValue(key, value []byte) seriesIDs { tagVals := m.seriesByTagKeyValue[string(key)]
if tagVals == nil {
return nil
} return tagVals.Load(string(value))
}index
包括了若干个measurement的index信息
定义:
type Index struct {
mu sync.RWMutex
database string //数据库名字
sfile *tsdb.SeriesFile //_series 目录下series file
fieldset *tsdb.MeasurementFieldSet // In-memory metadata index, built on load and updated when new series come in
measurements map[string]*measurement // measurement name to object and index
series map[string]*series // map series key to the Series object
seriesSketch, seriesTSSketch estimator.Sketch
measurementsSketch, measurementsTSSketch estimator.Sketch // Mutex to control rebuilds of the index
rebuildQueue sync.Mutex
}IndexSet
包含了Index的集合,所有这些Index都属于同一个DB,每个shard对应一个Index
type IndexSet struct {
Indexes []Index // The set of indexes comprising this IndexSet.
SeriesFile *SeriesFile // The Series File associated with the db for this set.
fieldSets []*MeasurementFieldSet // field sets for _all_ indexes in this set's DB.
}
作者:扫帚的影子
链接:https://www.jianshu.com/p/ea7d0cb37405