Victoria Metrics 写入流程分析(上篇)

发布: 2024-07-01   上次更新: 2024-07-01   分类: 数据库   标签: vm

文章目录

Victoria Metrics 是一个功能强大且易于使用的时序数据库,它之所以能够在众多的 TSDB 中脱颖而出,主要得益于其强劲的性能,和简单的部署模式,本篇文章就结合其源码,来分析其写入流程,看看它是如何解决高基数下的高吞吐。

本文基于 b39985 分析,提交时间 2024-03-18

术语定义

为了便于理解,这里首先介绍 VM 代码中常用的一些术语:

  • metricName,对于下面这个指标:

    memory_usage{datacenter="foo1", job="bar1", instance="baz1:1234"}

    metricName 指的是 memory_usage 加上后面的所有 label 对, memory_usage 称为 MetricGroup。这和其他时序数据库稍有不同。

  • TSID,每个 metricName 会对应一个唯一的 TSID,它有以下几个字段:

    • MetricGroupID,metricGroup 的哈希值
    • JobID: 第 0 个 label 的哈希值
    • InstanceID: 第 1 个 label 的哈希值
    • MetricID: 递增分配的唯一 ID

    在上面这四个中,很明显 MetricID 是最重要,因为它需要持久化,并不能简单通过某些手段计算出来。

重要结构介绍

Storage

Storage 是 VM 中的存储层,数据的写入和查询最终都会调到它的相应方法,因此这里先介绍它的结构。主要有如下几个字段:

  • idbCurr、idbNext,两个指向 indexdb 的指针,分布表示当前和下一个 indexdb(索引结构,后面会介绍),之所以有两个,主要是为了防止当前 indexdb 过期时,写下一个 indexdb 带来的剧烈抖动,解决方式就是在 idbCurr 将要过期的前一个小时,按照一定比例提前预写 idbNext。参见:#1401
  • tb *table,负责数据的持久化
  • tsidCache MetricName -> TSID 的缓存
  • metricIDCache MetricID –> TSID 的缓存
  • metricNameCache MetricID –> MetricName 的缓存
  • dateMetricIDCache generation + date –> MetricID
  • currHourMetricIDs, 当前小时内 MetricID 的缓存
  • prevHourMetricIDs, 上一小时内 MetricID 的缓存
  • nextDayMetricIDs, generation + date -> MetricID,下一天中活跃的 MetricID 缓存,思路和 idb 类似,用于预写下一天中的倒排索引用的

generation,相当于一个存储层的唯一 ID,根据时间戳递增生成。用于淘汰老数据用,比如设置数据保存 31 天,当写第 32 天的数据时,就会新开一个

IndexDB

IndexDB 用于管理存储层对应的索引,主要有如下几个字段:

  • generation ,相当于唯一 ID,随时间戳递增,主要用于在不同 indexdb 间同步数据用。
  • tb *mergeset.Table 负责索引的持久化
  • extDB 上一个周期的 indexdb 实例
  • tagFiltersToMetricIDsCache TagFilters -> MetricIDs 的缓存,加速查询用
  • loopsPerDateTagFilterCache (date, tagFilter) -> loopsCount 的缓存,加速查询用
  • s *Storage 与之对应的存储层实例

可以看到,Storage、IndexDB 中都出现了 table 字段,但是 VM 中的表只是代表一个存储或索引,并不是一般意义上的表。

VM 存储结构

通过上图可以看到,Storage 里面使用了大量的缓存,主要目的就是用来解决新时间线的写入,只有当缓存(tsidCache)中找不到指标对应时间线时,才回去磁盘上的 indexdb 中查找,如果 indexdb 中也没有,那就新创建一个 TSID,并将其插入到缓存中。

在通过 indexdb 时间线时,为了防止时间线过多,查询效率差,indexdb 内部做了个按天做了个拆分,不同天的索引可能是不同,这样就可以极大提升查找的速度。在上图可以看到,在一个回收周期内(默认 31 天),除了一个全局的 global 索引,还有 31 个子索引。正常的查询按照时间范围来在对应子索引区域查找即可,只有在时间跨度大于 40 天时,才会从 global 区域查找,因为这时从每个子索引查找的代价已经大于从全局中查找。

