前言 这几天的攻关都是在Golang并发上面,也算是有了一些收获,把一些基础的并发场景和并发模式都过了一遍,觉得自己又提升了一层楼,其实我发觉,在我的开发生涯中,总是纠结实现,却不是去看最底层的东西,任何东西,都是由简到难,或者就是从底层到核心,无论是Python还是Golang,或者是我正在狂刷算法的Java,我都是去纠结实现,但是不会钻研核心,所以我写代码都是面向搜素引擎编程。所以,也是时候该改变一些东西。
1 2 3 4 5 决意想要改变 心情就像离弦的箭 收不回的思绪每天在反复缠绵
并发的超时处理 如果我们在开发一个Web框架,势必在运行一个goroutine的时候,会出现运行时间过长的问题,一般这种情况,如果不进行超时控制,到最后的结果就是阻塞在那里,迟迟不返回,这样肯定是不行的,所以我们要对并发进行一个超时限制,说人话就是,加一个timeout之后自动退出或者报错 。
首先这里有个新东西,或者是新包,叫做”Context”,这个我的博客里面的久文章也写过了,在这不多废话。你不会还想让我介绍一下它吧!
Context的几个调用方法 寻思了一下,如果不介绍,很可能你们不知道我在说什么,还是讲几个基础用法吧,其实这个还是比较简单的,Golang的语法比起Rust简单多了。
Context的接口如下
1 2 3 4 5 6 7 8 9 10 type Context interface { Deadline() (deadline time.Time, ok bool ) Done() <-chan struct {} Err() error Value(key interface {}) interface {} }
其中有四个方法
Deadline返回绑定当前context的任务被取消的截止时间;如果没有设定期限,将返回ok == false。
Done 当绑定当前context的任务被取消时,将返回一个关闭的channel;如果当前context不会被取消,将返回nil。
Err 如果Done返回的channel没有关闭,将返回nil;如果Done返回的channel已经关闭,将返回非空的值表示任务结束的原因。如果是context被取消,Err将返回Canceled;如果是context超时,Err将返回DeadlineExceeded。
Value 返回context存储的键值对中当前key对应的值,如果没有对应的key,则返回nil。
简单的超时取消模型 咱们现在来实现一个超时的例子,其实这个直接调用context的timeout逻辑就可以了,有一张图可以明显说清楚超时处理的逻辑
1 ctx, cancel := context.WithTimeout(context.TODO(), time.Second*2 )
看一下context.WithTimeout的源码
1 2 3 func WithTimeout (parent Context, timeout time.Duration) (Context, CancelFunc) { return WithDeadline(parent, time.Now().Add(timeout)) }
这个家伙传入了一个时间,并且调用了withDeadline函数,看下它的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func WithDeadline (parent Context, d time.Time) (Context, CancelFunc) { if cur, ok := parent.Deadline(); ok && cur.Before(d) { return WithCancel(parent) } c := &timerCtx{ cancelCtx: newCancelCtx(parent), deadline: d, } propagateCancel(parent, c) dur := time.Until(d) if dur <= 0 { c.cancel(true , DeadlineExceeded) return c, func () { c.cancel(false , Canceled) } } c.mu.Lock() defer c.mu.Unlock() if c.err == nil { c.timer = time.AfterFunc(dur, func () { c.cancel(true , DeadlineExceeded) }) } return c, func () { c.cancel(true , Canceled) } }
这里输出了两个值,一个是ctx,一个是cancel
你可以这么理解,ctx就是信号,cancel是阀门,ctx的信号结束时,关闭阀门(关闭goroutine)
写个简单的超时退出模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 var ( Timeout = 1 * time.Second starttime = time.Now() signal = make (chan bool ) ) func do_something_work_1 (ctx context.Context, url string ) string { go func () { time.Sleep(time.Second * 10 ) fmt.Println(url) signal <- true }() select { case <-ctx.Done(): fmt.Println("tomeout.........." ) fmt.Printf("%.2fs - workers(%s) killed\n" , time.Since(starttime).Seconds(), url) return "tomeout........" case <-signal: fmt.Println("success" ) return "success" } return "success" } func main () { ctxt, cancel := context.WithTimeout(context.Background(), Timeout) defer cancel() go do_something_work_1(ctxt, "hallo" ) time.Sleep(time.Second * 5 ) fmt.Println("END" ) }
这里的输出结果是
1 2 3 tomeout.......... 1.00s - workers(hallo) killed END
解析一下这个框架吧,这个框架实现了一个简单的超时功能.
首先,我定义了一个假设执行时间很长的task,利用context,我给这个task定义了一个超时时间,并且通过select去监控它,当我监控到ctx有输出的时候,那么我就认为task超时,则直接输出超时并且杀死task,如果task直接run到底,那么会有一个信号signal被赋值,select接受到signal被赋值之后,将会认为任务执行成功,输出成功并且杀死task,大概的逻辑就是这样。
并发的取消和退出 并发的退出,其实又回到了上面的一个例子,刚才我演示了一个叫做cancel() 的东西,这个东西就相当于是杀死goroutine的侩子手。来瞅瞅这个东西到底是啥
首先又回到刚那个逻辑
1 ctx, cancel := context.WithTimeout(context.TODO(), time.Second*2 )
它输出了一个ctx,一个cancel,我们看下cancel到底是个什么东西
在源码里面,它是个
是不是很蒙蔽,这具体是个啥呢,官方文档里面说了
1 2 3 A CancelFunc tells an operation to abandon its work. A CancelFunc does not wait for the work to stop. A CancelFunc may be called by multiple goroutines simultaneously. After the first call, subsequent calls to a CancelFunc do nothing. 大意就是这个参数是放弃操作用的,并不等待工作停止,多个任务可调用多个cancel,第一个任务调用cancel之后,其他调用的操作不起作用。
还是蒙蔽阿,瞅一眼其他调用代码,继续追踪,最后在withdeadline找到了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (c *cancelCtx) cancel (removeFromParent bool , err error) { if err == nil { panic ("context: internal error: missing cancel error" ) } c.mu.Lock() if c.err != nil { c.mu.Unlock() return } c.err = err if c.done == nil { c.done = closedchan } else { close (c.done) } for child := range c.children { child.cancel(false , err) } c.children = nil c.mu.Unlock() if removeFromParent { removeChild(c.Context, c) } }
这部分就相对来说清楚一些了,这里主要是用了一个互斥锁问mutex.lock,看起来是访问共享内存,保护和协调内存访问的,首先先对ctx进行加锁,然后去遍历子goroutine,执行close,没有child之后再进行锁释放,这里的代码真的值得一看。
并发的退出(context代码)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 var c = make (chan int )func do_something_work (ctx context.Context) string { i := 0 for { select { case <-ctx.Done(): fmt.Println("Task Exit" ) return "Task Exit" case c <- i * i: i++ } } return "Task Success" } func main () { ctx, cancel := context.WithCancel(context.Background()) go do_something_work(ctx) for i := 0 ; i < 5 ; i++ { fmt.Println("Next square is" , <-c) } cancel() time.Sleep(time.Second * 3 ) fmt.Println("END" ) }
再来一个其他模式的代码,还有一种是通过chan管道传递,其实实现的逻辑也差不多,这里也贴出来,然后给予讲解
chan类型退出框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func do_somethings (done chan bool , url string ) { go func () { for { select { case <-done: fmt.Println("退出" ) return default : fmt.Println(url + " " + "执行中" ) time.Sleep(1 * time.Second) } } }() } func main () { done := make (chan bool ) do_somethings(done, "hello" ) time.Sleep(3 * time.Second) close (done) }
这里首先定义了一个chan,通过close chan来进行协程的退出,还是老样子,每次一定要有一个select去监控chan,当执行close时,chan会有输出,当获取到chan输出时,则结束杀死任务。
goroutine的心跳模式 有些时候,我们需要一个后台常驻任务,去做一些有的没的,但是有些时候不确定goroutine是否还活着,所以你需要每隔一段时间通知一下,报告情况,虽然存在静默状态,但是会隔固定的时间进行一次通知,这里对并发代码很有用,避免了异常挂住或者zombie的问题。
一般我写心跳代码都是后台常驻的服务,比如监控服务,比如时不时跳出来的告警服务,比如前阵子我写了个网络服务,一直就后台挂着等着别人来访问,有些时候不确定啥时候来人访问,就写了个心跳代码,所以我感觉并发里面,心跳还是很有必要的,特别是分布式的系统。
首先看图,心跳在Golang里怎么表现的
分析下心跳的实现方法
我们建立一个空的channel,设定时间去关闭它
按照时间间隔定时向空的channel发送一个值,类似定时通知机制,并且每次都能读到channel的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 func dowork (done <-chan interface {}, pulseInterval time.Duration) (<-chan interface {}, <-chan time.Time) { heartbeat := make (chan interface {}) results := make (chan time.Time) go func () { defer close (heartbeat) defer close (results) pulse := time.Tick(pulseInterval) workGen := time.Tick(2 * pulseInterval) sendPulse := func () { select { case heartbeat <- struct {}{}: default : } } sendResult := func (r time.Time) { for { select { case <-done: return case <-pulse: sendPulse() case results <- r: return } } } for { select { case <-done: return case <-pulse: sendPulse() case r := <-workGen: sendResult(r) } } }() return heartbeat, results } func main () { done := make (chan interface {}) time.AfterFunc(10 *time.Second, func () { close (done) }) const timeout = 2 * time.Second heartbeat, results := dowork(done, timeout/2 ) for { select { case _, ok := <-heartbeat: if ok == false { return } fmt.Println("pulse" ) case r, ok := <-results: if ok == false { return } fmt.Printf("results %v\n" , r) case <-time.After(timeout): fmt.Println("worker goroutine is not healthy!" ) return } } }
峰值限制(防止过大的并发请求数) 又是抽象的一B的东西,说人话就是,把系统的稳定和平衡性控制在可控范围内。举个简单的例子,如果你是一个动漫网站站长,你每天只允许100个人访问你的网站看动漫,那这个100就是你的限制,就是访问限制,如果第101个人想要看动漫,就只能阻塞在外面继续等待,等100个人中的其中几个人退出才可以继续进去。
这里一般的处理方法是令牌算法
令牌算法的解释如下
1 2 3 4 假设要使用资源,你必须拥有资源的访问令牌。没有令牌,请求会被拒绝。想象这些令牌存储在等待被检索以供使用的桶中。该桶的深度为d,表示它一次可以容纳d个访问令牌。 现在,每次你需要访问资源时,你都会进入存储桶并删除令牌。如果你的存储桶包含五个令牌,那么您可以访问五次资源。在第六次访问时,没有访问令牌可用,那么必须将请求加入队列,直到令牌变为可用,或拒绝请求。
写个代码你看看
1 2 3 4 5 6 7 8 9 10 11 bar24x7 := make (Bar, 10 ) for customerId := 0 ; ; customerId++ { time.Sleep(time.Second) consumer := Consumer{customerId} select { case bar24x7 <- consumer: go bar24x7.ServeConsumer(consumer) default : log.Print("顾客#" , customerId, "不愿等待而离去" ) } }
这个其实相对来说可能比较简单,定义一个队列就可以了,但是这不是最好的办法
速率限制(设定某一时刻的并发请求数) 和上面那个兄弟的机制没啥两样,其实一个是瞬时请求,一个是总请求,说人话就是,上面那个管全局,下面这个管某一时刻,比如在选课的时候,瞬间就有一大堆人过来抢课,这不把服务器撑爆了。所以,速率限制,常用来限制吞吐和确保在一段时间内的资源使用不会超标。
写个代码你看看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 type Request interface {}func handle (r Request) { fmt.Println(r.(int )) }const RateLimitPeriod = time.Minuteconst RateLimit = 10 func handleRequests (requests <-chan Request) { quotas := make (chan time.Time, RateLimit) go func () { tick := time.NewTicker(RateLimitPeriod / RateLimit) defer tick.Stop() for t := range tick.C { select { case quotas <- t: default : } } }() for r := range requests { <-quotas go handle(r) } } func main () { requests := make (chan Request) go handleRequests(requests) for i := 0 ; ; i++ { requests <- i } }
并发最快回应机制 有时候,一份数据可能同时从多个数据源获取。这些数据源将返回相同的数据。因为各种因素,这些数据源的回应速度参差不一,甚至某个特定数据源的多次回应速度之间也可能相差很大。同时从多个数据源获取一份相同的数据可以有效保障低延迟。我们只需采用最快的回应并舍弃其它较慢回应.
写个代码你看看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package mainimport ( "fmt" "time" "math/rand" ) func source (c chan <- int32 ) { ra, rb := rand.Int31(), rand.Intn(3 ) + 1 time.Sleep(time.Duration(rb) * time.Second) c <- ra } func main () { rand.Seed(time.Now().UnixNano()) startTime := time.Now() c := make (chan int32 , 5 ) for i := 0 ; i < cap (c); i++ { go source(c) } rnd := <- c fmt.Println(time.Since(startTime)) fmt.Println(rnd) }
差不多就是这样,这里我偷懒了一下,借用了他人的代码,他的环境是,这篇文章给了我很大的启发,感恩!
通道用例大全
结尾 细细看了一下,写了不少了,但是其中有一两个东西我也没整的太明白,比如goroutine的心跳之类的,这个东西实在是用的不多,所以我感觉我还是,嗯,需要持续性进步才能解决问题,我写的不那么好,如果有错,请老铁们指出来,感恩!
1 2 3 4 5 6 7 8 9 予你开心叫我宝宝 予你异见上我镣铐 小人以为我拿它没办法 光脚不惧穿鞋我又有何怕 check~