ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Golang sync.WaitGroup 源码分析

2021-11-27 03:00:07  阅读:186  来源: 互联网

标签:wg WaitGroup race unsafe Golang state 源码 Pointer


结构

// WaitGroup类型的数据不可以被复制
type WaitGroup struct {
	noCopy noCopy	// 用来禁止当前结构的类型复制

    // state1 是 64-bit变量:
    //   高32位是计数器counter,也就是活跃的g的个数
    //   低32位表示因执行Wait()而阻塞的g的数量,即waiters
    // state2 表示sema信号量,说明本章代码用到了原语
    // 64-bit的原子操作需要64-bit的对齐,但是32位的编译器只能保证64-bit字段是32位对齐
    // 因此,在32位架构上,我们需要在state()中检查state1是否对齐,
    // 并在需要时动态地 "交换 "字段顺序。
	state1 uint64
	state2 uint32
}

Add

Add

func (wg *WaitGroup) Add(delta int) {
    // 获取state1 和 state2
	statep, semap := wg.state()
    // 竞争检测代码不看
	if race.Enabled {
		_ = *statep // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
    // counter加delta
	state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)	// 获取当前活跃的g的数量
    w := uint32(state)		// 获取当前Wait()的次数
    // 竞争检测代码,不看
	if race.Enabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(semap))
	}
    // 活跃的g个数不能是负数个,有可能delta传的是负数
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
    // 说明先调用的Wait()再调用Add(),正常应该反过来
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
    // Add()执行成功返回
	if v > 0 || w == 0 {
		return
	}
	// 到这一步说明 counter == 0 且 waiters > 0.
	// 如果*statep != state,有可能发生了两种错误情况
    // - Add()和Wait()并发(concurrently)调用
	// - 当counter归零时,waiters数量还在增加
	// 继续检查,保证WaitGroup不被滥用
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 如果WaitGroup使用规范,到这一步counter为0说明无活跃g了
    // 将state1置0,同时释放所有的waiters
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}

state

// 返回存state1和state2,即state和sema,
// 因为32位编译器不能直接对齐64位数据,需要这个函数做对齐工作
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		//如果state1是64-bit则直接原样返回
		return &wg.state1, &wg.state2
	} else {
		// 如果state1是32-bit对齐而不是64-bit对齐,
		// 那么(&state1)+4 就是 64-bit 对齐了
		state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
		return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
	}
}
  • unsafe.Alignof(T Type) 返回对齐方式,下表是32位系统和64位系统各大小数据的对齐方式值

    32位系统:

    Sizeof(x)   = 16  Alignof(x)   = 4
    Sizeof(x.a) = 1   Alignof(x.a) = 1 Offsetof(x.a) = 0
    Sizeof(x.b) = 2   Alignof(x.b) = 2 Offsetof(x.b) = 2
    Sizeof(x.c) = 12  Alignof(x.c) = 4 Offsetof(x.c) = 4
    

    64位系统:

    Sizeof(x)   = 32  Alignof(x)   = 8
    Sizeof(x.a) = 1   Alignof(x.a) = 1 Offsetof(x.a) = 0
    Sizeof(x.b) = 2   Alignof(x.b) = 2 Offsetof(x.b) = 2
    Sizeof(x.c) = 24  Alignof(x.c) = 8 Offsetof(x.c) = 8
    
  • unsafe.Pointer(&T) 是一个可以包含任意类型变量的地址的通用指针,所以unsafe.Pointer()常用于各种指针相互转换的桥梁。有四个特有的操作:

    1. 任何类型的指针都可以被转化为Pointer
    2. Pointer可以被转化为任何类型的指针
    3. uintptr可以被转化为Pointer
    4. Pointer可以被转化为uintptr

    不能直接通过 *p 方式来取得真实的变量值,因为不知道变量的具体类型。可以通过 uintptr(unsafe.Pointer(&T)) 来进行运算。

    unsafe.Pointer是可以比较的,并且支持和nil常量比较判断是否为空指针。

  • uintptr() 主要用来进行指针计算,本质是一个整型。一般用 uintptr(unsafe.Pointer(&T))来进行运算(T的类型未知)。

    注意:首先GC不认为uintptr是一个活引用,因此uintptr指向的对象可能被gc回收。其次,如果uintptr关联的对象移动,则其值也不会更新,即uintptr无法保持对变量的引用。

Done

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait

Wait

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
    // 获取state1和state2的地址
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
        // 原语:原子操作获取state1的值
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)	// counter,活跃的g的数量
		w := uint32(state)		// waiters
        // 如果没有活跃的g了,直接返回
		if v == 0 {
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// CAS操作增加waiters的数量
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
            // 竞争检测代码,不看
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this is as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(semap))
			}
            // 到这一步,说明当前g要阻塞等待了
            // 原语:根据semap的值也就是state2的地址找到相应的阻塞队列,把当前g放进去,并挂起
			runtime_Semacquire(semap)
            
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}

标签:wg,WaitGroup,race,unsafe,Golang,state,源码,Pointer
来源: https://www.cnblogs.com/usmiles/p/15610521.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有