手撸源码系列 - cache2go
2022-10-31 08:52:39 Author: Go语言中文网(查看原文) 阅读量:36 收藏

碰到很多同学问我,平时疲于写各种业务代码,如何才能提高编程能力?我的办法是多阅读优秀的代码,只有见过更好的,我们才能知道如何编写好的代码、提高编程能力。就好比如果学武功,你肯定要找一个武林高手作师傅。

进入今天的正题。

前一篇文章,我们分析了 go-cache 库,今天再来看一个缓存库 -- cache2go。

网上也有很多文章分析了这个库,阅读过源码的同学都是这么觉得的:

所以,如果想要提高 Go 语言编程水平的同学,不妨多阅读源码。如果不知道从哪里开始,不如就从 cache2go 开始。

什么是 cache2go

cache2go 是一个并发安全、带有自动过期机制的缓存库。

通过阅读源码我们可以掌握:

  • 作者是如何维护缓存数据的;
  • 作者是如何使用 sync.RWMutex 保证并发安全,有哪些值得借鉴的技巧;
  • 自动过期机制是如何实现的(这块代码思路值得学习);

这个库还提供了如下功能:

  • 可以配置添加、删除、访问数据时的回调函数;
  • 为每个 item 单独设置存活时间;
  • 记录每个 item 的最近访问时间、创建时间、访问次数等;
  • 按照访问次数排序;

项目地址:https://github.com/muesli/cache2go

如何使用

先来练练手,熟悉下如何使用。

type myStruct struct {
 text     string
 moreData []byte
}

var (
 k = "testkey"
 v = "testvalue"
)