写入流程

有了上面对 Storage、IndexDB 的分析,下面再来看写入流程就会相对比较简单了:

VM 写入流程图

下面参照上图,本小节来详细介绍一下写入步骤,(MetricName简写 MN): 首先先从左侧开始看起:

  1. 尝试从 tsidCache 查找 MN 是否已经有对应时间线,如果没有,则进入 Slow path
  2. 在当前指标对应天数的子索引内,查找是否在 indexdb 内,在查找时,会在当前 indexdb 以及上一个 indexdb(即 idb.ExtDB)中查找,如果没有则进入 SLowest path
  3. 为当前 MN 创建 TSID,如前文分析,MetricID 的创建最为重要,其逻辑在 generateUniqueMetricID 函数内部:
1
2
3
4
5
6
7
8
func generateUniqueMetricID() uint64 {
	return nextUniqueMetricID.Add(1)
}
var nextUniqueMetricID = func() *atomic.Uint64 {
	var n atomic.Uint64
	n.Store(uint64(time.Now().UnixNano()))
	return &n
}()

可以看到,VM 这里采用了自增的方式来生产 MetricID,自增 ID 的一个好处主要好处就是密集,便于 bitmap 化。 这里还想说一点就是自增的起点,VM 设置的也比较巧妙,采用的是服务启动时间的纳秒值, 这样只要服务器时钟不回调,理论是不可能出现冲突的。

左侧的步骤逻辑相对简单,主要确定 TSID,是已经存在还是需要新建,右侧是 TSID 已经找到后对应的逻辑,逻辑相对较复杂:

  1. 首先判断 TSID 是否已经超限,VM 里会有两个维度的检查,小时级别、天级别。
  2. tsid.gen < idb.gen 这是判断 TSID 是否属于当前回收窗口内的,这也说明 tsidCache 是全局的一个缓存。如果判定 TSID 来自上一周期,则需要写入到当前 indexdb 中,即步骤 3
  3. idb.CreateIndex 这一步将在当前 indexdb 中写入改时间线对应的索引,分为两部分:Global、PerDay
  4. tsid.gen = idb.gen 更新 TSID 对应的 generation 为当前 indexdb 的 generation
  5. s.updateSeriesCache 更新缓存,主要包括:

    • tsidCache ,这也下次相同的 MN 可以直接进入 Fast path 的写入路径
    • dateMetricIDCache ,天级别的 MetricID 索引
  6. 接下来两步为预写索引,逻辑较为复杂,后面会有两小节来专门来介绍。
  7. MustAddRows 这是最后一步,真正的持久化数据,鉴于本文篇幅,不再展开讲述这里面的具体流程。

预写 idbNext

s.prefillNextIndexDB 这一步预写下一个 indexdb(由 idbNext 指针指向),大致逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
d := s.nextRetentionSeconds()
if d >= 3600 {
	// Fast path: 只在索引轮换前最后一个小时内进行预写
	return nil
}

for i := range rows {
	r := &rows[i]
	// 当 d=[3600 ... 0] 时,概率从 0% 到 100% 成比例增加。
	// 这也符合直觉,因为越新的数据,越可能出现在第二天
	pMin := float64(d) / 3600
	p := float64(uint32(fastHashUint64(r.TSID.MetricID))) / (1 << 32)
	if p < pMin {
		// Fast path: it is too early to pre-fill indexes for the given MetricID.
		continue
	}

	// 在缓存中判断是否已经预写过,这里的 generation 是 idbNext 的
	if s.dateMetricIDCache.Has(generation, date, metricID) {
		continue
	}

	// 接着在 idbNext 中判断,如果有,则更新缓存,这样下次就就可以直接通过缓存判断
	if isNext.hasDateMetricIDNoExtDB(date, metricID) {
		s.dateMetricIDCache.Set(generation, date, metricID)
		continue
	}

	// 创建为 idbNext 的 Global 与 PerDay 中的索引
	createAllIndexesForMetricName(isNext, mn, &r.TSID, date)
	genTSID.TSID = r.TSID
	genTSID.generation = generation
	// 更新 tsidCache 与 dateMetricIDCache
	s.putSeriesToCache(metricNameRaw, &genTSID, date)
}

