目录
- 写作背景
- 名称解释
- 源码剖析
- 经典案例
- 总结
写作背景
缓存在项目中使用应该是非常频繁的,提到缓存只要了解过singleflight,基本都会用于缓存实现的一部分吧?但singleflight要用好也不容易。
名称解释
singleflight来源于准官方库(也可以说官方扩展库)golang.org/x/sync/singleflight包中。它的作用是避免同一个 key 对下游发起多次请求,降低下游流量。
源码剖析
3个结构体
Group是singleflight的核心,代表一个组,用于执行具有重复抑制的工作单元。
type Group struct {
mu sync.Mutex
m map[string]*call
}
mu是保护m字段的互斥锁,确保对调用信息的访问是线程安全的。m是一个map,键是函数的唯一标识符,值是call结构体,代表一次函数调用的信息,包括函数的返回值和错误。
call代表一次函数调用的信息,把函数的调用结果封装到call中
type call struct {
wg sync.WaitGroup
// 这些字段在 WaitGroup 完成之前只被写入一次,并且在 WaitGroup 完成之后只被读取
val interface{} // 函数调用的返回值
err error // 函数调用可能出现的错误
dups int // 相同 key 调用次数
chans []chan<- Result // 结果通道列表,仅调用 DoChan() 方法时返回
}
Result结构体用于保存DoChan()方法的执行结果,以便将结果传递给通道。
type Result struct {
Val interface{}
Err error
Shared bool
}
4个方法
Group主要提供了3个公开方法和1个非公开方法。
Do()方法,相同的key对应的fn函数只会调用一次。返回值v调用fn()方法返回的结果;err调用fn()返回的err;shared:表示在多次调用的结果是否共享。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
源码比较简单,如果key对应的fn函数已被调用,则等待fn函数调用完成直接返回结果。如果fn未被调用,new(call)存入m中,执行doCal()方法。
doCall()方法,调用key对应的fn方法。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
defer func() {
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
if len(c.chans) > 0 {
go panic(e)
select {}
} else {
panic(e)
}
} else if c.err == errGoexit {
} else {
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
doCall()代码比较简单,doubledefer双延迟机制区分panic和runtime.Goexit。第二个defer会先执行调用fn()函数,如果未正常返回将会补获异常,并将堆栈信息存入err中。
第一个defer先将key从m中移除,再就是异常处理,如果是Goexit正常退出,如果断言是panicError将对外抛出Panic。若正常退出将结果发送到chans通道列表中。
DoChan()方法类似于Do()方法,返回通道(chan),通过通道接收数据。另外通道不会被关闭。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
Forget()方法,可以理解为丢弃某一个key,后面该key会被立即调用,而不是等待先前的调用完成。
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}
经典案例
缓存场景在大家的业务场景中应该是被广泛使用的,大部分的场景使用应该都是下图吧?
从单体应用到微服务化,调用下游服务一般如下图吧?
假设缓存Miss所有流量会瞬间打到数据库,或者所有流量都会打到server2,如果学习过singleflight的同学,肯定会把它用在reids->db或server->server2之间,包括我也是。如下图(只举数据库案例)。
在使用singleflight之前你先确定下你的业务场景,key相同的情况多吗?(可以统计一些数据,我们业务场景同一个key多次调用下游概率是比较高的)如果key相同的情况比较少,singleflight对你的帮助可能不大。
上面列举2种方案。
1、 singleflight介于redis和db之间,redis是内存缓存qps高、响应也快。大部分情况不会成为瓶颈,但数据库就不一样了,所以这种方案可以防止缓存被击穿流量打到数据库。
2、 singleflight介于server和redis之间,网上挺多推荐这种用法的,有必要用此方案吗?大家可以思考下,文章末尾我给出我的想法。
我更倾向方案一。代码如下:
func TestSingleFlight(t *testing.T) {
var (
n = 10
k = \”12344556\”
wg = sync.WaitGroup{}
sf singleflight.Group
)
for i := 0; i < n; i++ {
go func() {
wg.Add(1)
defer wg.Done()
r, err, shared := sf.Do(k, func() (interface{}, error) {
return get(k)
})
if err != nil {
panic(err)
}
fmt.Printf(\”r=%v,shared=%v\\n\”, r, shared)
}()
}
wg.Wait()
}
func get(key string) (interface{}, error) {
time.Sleep(time.Microsecond) // todo 模拟业务处理
return key, nil
}
输出结果如下
===RUNTestSingleFlightr=12344556,shared=truer=12344556,shared=truer=12344556,shared=truer=12344556,shared=truer=12344556,shared=truer=12344556,shared=falser=12344556,shared=truer=12344556,shared=falser=12344556,shared=truer=12344556,shared=true—PASS:TestSingleFlight(0.00s)PASS
打印结果中为true都代表调用get()函数返回结果被共享。get函数调用明显降低了。
这种写法在函数正常返回情况下是能拿到正确的结果,如果下游返回异常了呢?(业务上遇过下游返回3-4s的拉低业务处理速度)因为Do()方法是以阻塞的方式来控制对下游的调用的,如果某一个请求被阻塞了,同一个key后面的请求都会被阻塞。
假设有一场景(SOP),消费kafka消息处理业务逻辑,业务高峰期某一时间段生产消息量为100w,单pod消费速度500/s,请求下游用singleflight控制对下游(三方接口)的并发量,假设下游某一次请求耗时2s。这时会有几个问题:
1、若某一个key被阻塞后续该key大量请求被阻塞,若这批请求失败从而导致消息处理失败,如果对消息重试会加剧业务下游压力。
2、单pod消费速度从500/s,降低到个位数,消费时间拉长,消息堆积(如果消息堆积对实时性要求场景影响视频很大的)。
造成这个问题主要原因如下:
singleflight是同步阻塞且缺乏超时控制机制,若某一个key阻塞后面次key都会被阻塞并且等待第一次结束。
singleflight虽然能降低对下游的请求量,但在某些场景失败的情况也增加了。
我们有办法给singleflight加一个超时时间吗?答案是肯定有的
下面这段代码singleflight没有增加超时控制
var (
offset int32 = 0
)
func TestSingleFlight(t *testing.T) {
var (
n int32 = 1000
k = \”12344556\”
wg = sync.WaitGroup{}
sf singleflight.Group
failCnt int32 = 0
)
for i := 0; i < int(n); i++ {
go func() {
wg.Add(1)
defer wg.Done()
_, err, _ := sf.Do(k, func() (interface{}, error) {
return get(k)
})
if err != nil {
atomic.AddInt32(&failCnt, 1)
return
}
}()
}
wg.Wait()
fmt.Printf(\”总请求数=%d,请求成功率=%d,请求失败率=%d\”, n, n-failCnt, failCnt)
}
func get(key string) (interface{}, error) {
var err error
if atomic.AddInt32(&offset, 1) == 3 { // 假设偏移量 offset == 3 执行耗时长,超时失败了
time.Sleep(time.Microsecond * 500)
err = fmt.Errorf(\”耗时长\”)
}
return key, err
}
结果输出如下
===RUNTestSingleFlight总请求数=1000,请求成功率=792,请求失败率=208—PASS:TestSingleFlight(0.00s)PASS
singleflight增加超时控制代码如下
func TestSingleFlight(t *testing.T) {
var (
n int32 = 1000
k = \”12344556\”
wg = sync.WaitGroup{}
sf singleflight.Group
failCnt int32 = 0
)
for i := 0; i < int(n); i++ {
go func() {
wg.Add(1)
defer wg.Done()
_, err, _ := sf.Do(k, func() (interface{}, error) {
ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*30)
go func(_ctx context.Context) {
<-_ctx.Done()
sf.Forget(k)
}(ctx)
return get(k)
})
if err != nil {
atomic.AddInt32(&failCnt, 1)
return
}
}()
}
wg.Wait()
fmt.Printf(\”总请求数=%d,请求成功率=%d,请求失败率=%d\”, n, n-failCnt, failCnt)
}
利用context.WithTimeout()方法控制超时,并且调用Forget()方法移除超时key结果输出如下
===RUNTestSingleFlight总请求数=1000,请求成功率=992,请求失败率=8—PASS:TestSingleFlight(0.00s)PASS
成功率提高了失败率明显降低了。
下面我用DoChan()函数实现
var (
offset int32 = 0
)
func TestSingleFlight(t *testing.T) {
var (
n int32 = 1000 // n 越大,效果越明显
k = \”12344556\”
wg = sync.WaitGroup{}
sf singleflight.Group
successCnt int32 = 0
)
for i := 0; i < int(n); i++ {
go func() {
wg.Add(1)
defer wg.Done()
ch := sf.DoChan(k, func() (interface{}, error) {
return get(k)
})
ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*100)
select {
case <-ctx.Done():
sf.Forget(k)
return
case ret := <-ch:
if ret.Err != nil {
return
}
atomic.AddInt32(&successCnt, 1)
}
}()
}
wg.Wait()
fmt.Printf(\”总请求数=%d,请求成功率=%d,请求失败率=%d\”, n, successCnt, n-successCnt)
}
func get(key string) (interface{}, error) {
var err error
if atomic.AddInt32(&offset, 1) == 3 { // 假设偏移量 offset == 3 执行耗时长,超时失败了
time.Sleep(time.Microsecond * 400)
err = fmt.Errorf(\”耗时长\”)
}
return key, err
}
大家自行验证
总结
1、singleflight使用得当确实能有效降低下游流量,我也推荐大家使用,但一定要注意同步阻塞问题,防止下游长耗时造成业务异常或高延迟,一定要做好正确性与降低业务下游流量权衡。
2、上面我留了一个问题,singleflight有必要放在server应用和redis之间吗?我认为没必要,redis是内存数据库,响应快,高qps本身不会是瓶颈,保护redis没有意义。另外singleflight用途是防止redis击穿流量打到数据库,如果你业务qps非常高并且对数据实时性要求高,为啥不通过其他手段把数据库数据刷新到redis中?比如数据创建同步写入redis、或通过binlog写入。
到此这篇关于一文教你学会Go中singleflight的使用的文章就介绍到这了,更多相关Go singleflight内容请搜索悠久资源网以前的文章或继续浏览下面的相关文章希望大家以后多多支持悠久资源网!
您可能感兴趣的文章:
- 解决Golang并发工具Singleflight的问题
- Gosingleflight使用以及原理
- Go扩展原语之SingleFlight的用法详解
- Golang使用singleflight解决并发重复请求
- 使用Singleflight实现Golang代码优化
- go singleflight缓存雪崩源码分析与应用
- Go并发原语之SingleFlight请求合并方法实例