type gobuf struct {
    sp   uintptr        // saved value of the CPU's rsp register
    pc   uintptr        // saved value of the CPU's rip register
    g    guintptr       // the goroutine this gobuf belongs to
    ctxt unsafe.Pointer
    ret  uintptr        // saved return value
    lr   uintptr
    bp   uintptr        // saved rbp register value, for framepointer-enabled architectures
}
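To see the gobuf at work indirectly from user code, consider the sketch below. It relies on the fact that runtime.Gosched() hands control back to the scheduler, which saves the calling goroutine's context (sp, pc) into its g.sched gobuf so it can be resumed later from exactly that point; the program itself is an illustration I've written, not runtime source.

package main

import (
    "fmt"
    "runtime"
)

// A minimal sketch: runtime.Gosched() yields the P; the runtime saves
// the current goroutine's sp/pc into its gobuf (g.sched) and resumes
// it later from that saved context.
func main() {
    runtime.GOMAXPROCS(1) // a single P makes the yield observable
    done := make(chan struct{})
    go func() {
        fmt.Println("other goroutine ran first")
        close(done)
    }()
    runtime.Gosched() // context saved into main g's gobuf; other g runs
    <-done
    fmt.Println("main resumed from its saved context")
}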
// Check whether a P needs to be woken for the trace reader or a GC worker.
tryWakeP := false
if trace.enabled || trace.shutdown {
    gp = traceReader()
    if gp != nil {
        casgstatus(gp, _Gwaiting, _Grunnable)
        traceGoUnpark(gp, 0)
        tryWakeP = true
    }
}
if gp == nil && gcBlackenEnabled != 0 {
    gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
    if gp != nil {
        tryWakeP = true
    }
}
if gp == nil {
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
        lock(&sched.lock)
        gp = globrunqget(_g_.m.p.ptr(), 1)
        unlock(&sched.lock)
    }
}
if gp == nil {
    gp, inheritTime = runqget(_g_.m.p.ptr())
    // We can see gp != nil here even if the M is spinning,
    // if checkTimers added a local goroutine via goready.
}
if gp == nil {
    gp, inheritTime = findrunnable() // blocks until work is available
}
Inside this function, the scheduler first checks whether the program is in a garbage-collection phase: if tracing is active it tries to run the trace reader, and then it checks whether a GC mark worker goroutine needs to be run.
Next come a few key concepts.
Go uses run queues to hold goroutines that are waiting to be executed.
These queues come in two kinds: a per-P local run queue and a single global run queue.
A run queue is first-in, first-out (FIFO): goroutines leave the queue in the order they arrived, as the sketch below illustrates.
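To make the FIFO behavior concrete, here is a minimal, self-contained sketch (not runtime code) of a fixed-size run queue driven by head and tail indices, the same shape as the runq ring buffer on the p struct shown below. The runQueue type, its size, and the use of int as a stand-in for a goroutine pointer are all illustrative assumptions.

package main

import "fmt"

// runQueue is a toy fixed-size FIFO ring buffer, modeled loosely on
// the runtime's per-P runq: push at the tail, pop at the head.
type runQueue struct {
    head, tail uint32
    buf        [8]int // stand-in for [256]guintptr
}

func (q *runQueue) push(g int) bool {
    if q.tail-q.head == uint32(len(q.buf)) {
        return false // queue full
    }
    q.buf[q.tail%uint32(len(q.buf))] = g
    q.tail++
    return true
}

func (q *runQueue) pop() (int, bool) {
    if q.head == q.tail {
        return 0, false // queue empty
    }
    g := q.buf[q.head%uint32(len(q.buf))]
    q.head++
    return g, true
}

func main() {
    var q runQueue
    for g := 1; g <= 3; g++ {
        q.push(g)
    }
    for {
        g, ok := q.pop()
        if !ok {
            break
        }
        fmt.Println("run goroutine", g) // prints 1, 2, 3: strictly FIFO
    }
}

Because head and tail only ever increase and are reduced modulo the buffer length on access, the same slots are reused indefinitely without shifting elements, which is why the runtime can afford a fixed 256-slot array per P.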
Let's look at the source code for the scheduler's P.
The local run queue lives on the p struct:
type p struct {
    id     int32
    status uint32 // one of pidle/prunning/...
    ...
    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    // runnext, if non-nil, is a runnable G that was readied by
    // the current G and should be run next instead of what's in runq.
    runnext guintptr
    ...
}
if gp == nil {
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    // Every 61st tick of this P's schedtick, if the global queue is
    // non-empty, take a G from it.
    if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
        lock(&sched.lock)
        // Grab a single G from the global queue.
        gp = globrunqget(_g_.m.p.ptr(), 1)
        unlock(&sched.lock)
    }
}
// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(_p_ *p, max int32) *g {
    assertLockHeld(&sched.lock)

    if sched.runqsize == 0 {
        return nil
    }

    // Divide the global queue evenly: n is the number of G's this P takes.
    n := sched.runqsize/gomaxprocs + 1
    if n > sched.runqsize {
        n = sched.runqsize
    }
    if max > 0 && n > max {
        n = max
    }
    // Never fill more than half of the local run queue.
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }

    // Shrink the global queue size by n.
    sched.runqsize -= n

    // Pop the head of the global queue; this G is returned to the caller.
    gp := sched.runq.pop()
    n--
    // Move the remaining n-1 G's onto _p_'s local run queue.
    for ; n > 0; n-- {
        gp1 := sched.runq.pop()
        runqput(_p_, gp1, false)
    }
    return gp
}
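A quick worked example of the batch-size computation above, with illustrative numbers: given sched.runqsize = 100 and gomaxprocs = 4, n = 100/4 + 1 = 26, but when schedule() calls globrunqget with max = 1 the cap brings it down to a single G. The helper below (my own sketch, not runtime code) mirrors that arithmetic.

package main

import "fmt"

// batchSize mirrors globrunqget's math with explicit parameters.
func batchSize(runqsize, gomaxprocs, max, localCap int32) int32 {
    n := runqsize/gomaxprocs + 1 // even share per P, rounded up
    if n > runqsize {
        n = runqsize
    }
    if max > 0 && n > max {
        n = max // caller's cap, e.g. 1 from schedule()
    }
    if n > localCap/2 {
        n = localCap / 2 // never fill more than half the local runq
    }
    return n
}

func main() {
    fmt.Println(batchSize(100, 4, 1, 256)) // 1: schedule() asks for one G
    fmt.Println(batchSize(100, 4, 0, 256)) // 26: an uncapped call takes a fair share
}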
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
        n := t - h
        // Steal half of the G's in the victim's queue.
        n = n - n/2
        ...
        // Copy the stolen G's into the batch, headed for the thief's own queue.
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
            batch[(batchHead+i)%uint32(len(batch))] = g
        }
        if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
            return n
        }
    }
}
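Note the rounding in n = n - n/2: with integer division this rounds up, so a thief always takes at least one G from a non-empty victim, and takes the larger half when the count is odd. A tiny sketch of just that arithmetic (illustrative, not runtime code):

package main

import "fmt"

// stealCount mirrors runqgrab's "take half" math.
func stealCount(queued uint32) uint32 {
    return queued - queued/2 // rounds up for odd counts
}

func main() {
    for _, q := range []uint32{1, 2, 5, 6} {
        fmt.Printf("victim has %d G's -> thief steals %d\n", q, stealCount(q))
    }
    // 1 -> 1, 2 -> 1, 5 -> 3, 6 -> 3
}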
func retake(now int64) uint32 {
    n := 0
    // Prevent allp from changing while we iterate over it.
    lock(&allpLock)
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {
                preemptone(_p_)
                // In case of syscall, preemptone() doesn't work,
                // because there is no M wired to the P.
                sysretake = true
            }
        }
        if s == _Psyscall {
            // Retake the P from the syscall if it has been there
            // for more than one sysmon tick.
            t := int64(_p_.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // Skip the retake if there is other work to do, but take the P
            // back eventually, because a P stuck in a syscall can prevent
            // the sysmon thread from deep sleep.
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            unlock(&allpLock)
            // Decrement the number of idle locked M's before the CAS.
            // Otherwise the M exiting the syscall could increment nmidle
            // and report a false deadlock.
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}
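Retake's forced preemption can be observed from user code. The sketch below assumes Go 1.14 or later (asynchronous preemption): with a single P, a tight loop containing no function calls would otherwise monopolize the P forever, but sysmon notices the G has run past forcePreemptNS (10ms) and preempts it, so the main goroutine still gets scheduled.

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(1) // one P: the busy loop and main must share it
    go func() {
        for {
            // Tight loop with no call sites. Before Go 1.14 this could
            // starve the P; now sysmon/retake preempts it after ~10ms.
        }
    }()
    time.Sleep(50 * time.Millisecond) // returns only if the loop was preempted
    fmt.Println("main goroutine ran: the busy loop was preempted")
}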