时序数据库VictoriaMetrics源码解析之写入与索引

2023-12-08 0 303
目录
  • 一. 存储格式
  • 二. 整体流程
  • 三. 写入代码
    • 1.入口代码
    • 2.写入流程的代码
    • 3.写index
    • 4. 生成TSID
    • 5. 创建index items
    • 6. index items存入内存shards

一. 存储格式

下图是向VictoriaMetrics写入prometheus协议数据的示例:

时序数据库VictoriaMetrics源码解析之写入与索引

VM在收到写入请求时,会对请求中包含的时序数据做转换处理:

  • 首先,根据metrics+labels组成的MetricName,生成一个唯一标识TSID;
  • 然后:

    • metric(指标名称__name__) + labels + TSID作为索引index;
    • TSID + timestamp + value作为数据data;
  • 最后,索引index和数据data分别进行存储和检索;

时序数据库VictoriaMetrics源码解析之写入与索引

因此,VM的数据整体上分为索引和数据2个部分:

  • 索引部分,用以支持按照label或tag进行多维检索,得到TSID;
  • 数据部分,用以支持按照TSID得到tv数据;

二. 整体流程

VictoriaMetrics在写入原始的rows数据时,写入过程分为两个部分:

  • 写index;
  • 写tv;

写入流程:

  • 对于原始的rows数据,根据其metricsName从cache和内存索引中,查找其对应的TSID;
  • 若TSID找到,则写入tv数据,返回client;
  • 否则:

    • 写index:

      • 构造TSID,构造新的index items,然后将其写入内存shard;
      • 内存shard被异步的goroutine压缩并保存到磁盘;
    • 写tv数据;
    • 返回client;

时序数据库VictoriaMetrics源码解析之写入与索引

三. 写入代码

1.入口代码

vmstorage监听tcp端口,收到vminsert的插入请求后,进行处理:

// app/vmstorage/servers/vminsert.go
func (s *VMInsertServer) run() {

for {
c, err := s.ln.Accept()

go func() {
bc, err := handshake.VMInsertServer(c, compressionLevel)

err = clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits)) // 入口代码
}, s.storage.IsReadOnly)

}()
}
}

写入时,1次最多写8K个rows:

func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
….
maxBlockLen := len(ic.rrs)
for len(mrs) > 0 {
mrsBlock := mrs
// 一次最多写8K,maxBlockLen=8000
if len(mrs) > maxBlockLen {
mrsBlock = mrs[:maxBlockLen]
mrs = mrs[maxBlockLen:]
} else {
mrs = nil
}
// 写入8K rows的数据
if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
}
….
}

2.写入流程的代码

写入过程主要分2步:

  • 首先,为row查找或构建TSID;

    • 若该row的metricNameRaw与prevMetricNameRaw,则使用prevTSID;
    • 若cache中有缓存的metricNameRaw,则使用缓存的metricNameRaw对应的TSID;
    • 若上述都不满足,则去内存索引中查找,或者创建一个新的TSID;

      • 这一步是最耗时的;
  • 然后,构建TSID完毕后,插入tv数据;

// lib/storage/storage.go
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {

// 1.构造r.TSID
// 若跟prevMetricNameRaw相同,则使用pervTSID;
// 若cache中有metricNameRaw,则使用cache.TSID;
for i := range mrs {
mr := &mrs[i]

dstMrs[j] = mr
r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) { // 使用prevTSID
// Fast path – the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r.TSID = prevTSID
continue
}
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { // 使用缓存的TSID

r.TSID = genTSID.TSID
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw

continue
}

}
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs.pmrs
sort.Slice(pendingMetricRows, func(i, j int) bool {
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
})
prevMetricNameRaw = nil
var slowInsertsCount uint64
for i := range pendingMetricRows {

r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
// 尝试去index找查找,或者创建
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {

continue
}
genTSID.generation = idb.generation
genTSID.TSID = r.TSID
// 放回cache
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
}
}

dstMrs = dstMrs[:j]
rows = rows[:j]
err := s.updatePerDateData(rows, dstMrs)
if err != nil {
err = fmt.Errorf(\”cannot update per-date data: %w\”, err)
} else {
// TSID构造完毕,开始插入数据
err = s.tb.AddRows(rows)

}