func main() {

 cache := cache2go.Cache("myCache"// 创建CacheTable

 val := myStruct{"This is a test!", []byte{}}
 cache.Add("someKey"5*time.Second, &val) // 5s 是 item 的存活时间,超过 5s 不被访问就会被删除

 res, err := cache.Value("someKey"// 获取item
 if err == nil {
  fmt.Println("Found value in cache:", res.Data().(*myStruct).text)
 } else {
  fmt.Println("Error retrieving value from cache:", err)
 }

 time.Sleep(6 * time.Second)       // 休眠6s  5s过后,上面添加的 item 会过期,自动被删除
 res, err = cache.Value("someKey"// 再次获取 item,不存在会报错
 if err != nil {
  fmt.Println("Item is not cached (anymore).")
 }

 cache.Add("someKey"0, &val)                                    // 无过期时间,表示不会过期
 cache.SetAboutToDeleteItemCallback(func(e *cache2go.CacheItem) { // 设置删除回调函数,删除 item 时会自动触发
  fmt.Println("Deleting:", e.Key(), e.Data().(*myStruct).text, e.CreatedOn())
 })

 cache.Delete("someKey"// 手动删除item,会触发删除回调函数

 cache.Flush() // 清空table

 // 完整例子请关注公众号【Golang来啦】,后台发送关键字 cache2go 获取
 
 //TestCache()
 //TestCacheExpire()
 //TestExists()
 //TestNotFoundAdd()
 //TestCacheKeepAlive()
 //TestDelete()
 //TestFlush()
 //TestCount()
 //TestAccessCount()
 //TestCallbacks()
 //TestDataLoader()

 // 完整例子请关注公众号【Golang来啦】,后台发送关键字 cache2go 获取
}

下面我们来看下 cache2go 内部是如何实现前面说的那些功能的。

主要数据结构

我们看下项目目录,如下图:

可以看到代码主要集中在 cache.go、cachetable.go、cacheitem.go 三个文件中,主要的数据结构有三个:

  • cache:用于缓存多个 table;
  • CacheTable:缓存一个 table;
  • CacheItem:存放键值对(item);

存放键值对(K-V)的结构称为 item。

这三个数据结构的关系如下图:

从图中可以看出是三个层级的结构,最底层是 CacheItem,再上一层是 CacheTable,最上一层是 cache。主要的逻辑代码集中在 CacheTable,也是我们学习的重点。从下往上,我们一层层看。

CacheItem

CacheItem 是真正存储数据的结构,详细的字段如下,代码中有注释。与 CacheItem 有关的操作,后面会给大家详解。

type CacheItem struct {
 sync.RWMutex   // 读写锁

 key interface{}  // 键

 data interface{}  // key对应的数据

 lifeSpan time.Duration  // 不被访问后的存活时间   这个时间的含义是:距离上一次被访问的时间如果超过 lifeSpan 则会被自动删除

 createdOn time.Time   // item 添加的时间

 accessedOn time.Time  // 最近一次被访问的时间

 accessCount int64 // 被访问的次数

 aboutToExpire []func(key interface{}) // item 被删除时会触发该回调函数
}

CacheTable

CacheTable 结构体如下:

type CacheTable struct {
 sync.RWMutex   // 读写锁
 
 name string   // table name

 items map[interface{}]*CacheItem   // table 下存放的 item

 cleanupTimer *time.Timer    // 清理定时器,触发下次清理过期 item 的事件

 cleanupInterval time.Duration  // 清理操作触发的时间间隔

 logger *log.Logger

 loadData func(key interface{}, args ...interface{}) *CacheItem  // 当提取一个不存在的 key 时触发的回调函数

 addedItem []func(item *CacheItem)   // 添加 item 会触发的回调函数,可以设置多个

 aboutToDeleteItem []func(item *CacheItem)   // 删除 item 会触发的回调函数,可以设置多个
}

items 字段是一个 map,key 可以任意类型,value 存放 CacheItem 的指针,指向真正的数据。

cache

cache 的结构比较简单,是一个 map,用于集中存放 table。

var (
 cache = make(map[string]*CacheTable)
 mutex sync.RWMutex    // 读写锁
)

与 cache 相关的操作比较简单,先在这里说下:

// 返回一个table,如果已经存在就直接返回,否则执行添加操作,再返回
func Cache(table string) *CacheTable {
 mutex.RLock()
 t, ok := cache[table]  // 判断是否存在
 mutex.RUnlock()

 if !ok {
  mutex.Lock()
  t, ok = cache[table]    // double-check 再次判断是否存在
  if !ok {       // 不存在则添加
    t = &CacheTable{
    name:  table,
    items: make(map[interface{}]*CacheItem),
   }
   cache[table] = t
  }
  mutex.Unlock()
 }

 return t
}

与 cache 有关的操作只有这一个函数 Cache(),在 cache.go 文件中,用来创建 table。上面声明全局变量的那把锁是用来确保操作 cache 并发安全的。代码中也使用了 double-check 机制,进一步确保并发安全, 经常阅读源码的同学应该知道,这是一种常用的安全机制。我觉得自己又学到了?!哈哈

主要操作流程

cache 有关的操作上面已经讲过了,接下来我们主要讲下与 CacheItem、CacheTable 有关的操作。

CacheItem

与 CacheItem 有关的操作主要有初始化、设置过期删除回调函数、更新访问次数/访问时间等。

初始化

// CacheItem 初始化
func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
 t := time.Now()
 return &CacheItem{
  key:           key,
  lifeSpan:      lifeSpan,
  createdOn:     t,
  accessedOn:    t,
  accessCount:   0,
  aboutToExpire: nil,
  data:          data,
 }
}

初始化操作干的就是初始化一个 CacheItem 并返回变量地址,需要特别注意的是 lifeSpan 变量,表示 item 的存活时间:距离上一次被访问的时间如果超过 lifeSpan,则 item 会被自动删除。

设置回调函数

item 被删除时会被自动调用回调函数,由下面三个方法设置:

// 设置回调函数,如果原来已经有回调函数则先清空再添加
func (item *CacheItem) SetAboutToExpireCallback(f func(interface{})) {
 if len(item.aboutToExpire) > 0 {
  item.RemoveAboutToExpireCallback()
 }
 item.Lock()
 defer item.Unlock()
 item.aboutToExpire = append(item.aboutToExpire, f)
}

// 添加回调函数
func (item *CacheItem) AddAboutToExpireCallback(f func(interface{})) {
 item.Lock()
 defer item.Unlock()
 item.aboutToExpire = append(item.aboutToExpire, f)
}

// 删除回调函数
func (item *CacheItem) RemoveAboutToExpireCallback() {
 item.Lock()
 defer item.Unlock()
 item.aboutToExpire = nil
}

与回调函数有关的完整示例请关注公众号【Golang来啦】,后台发送关键字 cache2go 获取。

更新访问次数/时间

// 更新 item 的访问次数、时间
func (item *CacheItem) KeepAlive() {
 item.Lock()
 defer item.Unlock()
 item.accessedOn = time.Now()
 item.accessCount++
}

每次获取数据时,如果 item 存在都会调用这个方法更新 item 的数据。item 的访问时间(accessedOn)被更新之后,item 的“寿命”又延长了,哈哈。

CacheTable

caceh2go 的主要逻辑集中在对 table 的操作,主要包括添加、删除、过期检查、数据获取(当数据不存在时,可以自定义回调函数,实现从数据库、文件等其他地方获取数据并添加到缓存)。如何实现这些功能是我们阅读源码的重点。

添加item

在 table 中添加 item,通过 Add() 方法实现,参数 lifeSpan 是 item 的存活时间。先看流程图,方便理解代码。

代码如下:

func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
 item := NewCacheItem(key, lifeSpan, data)   // 初始化item

 table.Lock()   // 加锁
 table.addInternal(item)   // 添加item

 return item
}

// 内部添加方法实现,调用这个方法之前必须保证 table-mutex 已经上锁
func (table *CacheTable) addInternal(item *CacheItem) {

 table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
 table.items[item.key] = item    // 添加操作

 // 将需要的数据先拿出来,及时释放锁,保证锁粒度最小化,后面如果有回调函数需要执行可能会比较耗时
 expDur := table.cleanupInterval // 当前的清理时间间隔
 addedItem := table.addedItem    // 添加时待执行的回调函数
 table.Unlock()    // 释放锁

 if addedItem != nil {   // 如果回调函数存在,则执行
  for _, callback := range addedItem {
   callback(item)
  }
 }
 // 保证锁最小粒度,注意不要在这里释放锁

 // 什么时候需要触发过期检查?  答:item设置了存活时间且清理时间间隔等于0 或 设置了存活时间且存活时间小于时间间隔
 // 注:expDur == 0 表示还没有设置过定时清理器(在过期清理方法里设置,接下去我们会讲到)

 // 这里为什么要触发过期检查功能?
 // 答:有可能清理定时器下一次触发的时间比新加的这个 item 存活时间要长,此时就必须及时更新清理定时器,
 // 保证到存活时间点能及时调用过期清理函数,将过期的 item 清理掉
 if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
  table.expirationCheck()
 }
}

主要的代码逻辑看注释,有不清楚或者有误的地方,可以私我交流下。

有几点需要注意的地方:

锁的应用:1、上锁/释放锁的时机:调用 addInternal() 方法之前,必须保证 table-mutex 已经上锁,因为方法里有解锁操作;2、保证锁最小粒度,释放锁不应该放在调用回调函数之后,因为有可能回调函数执行时间较长,其他 goroutine 获取不到锁造成不必要的等待,影响性能;

过期检查函数调用的条件是什么?添加 item 时为什么要做过期检查?解析可以看注释。

过期检查

过期检查的流程图如下:

代码如下:

func (table *CacheTable) expirationCheck() {
 table.Lock()
 if table.cleanupTimer != nil {   // 如果存在清理定时器,则先停止
  table.cleanupTimer.Stop()
 }
 if table.cleanupInterval > 0 {
  table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
 } else {
  table.log("Expiration check installed for table", table.name)
 }

 now := time.Now()
 smallestDuration := 0 * time.Second  // smallestDuration,所有 item 中最小的过期时长,初值为 0,后面会更新
 for key, item := range table.items {
  // 获取 item 的数据,使用读写锁
  item.RLock()
  lifeSpan := item.lifeSpan
  accessedOn := item.accessedOn
  item.RUnlock()

  if lifeSpan == 0 {  // 存活时间等于 0 表示 item 一直存活,不会过期
   continue
  }
  if now.Sub(accessedOn) >= lifeSpan {    // 距离 item 上一次被访问时间已经超过存活时间,则删除 item
   table.deleteInternal(key)  // 调用内部删除方法
  } else {
   // 通过对比找到所有 item 中最小的过期时长
   if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
    smallestDuration = lifeSpan - now.Sub(accessedOn)
   }
  }
 }

 // 根据 smallestDuration 设置清理周期,并设置一个定时器,触发下一次过期清理动作
 table.cleanupInterval = smallestDuration
 if smallestDuration > 0 {
  table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
   go table.expirationCheck()    // 调用过期检查
   // 这里并不是循环调用,启动一个新的 goroutine 后当前 goroutine 会退出,并不会引起 goroutine 泄漏
  })
 }
 table.Unlock()
}

