ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

007-Golang1.17源码分析之mutex

2022-02-21 20:03:42  阅读:179  来源: 互联网

标签:Golang1.17 old 状态 goroutine state 源码 007 new 唤醒


Golang1.17源码分析之mutex-007

Golang1.17 学习笔记007

源代码:sync/mutex.go

数据结构:

const (

    // 锁标识位(state的最后一位) 
    // Mutex.state & mutexLocked==1表示已经上锁;Mutex.state & mutexLocked==0表示已经未锁
	mutexLocked = 1 << iota // mutex is locked
	
	// 是否存在运行中的协程(state的倒数第二位)
    // Mutex.state & mutexWoken==1表示存在运行中的协程;Mutex.state & mutexWoken==0表示不存在运行中的协程
	mutexWoken
	
	// 饥饿状态位(state的倒数第三位)
    // Mutex.state & mutexStarving==1表示饥饿;Mutex.state & mutexStarving==0表示不饥饿
	mutexStarving
	
	// 等待者数量偏移量(值为3)
    // 根据 mutex.state >> mutexWaiterShift 得到当前阻塞的goroutine数目,最多可以阻塞2^29个goroutine
	mutexWaiterShift = iota
	
	//值为1e6纳秒,也就是1毫秒,当等待队列中队首goroutine等待时间超过starvationThresholdNs也就是1毫秒,mutex进入饥饿模式
	starvationThresholdNs = 1e6
)
type Mutex struct {
    //锁有两种模式:正常模式和饥饿模式
    // 如果一个goroutine获取到了锁之后,它会判断以下两种情况:
	// 1. 它是队列中最后一个goroutine;
	// 2. 它拿到锁所花的时间小于1ms;
	// 以上只要有一个成立,它就会把锁转变回正常模式。
	state int32     //表示互斥锁的状态,比如是否被锁定等。
	sema  uint32    //表示信号量,协程阻塞等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程
}

加锁:

func (m *Mutex) Lock() {
	// 如果 m.state == 0 那么 m.state = 1 && return true
	// 如果当前没有被锁,那么锁上并返回true
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	
	// 已经被其他协程持有,进入自旋阶段
	m.lockSlow()
}