return nil
}

3.写index

写index是slow path,重点看一下:

  • 首先,去内存索引中找TSID,若找到,则返回;
  • 否则,创建一个新的TSID;

// lib/storage/index_db.go
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
// 1.首先尝试在index中查找
if is.tsidByNameMisses < 100 {
err := is.getTSIDByMetricName(dst, metricName)
// 在index中找到了
if err == nil {
// Fast path – the TSID for the given metricName has been found in the index.
is.tsidByNameMisses = 0
if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
return err
}
return nil
}
is.tsidByNameMisses++
} else {
is.tsidByNameSkips++
if is.tsidByNameSkips > 10000 {
is.tsidByNameSkips = 0
is.tsidByNameMisses = 0
}
}
// 2.没有找到,那么创建一个
if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
return fmt.Errorf(\”cannot create TSID by MetricName %s: %w\”, userReadableMetricName, err)
}
return nil
}

4. 生成TSID

具体生成TSID的逻辑:

  • MetricGroupID: 由metricGroup hash而来;
  • JobID:由tags[0].Value hash而来;
  • InstanceID:由tags[1].Value hash而来;

// lib/storage/index_db.go
func generateTSID(dst *TSID, mn *MetricName) {
dst.AccountID = mn.AccountID
dst.ProjectID = mn.ProjectID
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
if len(mn.Tags) > 0 {
dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value))
}
if len(mn.Tags) > 1 {
dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value))
}
dst.MetricID = generateUniqueMetricID()
}

而TSID中的metricID是由启动时的时间戳+1产生:

// Returns local unique MetricID.
func generateUniqueMetricID() uint64 {
return atomic.AddUint64(&amp;nextUniqueMetricID, 1)
}
var nextUniqueMetricID = uint64(time.Now().UnixNano())

5. 创建index items

  • 创建 MetricName -> TSID index;
  • 创建 MetricID -> MetricName index;
  • 创建 MetricID -> TSID index;
  • 创建 tag -> MetricID 和 MetricGroup+tag -> MetricID index;
  • 最后,将index items存入内存shards;

// lib/storage/index_db.go
func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
// The order of index items is important.
// It guarantees index consistency.
ii := getIndexItems()
defer putIndexItems(ii)
// Create MetricName -> TSID index.
ii.B = append(ii.B, nsPrefixMetricNameToTSID)
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar)
ii.B = tsid.Marshal(ii.B)
ii.Next()
// Create MetricID -> MetricName index.
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.B = mn.Marshal(ii.B)
ii.Next()
// Create MetricID -> TSID index.
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToTSID, mn.AccountID, mn.ProjectID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.B = tsid.Marshal(ii.B)
ii.Next()
prefix := kbPool.Get()
prefix.B = marshalCommonPrefix(prefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)
kbPool.Put(prefix)
is.db.tb.AddItems(ii.Items) // 将items存入内存shards
}

6. index items存入内存shards

Index items构造完成后,被写入内存的shards,会有异步的goroutine将其压缩写入disk。

写内存shards的方法: roundRobin

  • 内存中有若干个index shards;
  • 写入时,轮转写入:idx++ % shards

// lib/mergeset/table.go
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
shards := riss.shards
shardsLen := uint32(len(shards))
for len(items) > 0 {
n := atomic.AddUint32(&riss.shardIdx, 1)
idx := n % shardsLen
items = shards[idx].addItems(tb, items)
}
}

内存中shards总数,跟cpu核数有关系:

  • shards总数 = (cpu*cpu + 1) / 2
  • 对于4C的机器,有8个shards;

// lib/mergeset/table.go
/ The number of shards for rawItems per table.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawItemsShardsPerTable = func() int {
cpus := cgroup.AvailableCPUs()
multiplier := cpus
if multiplier > 16 {
multiplier = 16
}
return (cpus*multiplier + 1) / 2
}()

以上就是时序数据库VictoriaMetrics源码解析之写入与索引的详细内容,更多关于VictoriaMetrics写入索引的资料请关注悠久资源其它相关文章!

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

悠久资源 数据库其它 时序数据库VictoriaMetrics源码解析之写入与索引 https://www.u-9.cn/database/dataqita/129164.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务