可以看到 expirationCheck() 方法实现了一个动态的清理器,每次执行的时会找出最快过期的 item 的存活时长 smallestDuration,根据这个值定义一个定时器,到点执行清理函数,这样往复循环。

需要注意的是,方法开始判断了是否设置了定时器,如果已经设置就先停止。

这在什么情况下会发生呢?

比如之前已经设置了清理定时器,将在 10s 后执行,此时新添加了一个 item,存活时长是 5s,此时就会触发执行 expirationCheck() 方法,更新定时清理器。

expirationCheck() 方法调用了删除方法,接着一起看下。

删除

除了 item 过期时会自动调用删除方法之外,cache2go 对外还提供了 Delete() 方法手动删除 item。

流程图:

代码如下:

// 删除 item,存在的话会返回 item,否则返回错误
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
 table.Lock()
 defer table.Unlock()

 return table.deleteInternal(key)
}

// 内部删除方法
func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
 r, ok := table.items[key]
 if !ok {   // 不存在
  return nil, ErrKeyNotFound
 }

 // 将需要的数据先拿出来,及时释放锁,保证锁粒度最小化,后面如果有回调函数需要执行可能会比较耗时
 aboutToDeleteItem := table.aboutToDeleteItem
 table.Unlock()

 // Trigger callbacks before deleting an item from cache.
 if aboutToDeleteItem != nil {   // table 有删除回调函数,则执行
  for _, callback := range aboutToDeleteItem {
   callback(r)
  }
 }

 r.RLock()
 defer r.RUnlock()
 if r.aboutToExpire != nil {  // item 有过期回调函数,则执行
  for _, callback := range r.aboutToExpire {
   callback(key)
  }
 }

 table.Lock()
 table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
 delete(table.items, key)  // 删除

 return r, nil
}