func (m *Mutex) lockSlow() {
	// 用来存当前goroutine等待的时间
	var waitStartTime int64
	// 用来存当前goroutine是否饥饿
	starving := false
	// 用来存当前goroutine是否已唤醒
	awoke := false
	// 用来存当前goroutine的循环次数(想一想一个goroutine如果循环了2147483648次咋办……)
	iter := 0
	// 复制一下当前锁的状态
	old := m.state
	
	// 自旋
	for {
		// 如果是饥饿情况之下,就不要自旋了,因为锁会直接交给队列头部的goroutine
		// 如果锁是被获取状态,并且满足自旋条件(canSpin见后文分析),那么就自旋等锁
		// 伪代码:if isLocked() and isNotStarving() and canSpin()
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识,并标记自己为被唤醒
			// 这样当Unlock的时候就不会去唤醒其它被阻塞的goroutine了
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			// 进行自旋(分析见后文)
			runtime_doSpin()
			iter++
			// 更新锁的状态(有可能在自旋的这段时间之内锁的状态已经被其它goroutine改变)
			old = m.state
			continue
		}
		
		// 当走到这一步的时候,可能会有以下的情况:
		// 1. 锁被获取+饥饿
		// 2. 锁被获取+正常
		// 3. 锁空闲+饥饿
		// 4. 锁空闲+正常
		
		// goroutine的状态可能是唤醒以及非唤醒
		
		// 复制一份当前的状态,目的是根据当前状态设置出期望的状态,存在new里面,
		// 并且通过CAS来比较以及更新锁的状态
		// old用来存锁的当前状态
		new := old
		
		
		// 如果说锁不是饥饿状态,就把期望状态设置为被获取(获取锁)
		// 也就是说,如果是饥饿状态,就不要把期望状态设置为被获取
		// 新到的goroutine乖乖排队去
		// 伪代码:if isNotStarving()
		if old&mutexStarving == 0 {
		    // 伪代码:newState = locked
			new |= mutexLocked
		}
		
		// 如果锁是被获取状态,或者饥饿状态
		// 就把期望状态中的等待队列的等待者数量+1(实际上是new + 8)
		// (会不会可能有三亿个goroutine等待拿锁……)
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		
		// 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
		// 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		
		// 如果说当前goroutine是被唤醒状态,我们需要reset这个状态
		// 因为goroutine要么是拿到锁了,要么是进入sleep了
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			
			// 这句就是把new设置为非唤醒状态
			// &^的意思是and not
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
		    // 如果说old状态不是饥饿状态也不是被获取状态
			// 那么代表当前goroutine已经通过CAS成功获取了锁
			// (能进入这个代码块表示状态已改变,也就是说状态是从空闲到被获取)
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			
			// 如果说之前没有等待过,就初始化设置现在的等待时间
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			
			// 既然获取锁失败了,就使用sleep原语来阻塞当前goroutine
			// 通过信号量来排队获取锁
			// 如果是新来的goroutine,就放到队列尾部
			// 如果是被唤醒的等待锁的goroutine,就放到队列头部
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			
			// 这里sleep完了,被唤醒
			
			// 如果当前goroutine已经是饥饿状态了
			// 或者当前goroutine已经等待了1ms(在上面定义常量)以上
			// 就把当前goroutine的状态设置为饥饿
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			
			// 如果说锁现在是饥饿状态,就代表现在锁是被释放的状态,当前goroutine是被信号量所唤醒的
			// 也就是说,锁被直接交给了当前goroutine
			if old&mutexStarving != 0 {
				
				// 如果说当前锁的状态是被唤醒状态或者被获取状态,或者说等待的队列为空
				// 那么是不可能的,肯定是出问题了,因为当前状态肯定应该有等待的队列,锁也一定是被释放状态且未唤醒
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				
				// 当前goroutine获取锁,waiter数量-1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				
				// 如果当前goroutine非饥饿状态,或者说当前goroutine是队列中最后一个goroutine
				// 那么就退出饥饿模式,把状态设置为正常
				if !starving || old>>mutexWaiterShift == 1 {
					// 在这里这么做至关重要,还要考虑等待时间。
					// 饥饿模式是非常低效率的,一旦两个goroutine将互斥锁切换为饥饿模式,它们便可以无限锁
					delta -= mutexStarving
				}
				// 原子性地加上改动的状态
				atomic.AddInt32(&m.state, delta)
				break
			}
			// 如果锁不是饥饿模式,就把当前的goroutine设为被唤醒
			// 并且重置iter(重置spin)
			awoke = true
			iter = 0
		} else {
	    	// 如果CAS不成功,也就是说没能成功获得锁,锁被别的goroutine获得了或者锁一直没被释放
			// 那么就更新状态,重新开始循环尝试拿锁
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

判断是否可以自旋:runtime/proc.go: sync_runtime_canSpin

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// 这里的active_spin是个常量,值为4
	// 简单来说,sync.Mutex是有可能被多个goroutine竞争的,所以不应该大量自旋(消耗CPU)
	// 自旋的条件如下:
	// 1. 自旋次数小于active_spin(这里是4)次;
	// 2. 在多核机器上;
	// 3. GOMAXPROCS > 1并且至少有一个其它的处于运行状态的P;
	// 4. 当前P没有其它等待运行的G;
	// 满足以上四个条件才可以进行自旋。
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

解锁:

func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// 这里获取到锁的状态,然后将状态减去被获取的状态(也就是解锁),称为new(期望)状态
	// 注意以上两个操作是原子的,所以不用担心多个goroutine并发的问题
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
    // 如果说,期望状态加上被获取的状态,不是被获取的话
	// 那么就panic
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	
	// 如果说new状态(也就是锁的状态)不是饥饿状态
	if new&mutexStarving == 0 {
		old := new
		for {
		    // 如果说锁没有等待拿锁的goroutine
			// 或者锁被获取了(在循环的过程中被其它goroutine获取了)
			// 或者锁是被唤醒状态(表示有goroutine被唤醒,不需要再去尝试唤醒其它goroutine)
			// 或者锁是饥饿模式(会直接转交给队列头的goroutine)
			// 那么就直接返回,啥都不用做了
			// 也就是没有等待的goroutine, 或者锁不处于空闲的状态,直接返回.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			/
			// 走到这一步的时候,说明锁目前还是空闲状态,并且没有goroutine被唤醒且队列中有goroutine等待拿锁
			// 将等待的goroutine数减一,并设置woken标识
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			
			// 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁.
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个.
		// 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
		// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
		// 新来的goroutine不会把锁抢过去.
		runtime_Semrelease(&m.sema, true, 1)
	}
}

mutex 模式

  1. normal:协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁

  2. starvation:自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,
    还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,
    不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为”饥饿”模式,然后再阻塞。
    处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,
    同时也会把等待计数减1

Woken状态

Woken状态用于加锁和解锁过程的通信,举个例子,同一时刻,两个协程一个在加锁,一个在解锁,在加锁的协程可能在自旋过程中,
此时把Woken标记为1,用于通知解锁协程不必释放信号量了,好比在说:你只管解锁好了,不必释放信号量,我马上就拿到锁了

参考文献:
《Go专家编程》之mutex
https://www.purewhite.io/2019/03/28/golang-mutex-source/
https://www.cnblogs.com/ricklz/p/14535653.html

标签:Golang1.17,old,状态,goroutine,state,源码,007,new,唤醒
来源: https://blog.csdn.net/weixin_51546892/article/details/123042452

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有