预写天级别索引

s.updatePerDateData 是更新天级别的索引与缓存,和更新 idbNext 思路类似,在当天接近 23 点时,开始预写下一天的索引。大致逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
hm := s.currHourMetricIDs.Load()
hmPrev := s.prevHourMetricIDs.Load()
// 这里首先把 ts 转成 hour,然后再减去 23,在正数时有效,那么 pMin 的取值变化就是 [0...1]
// 也就是说预写的概率越来越大
pMin := (float64(ts%(3600*24)) / 3600) - 23
for i := range rows {
	r := &rows[i]
	date = uint64(r.Timestamp) / msecPerDay
	hour = uint64(r.Timestamp) / msecPerHour
	if hour == hm.hour { // 属于当前小时,从 currHour 缓存中查找
		if hm.m.Has(metricID) { // Fast path,从当前缓存中找到
			if pMin > 0 { // hour 大于 23 时才开始预写下一天数据
				p := float64(uint32(fastHashUint64(metricID))) / (1 << 32)
				if p < pMin && !nextDayMetricIDs.Has(metricID) {
					pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
						date: date + 1, // 加 1 表示预写下一天的索引
						tsid: &r.TSID,
						mr:   mrs[i],
					})
					pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
				}
			}
			continue
		}
		pendingHourEntries = append(pendingHourEntries, metricID)
		// 已经在当前的前一个小时写入过了
		if date == hmPrevDate && hmPrev.m.Has(metricID) {
			continue
		}
	}

	// Slow path 从全局缓存中查找
	if s.dateMetricIDCache.Has(generation, date, metricID) {
		continue
	}

	// Slow path: 在 indexdb 插入 (date, metricID)
	pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
		date: date,
		tsid: &r.TSID,
		mr:   mrs[i],
	})

	for _, dmid := range pendingDateMetricIDs {
		if !is.hasDateMetricIDNoExtDB(date, metricID) {
			mn.sortTags()
			// 创建天级别的索引
			is.createPerDayIndexes(date, dmid.tsid, mn)
		}
		dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{
			date:     date,
			metricID: metricID,
		})
	}
	// 在索引更新成功后,再把 (date, metricID) 对插入到缓存中
	s.dateMetricIDCache.Store(generation, dateMetricIDsForCache)
}

上面代码中,对于缓存中不存在的 MetricID,并没有立马更新到其中,而是记录在了 pendingHourEntries , 然后在后台通过一个单独的 goroutine 来将 pending 中的数据更新到 currHourMetricIDs 缓存中。可以想到, 这样做的主要好处是减少锁竞争,而且这个缓存的实时性要求并不高,没必要每次写入都来更新。

总结

通过本文的分析可以拿到,VM 中解决高基数时间线写入的思路就是『分而治之』,从 PerDay 索引的设计,到 currHourMetricIDs 这种 小时级别缓存的设计,尽可能保证活跃的结构足够小,而且对于时序来说,查询大概率会命中最新一天的数据,因此通过按天隔离, 也有利于查询的性能提升。

而且 VM 中有很多细节可以体现其工程能力,比如在定义 Storage 时,开头有这么一段注释:

1
2
3
4
5
6
type Storage struct {
	// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
	tooSmallTimestampRows uint64
	tooBigTimestampRows   uint64
  ..
}

代码中还有不少类似细节,可见 VM 团队花了不少心思来提升性能。鉴于篇幅,本文只介绍了写入流程,数据在磁盘上的组织方式没有介绍, 这一点对于性能也是至关重要,下一篇文章就来介绍这个。

参考

评论

欢迎读者通过邮件与我交流,也可以在 MastodonTwitter 上关注我。