删除方法逻辑比较简单,这里涉及到调用两处删除回调函数,一处是在 table 定义的删除回调函数,如果 item 也定义了删除回调函数,也是需要执行的。

另外需要注意的是保证锁的最小粒度。

数据获取

cache2go 对外提供了 Value() 方法用于获取 item,拿到 item 之后就可以调用 item 的方法,获取 item 有关的信息,比如访问时间/次数、存活时长、与 key 关联的数据等。


// 提取 key 对应的 item 数据,如果缓存中不存在,如果设置了 data-loader 回调函数,则会尝试去别的地方,比如数据库、文件等,获取数据并添加到缓存中
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
 table.RLock()
 r, ok := table.items[key]
 loadData := table.loadData
 table.RUnlock()

 if ok {
  r.KeepAlive()   // 更新访问次数(accessCount) 和访问时间(accessedOn),重新设置 accessedOn,表示该 item 还可以存活 lifeSpan 时长
  return r, nil
 }

 // 如果 item 不存在,则尝试使用 data-loader 生成一个 item 并添加到 cache 里
 if loadData != nil {
  item := loadData(key, args...)   // 返回一个 item
  if item != nil {
   table.Add(key, item.lifeSpan, item.data)  // 执行添加操作,将获取到的数据添加到缓存中
   return item, nil
  }

  return nil, ErrKeyNotFoundOrLoadable
 }

 return nil, ErrKeyNotFound
}

可以看到当 item 存在时,会调用 KeepAlive() 方法更新 item 的访问时间(accessedOn),表示该 item 还可以存活 lifeSpan 时长,延长 item 的“寿命”。

另外,Value() 函数里有个比较有意思的地方,如果设置了 data-loader 回调函数,并且 item 不存在时,会调用回调函数, 在回调函数里面可以写自己想要实现的逻辑代码,比如从数据库、文件等获取数据并添加到缓存里。

data-loader 回调函数通过 SetDataLoader() 方法设置。

与 data-loader 有关的完整示例请关注公众号【Golang来啦】,后台发送关键字 cache2go 获取。

cache2go 的数据获取逻辑与 go-cache 不同之处在于,go-cache 获取数据时会判断 item 的过期时间,如果已过期则认为数据已经不存在。go-cache 为什么会判断过期时间呢?因为它的过期清理周期是固定的,有可能还没到清理时间 item 已经过期了。而 cache2go 的清理周期是自动动态调整,能保证在过期时间点将已过期的 item 及时清理掉。

关于 go-cache 的源码解读可以看这篇文章

其他

上面提到的几个功能是我们学习的重点,其他功能,比如对 table 设置添加回调函数、删除回调函数等,比较简单,大家可以拉源码来看下,就不在这分析。

另外 table 还提供了一个方法 MostAccessed(),功能:将 item 按访问次数倒序并返回前 count 个 item,里面用到了 sort.Sort() 排序,感兴趣的同学可以看下,有不清楚的欢迎私我交流。

总结

cache2go 的代码里不算多,结构分层清晰,函数功能职责明确。通过阅读源码,可以学习:

  • 加深对锁的灵活使用,在一个方法中加锁,到另一个方法中解锁,比如在 Delete() 方法解锁,在 deleteInternal() 方法中解锁,再加锁,最后回到 Delete() 中解锁,这么做的目的是保证锁的最小粒度,尽可能不阻塞其他 goroutine 执行。在源码其他方法中也有相同的使用方式;
  • 如何实现一个动态的过期清理机制;
  • 如何设置回调方法,方便对数据进行维护;

如果大家阅读源码过程中有什么疑惑或者文章分许有误,欢迎大家留言交流,一起学习成长!


推荐阅读

福利
我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。


文章来源: http://mp.weixin.qq.com/s?__biz=MzAxMTA4Njc0OQ==&mid=2651453642&idx=1&sn=915577de30c69580554828955d1911c9&chksm=80bb2638b7ccaf2ecfcd98cb7ed29efaf32a0ae928eede5f8ec33bd6d1128b78c716bee4c350#rd
如有侵权请联系:admin#unsafe.sh