深圳幻海软件技术有限公司 欢迎您!

动手实现一个Localcache-实现篇

2023-02-28

前言哈喽,大家好,我是asong,经过了前面两篇的介绍,我们已经基本了解该如何设计一个本地缓存了,本文就是这个系列的终结篇,自己动手实现一个本地缓存,接下来且听我细细道来!!!本文代码已经上传到github:https://github.com/asong2020/go-localcache现在这一

前言

哈喽,大家好,我是asong,经过了前面两篇的介绍,我们已经基本了解该如何设计一个本地缓存了,本文就是这个系列的终结篇,自己动手实现一个本地缓存,接下来且听我细细道来!!!

本文代码已经上传到github:https://github.com/asong2020/go-localcache

现在这一版本算是一个1.0,后续会继续进行优化和迭代。

第一步:抽象接口

第一步很重要,以面向接口编程为原则,我们先抽象出来要暴露给用户的方法,给用户提供简单易懂的方法,因此我抽象出来的结果如下:

// ICache abstract interface 
type ICache interface { 
 // Set value use default expire timedefault does not expire. 
 Set(key string, value []byte) error 
 // Get value if find it. if value already expire will delete
 Get(key string) ([]byte, error) 
 // SetWithTime set value with expire time 
 SetWithTime(key string, value []byte, expired time.Duration) error 
 // Delete manual removes the key 
 Delete(key string) error 
 // Len computes number of entries in cache 
 Len() int 
 // Capacity returns amount of bytes store in the cache. 
 Capacity() int 
 // Close is used to signal a shutdown of the cache when you are done with it. 
 // This allows the cleaning goroutines to exit and ensures references are not 
 // kept to the cache preventing GC of the entire cache. 
 Close() error 
 // Stats returns cache's statistics 
 Stats() Stats 
 // GetKeyHit returns key hit 
 GetKeyHit(key string) int64 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • Set(key string, value []byte):使用该方法存储的数据使用默认的过期时间,如果清除过期的异步任务没有enable,那么就永不过期,否则默认过期时间为10min。
  • Get(key string) ([]byte, error):根据key获取对象内容,如果数据过期了会在这一步删除。
  • SetWithTime(key string, value []byte, expired time.Duration):存储对象是使用自定义过期时间
  • Delete(key string) error:根据key删除对应的缓存数据
  • Len() int:获取缓存的对象数量
  • Capacity() int:获取当前缓存的容量
  • Close() error:关闭缓存
  • Stats() Stats:缓存监控数据
  • GetKeyHit(key string) int64:获取key的命中率数据

第二步:定义缓存对象

第一步我们抽象好了接口,下面就要定义一个缓存对象实例实现接口,先看定义结构:

type cache struct { 
 // hashFunc represents used hash func 
 hashFunc HashFunc 
 // bucketCount represents the number of segments within a cache instance. value must be a power of two. 
 bucketCount uint64 
 // bucketMask is bitwise AND applied to the hashVal to find the segment id. 
 bucketMask uint64 
 // segment is shard 
 segments []*segment 
 // segment lock 
 locks    []sync.RWMutex 
 // close cache 
 close chan struct{} 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • hashFunc:分片要用的哈希函数,用户可以自行定义,实现HashFunc接口即可,默认使用fnv算法。
  • bucketCount:分片的数量,一定要是偶数,默认分片数为256。
  • bucketMask:因为分片数是偶数,所以可以分片时可以使用位运算代替取余提升性能效率,hashValue % bucketCount == hashValue & bucketCount - 1。
  • segments:分片对象,每个分片的对象结构我们在后面介绍。
  • locks:每个分片的读写锁
  • close:关闭缓存对象时通知其他goroutine暂停

接下来我们来写cache对象的构造函数:

// NewCache constructor cache instance 
func NewCache(opts ...Opt) (ICache, error) { 
 options := &options{ 
  hashFunc: NewDefaultHashFunc(), 
  bucketCount: defaultBucketCount, 
  maxBytes: defaultMaxBytes, 
  cleanTime: defaultCleanTIme, 
  statsEnabled: defaultStatsEnabled, 
  cleanupEnabled: defaultCleanupEnabled, 
 } 
 for _, each := range opts{ 
  each(options) 
 } 
 
 if !isPowerOfTwo(options.bucketCount){ 
  return nil, errShardCount 
 } 
 
  if options.maxBytes <= 0 { 
  return nil, ErrBytes 
 } 
   
 segments := make([]*segment, options.bucketCount) 
 locks := make([]sync.RWMutex, options.bucketCount) 
 
 maxSegmentBytes := (options.maxBytes + options.bucketCount - 1) / options.bucketCount 
 for index := range segments{ 
  segments[index] = newSegment(maxSegmentBytes, options.statsEnabled) 
 } 
 
 c := &cache{ 
  hashFunc: options.hashFunc, 
  bucketCount: options.bucketCount, 
  bucketMask: options.bucketCount - 1, 
  segments: segments, 
  locks: locks, 
  close: make(chan struct{}), 
 } 
    if options.cleanupEnabled { 
  go c.cleanup(options.cleanTime) 
 } 
  
 return c, nil 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.

这里为了更好的扩展,我们使用Options编程模式,我们的构造函数主要做三件事:

  • 前置参数检查,对于外部传入的参数,我们还是要做基本的校验
  • 分片对象初始化
  • 构造缓存对象

这里构造缓存对象时我们要先计算每个分片的容量,默认整个本地缓存256M的数据,然后在平均分到每一片区内,用户可以自行选择要缓存的数据大小。

第三步:定义分片结构

每个分片结构如下:

type segment struct { 
 hashmap map[uint64]uint32 
 entries buffer.IBuffer 
 clock   clock 
 evictList  *list.List 
 stats IStats 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • hashmp:存储key所对应的存储索引
  • entries:存储key/value的底层结构,我们在第四步的时候介绍,也是代码的核心部分。
  • clock:定义时间方法
  • evicList:这里我们使用一个队列来记录old索引,当容量不足时进行删除(临时解决方案,当前存储结构不适合使用LRU淘汰算法)
  • stats:缓存的监控数据。

接下来我们再来看一下每个分片的构造函数:

func newSegment(bytes uint64, statsEnabled bool) *segment { 
 if bytes == 0 { 
  panic(fmt.Errorf("bytes cannot be zero")) 
 } 
 if bytes >= maxSegmentSize{ 
  panic(fmt.Errorf("too big bytes=%d; should be smaller than %d", bytes, maxSegmentSize)) 
 } 
 capacity := (bytes + segmentSize - 1) / segmentSize 
 entries := buffer.NewBuffer(int(capacity)) 
 entries.Reset() 
 return &segment{ 
  entries: entries, 
  hashmap: make(map[uint64]uint32), 
  clock:   &systemClock{}, 
  evictList: list.New(), 
  stats: newStats(statsEnabled), 
 } 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.

这里主要注意一点:

我们要根据每个片区的缓存数据大小来计算出容量,与上文的缓存对象初始化步骤对应上了。

第四步:定义缓存结构

缓存对象现在也构造好了,接下来就是本地缓存的核心:定义缓存结构。

bigcache、fastcache、freecache都使用字节数组代替map存储缓存数据,从而减少GC压力,所以我们也可以借鉴其思想继续保持使用字节数组,这里我们使用二维字节切片存储缓存数据key/value;画个图表示一下:

使用二维数组存储数据的相比于bigcache的优势在于可以直接根据索引删除对应的数据,虽然也会有虫洞的问题,但是我们可以记录下来虫洞的索引,不断填充。

每个缓存的封装结构如下:

基本思想已经明确,接下来看一下我们对存储层的封装:

type Buffer struct { 
 array [][]byte 
 capacity int 
 index int 
 // maxCount = capacity - 1 
 count int 
 // availableSpace If any objects are removed after the buffer is full, the idle index is logged. 
 // Avoid array "wormhole" 
 availableSpace map[int]struct{} 
 // placeholder record the index that buffer has stored. 
 placeholder map[int]struct{} 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • array [][]byte:存储缓存对象的二维切片
  • capacity:缓存结构的最大容量
  • index:索引,记录缓存所在的位置的索引
  • count:记录缓存数量
  • availableSpace:记录"虫洞",当缓存对象被删除时记录下空闲位置的索引,方便后面容量满了后使用"虫洞"
  • placeholder:记录缓存对象的索引,迭代清除过期缓存可以用上。

向buffer写入数据的流程(不贴代码了):

第五步:完善向缓存写入数据方法

上面我们定义好了所有需要的结构,接下来就是填充我们的写入缓存方法就可以了:

func (c *cache) Set(key string, value []byte) error  { 
 hashKey := c.hashFunc.Sum64(key
 bucketIndex := hashKey&c.bucketMask 
 c.locks[bucketIndex].Lock() 
 defer c.locks[bucketIndex].Unlock() 
 err := c.segments[bucketIndex].set(key, hashKey, value, defaultExpireTime) 
 return err 

 
func (s *segment) set(key string, hashKey uint64, value []byte, expireTime time.Duration) error { 
 if expireTime <= 0{ 
  return ErrExpireTimeInvalid 
 } 
 expireAt := uint64(s.clock.Epoch(expireTime)) 
 
 if previousIndex, ok := s.hashmap[hashKey]; ok { 
  if err := s.entries.Remove(int(previousIndex)); err != nil{ 
   return err 
  } 
  delete(s.hashmap, hashKey) 
 } 
 
 entry := wrapEntry(expireAt, key, hashKey, value) 
 for { 
  index, err := s.entries.Push(entry) 
  if err == nil { 
   s.hashmap[hashKey] = uint32(index
   s.evictList.PushFront(index
   return nil 
  } 
  ele := s.evictList.Back() 
  if err := s.entries.Remove(ele.Value.(int)); err != nil{ 
   return err 
  } 
  s.evictList.Remove(ele) 
 } 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.

流程分析如下:

根据key计算哈希值,然后根据分片数获取对应分片位置

如果当前缓存中存在相同的key,则先删除,在重新插入,会刷新过期时间

封装存储结构,根据过期时间戳、key长度、哈希大小、缓存对象进行封装

将数据存入缓存,如果缓存失败,移除最老的数据后再次重试

第六步:完善从缓存读取数据方法

第一步根据key计算哈希值,再根据分片数获取对应的分片位置:

func (c *cache) Get(key string) ([]byte, error)  { 
 hashKey := c.hashFunc.Sum64(key
 bucketIndex := hashKey&c.bucketMask 
 c.locks[bucketIndex].RLock() 
 defer c.locks[hashKey&c.bucketMask].RUnlock() 
 entry, err := c.segments[bucketIndex].get(key, hashKey) 
 if err != nil{ 
  return nil, err 
 } 
 return entry,nil 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

第二步执行分片方法获取缓存数据:

  • 先根据哈希值判断key是否存在于缓存中,不存返回key没有找到
  • 从缓存中读取数据得到缓存中的key判断是否发生哈希冲突
  • 判断缓存对象是否过期,过期删除缓存数据(可以根据业务优化需要是否返回当前过期数据)
  • 在每个记录缓存监控数据
func (s *segment) getWarpEntry(key string, hashKey uint64) ([]byte,error) { 
 index, ok := s.hashmap[hashKey] 
 if !ok { 
  s.stats.miss() 
  return nil, ErrEntryNotFound 
 } 
 entry, err := s.entries.Get(int(index)) 
 if err != nil{ 
  s.stats.miss() 
  return nil, err 
 } 
 if entry == nil{ 
  s.stats.miss() 
  return nil, ErrEntryNotFound 
 } 
 
 if entryKey := readKeyFromEntry(entry); key != entryKey { 
  s.stats.collision() 
  return nil, ErrEntryNotFound 
 } 
 return entry, nil 

 
func (s *segment) get(key string, hashKey uint64) ([]byte, error) { 
 currentTimestamp := s.clock.TimeStamp() 
 entry, err := s.getWarpEntry(key, hashKey) 
 if err != nil{ 
  return nil, err 
 } 
 res := readEntry(entry) 
 
 expireAt := int64(readExpireAtFromEntry(entry)) 
 if currentTimestamp - expireAt >= 0{ 
  _ = s.entries.Remove(int(s.hashmap[hashKey])) 
  delete(s.hashmap, hashKey) 
  return nil, ErrEntryNotFound 
 } 
 s.stats.hit(key
 
 return res, nil 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.

第七步:来个测试用例体验一下

先来个简单的测试用例测试一下:

func (h *cacheTestSuite) TestSetAndGet() { 
 cache, err := NewCache() 
 assert.Equal(h.T(), nil, err) 
 key := "asong" 
 value := []byte("公众号:Golang梦工厂"
 
 err = cache.Set(key, value) 
 assert.Equal(h.T(), nil, err) 
 
 res, err := cache.Get(key
 assert.Equal(h.T(), nil, err) 
 assert.Equal(h.T(), value, res) 
 h.T().Logf("get value is %s", string(res)) 

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

运行结果:

=== RUN   TestCacheTestSuite 
=== RUN   TestCacheTestSuite/TestSetAndGet 
    cache_test.go:33: get value is 公众号:Golang梦工厂 
--- PASS: TestCacheTestSuite (0.00s) 
    --- PASS: TestCacheTestSuite/TestSetAndGet (0.00s) 
PASS 
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

大功告成,基本功能通了,剩下就是跑基准测试、优化、迭代了(不在文章赘述了,可以关注github仓库最新动态)。

参考文章

  • https://github.com/allegro/bigcache
  • https://github.com/VictoriaMetrics/fastcache
  • https://github.com/coocood/freecache
  • https://github.com/patrickmn/go-cache

总结

实现篇到这里就结束了,但是这个项目的编码仍未结束,我会继续以此版本为基础不断迭代优化,该本地缓存的优点:

  • 实现简单、提供给用户的方法简单易懂
  • 使用二维切片作为存储结构,避免了不能删除底层数据的缺点,也在一定程度上避免了"虫洞"问题。
  • 测试用例齐全,适合作为小白的入门项目

待优化点:

  • 没有使用高效的缓存淘汰算法,可能会导致热点数据被频繁删除
  • 定时删除过期数据会导致锁持有时间过长,需要优化
  • 关闭缓存实例需要优化处理方式
  • 根据业务场景进行优化(特定业务场景)

迭代点:

  • 添加异步加载缓存功能
  • ...... (思考中)

本文代码已经上传到github:https://github.com/asong2020/go-localcache

 

好啦,本文到这里就结束了,我是asong,我们下期见。