📅 2024年6月20日

📦 使用版本为 1.21.5

在网上找了一个列子

var wait sync.WaitGroup
var count = 0

func main() {
   wait.Add(10)
   for i := 0; i < 10; i++ {
      go func(data *int) {
         // 模拟访问耗时
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         // 访问数据
         temp := *data
         // 模拟计算耗时
         time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
         ans := 1
         // 修改数据
         *data = temp + ans
         fmt.Println(*data)
         wait.Done()
      }(&count)
   }
   wait.Wait()
   fmt.Println("最终结果", count)
}

对于上面的例子,开启了十个协程来对 count进行 +1操作,并且使用了 time.Sleep来模拟不同的耗时,根据直觉来讲,10个协程执行10个 +1操作,最终结果一定是 10,正确结果也确实是 10,但事实并非如此,上面的例子执行结果如下

1
1
1
2
1
1
2
1
2
2
最终结果 2

多次执行后会发现执行结果不会是10,而且会出现多种结果

🍅 由于每个协程访问和计算所需的时间不同,A协程访问数据耗费500毫秒,此时访问到的 count值为1,随后又花费了400毫秒计算,但在这400毫秒内,B协程已经完成了访问和计算并成功更新了 count的值,A协程在计算完毕后,A协程最初访问到的值已经过时了,但A协程并不知道这件事,依旧在原先访问到的值基础上加一,并赋值给 count,这样一来,B协程的执行结果被覆盖了。多个协程读取和访问一个共享数据时,尤其会发生这样的问题,为此就需要用到锁

Go中 sync包下的 MutexRWMutex提供了互斥锁与读写锁两种实现,且提供了非常简单易用的API,加锁只需要 Lock(),解锁也只需要 Unlock()。需要注意的是,Go所提供的锁都是非递归锁,也就是不可重入锁,所以重复加锁或重复解锁都会导致 fatal。锁的意义在于保护不变量,加锁是希望数据不会被其他协程修改,如下

func DoSomething() {
	Lock()
    // 在这个过程中,数据不会被其他协程修改
	Unlock()
}

倘若是递归锁的话,就可能会发生如下情况

func DoSomething() {
	Lock()
    DoOther()
	Unlock()
}

func DoOther() {
	Lock()
	// do other
	Unlock()
}

DoSomthing函数显然不知道 DoOther函数可能会对数据做点什么,从而修改了数据,比如再开几个子协程破坏了不变量。这在Go中是行不通的,一旦加锁以后就必须保证不变量的不变性,此时重复加锁解锁都会导致死锁。所以在编写代码时应该避免上述情况,必要时在加锁的同时立即使用 defer语句解锁

举个例子

var count = 0
var lock sync.Mutex

func main() {
    //已经上锁了
	lock.Lock()
	DoADD(&count)
	lock.Unlock()
	println(count)
}

func DoADD(i *int) {
    //由于锁在main中已经被拿到,这里会一直等待main释放锁,它才可以拿到锁
	lock.Lock()
	defer lock.Unlock()
	*i = *i + 1
}

🌟 互斥锁

sync.Mutex是Go提供的互斥锁实现,其实现了 sync.Locker接口

type Locker interface {
   // 加锁
   Lock()
   // 解锁
   Unlock()
}

解决第一个没有加锁的列子

var wait sync.WaitGroup
var count = 0
var lock sync.Mutex

func main() {
	wait.Add(10)
	for i := 0; i < 10; i++ {
		go func(data *int) {
			//加锁,其他协程需要等待解锁后次啊可以拿取锁
			lock.Lock()
			// 模拟访问耗时
			time.Sleep(time.Second)
			// 访问数据
			temp := *data
			// 模拟计算耗时
			time.Sleep(time.Second)
			ans := 1
			// 修改数据
			*data = temp + ans
			//解锁
			lock.Unlock()
			fmt.Println(*data)
			wait.Done()
		}(&count)
	}
	wait.Wait()
	fmt.Println("最终结果", count)
}

🌟 读写锁

互斥锁适合读操作与写操作频率都差不多的情况,对于一些读多写少的数据,如果使用互斥锁,会造成大量的不必要的协程竞争锁,这会消耗很多的系统资源,这时候就需要用到读写锁,即读写互斥锁,对于一个协程而言

  • 如果获得了读锁,其他协程进行写操作时会阻塞,其他协程进行读操作时不会阻塞
  • 如果获得了写锁,其他协程进行写操作时会阻塞,其他协程进行读操作时会阻塞

Go中读写互斥锁的实现是 sync.RWMutex,它也同样实现了 Locker接口,但它提供了更多可用的方法,如下:

// 加读锁
func (rw *RWMutex) RLock() 

// 尝试加读锁
func (rw *RWMutex) TryRLock() bool 

// 解读锁
func (rw *RWMutex) RUnlock() 

// 加写锁
func (rw *RWMutex) Lock()

// 尝试加写锁
func (rw *RWMutex) TryLock() bool 

// 解写锁
func (rw *RWMutex) Unlock()

代码实列:

读写互斥锁内部实现依旧是互斥锁,并不是说分读锁和写锁就有两个锁,从始至终都只有一个锁。下面来看一个读写互斥锁的使用案例

var wait sync.WaitGroup
var count = 0

var rw sync.RWMutex

func main() {
	wait.Add(12)
	// 读多写少
	go func() {
		for i := 0; i < 3; i++ {
			go Write(&count)
		}
		wait.Done()
	}()
	go func() {
		for i := 0; i < 7; i++ {
			go Read(&count)
		}
		wait.Done()
	}()
	// 等待子协程结束
	wait.Wait()
	fmt.Println("最终结果", count)
}

func Read(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
	//拿到读锁,其他读协程也可以拿到读锁,但是写协程不能拿到写锁
	rw.RLock()
	fmt.Println("拿到读锁")
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	fmt.Println("释放读锁", *i)
	rw.RUnlock()
	wait.Done()
}

func Write(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	//等待读协程完毕之后才可以拿到写锁,并且其他写协程需要等待拿到写锁的协程释放锁才可以拿到锁
	rw.Lock()
	fmt.Println("拿到写锁")
	temp := *i
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	*i = temp + 1
	fmt.Println("释放写锁", *i)
	rw.Unlock()
	wait.Done()
}

其中 TryRlockTryLock两个尝试加锁的操作是非阻塞式的,成功加锁会返回 true,无法获得锁时并不会阻塞而是返回 false,以下代码会发生报错

var wait sync.WaitGroup
var count = 0

var rw sync.RWMutex

func main() {
	wait.Add(12)
	// 读多写少
	go func() {
		for i := 0; i < 3; i++ {
			go Write(&count)
		}
		wait.Done()
	}()
	go func() {
		for i := 0; i < 7; i++ {
			go Read(&count)
		}
		wait.Done()
	}()
	// 等待子协程结束
	wait.Wait()
	fmt.Println("最终结果", count)
}

func Read(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
	// 尝试拿取锁,并不会柱塞其他拿取写锁的协程,当这个锁拿取完毕
	rw.TryRLock()
	fmt.Println("拿到读锁")
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	fmt.Println("释放读锁", *i)
	rw.RUnlock()
	wait.Done()
}

func Write(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	//等待读协程完毕之后才可以拿到写锁,并且其他写协程需要等待拿到写锁的协程释放锁才可以拿到锁
	rw.Lock()
	fmt.Println("拿到写锁")
	temp := *i
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	*i = temp + 1
	fmt.Println("释放写锁", *i)
	rw.Unlock()
	wait.Done()
}

Read 函数中,使用了 TryRLock() 来尝试获取读锁。如果 TryRLock() 返回 false,说明读锁未能立即获取,这意味着在调用 TryRLock() 时,写锁已经被其他 goroutine 持有。如果 TryRLock() 失败,而后续的代码仍然调用了 RUnlock(),就会产生 RUnlock of unlocked RWMutex 的错误。

修改之后

var wait sync.WaitGroup
var count = 0

var rw sync.RWMutex

func main() {
	wait.Add(12)
	// 读多写少
	go func() {
		for i := 0; i < 3; i++ {
			go Write(&count)
		}
		wait.Done()
	}()
	go func() {
		for i := 0; i < 7; i++ {
			go Read(&count)
		}
		wait.Done()
	}()
	// 等待子协程结束
	wait.Wait()
	fmt.Println("最终结果", count)
}

func Read(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
	// 尝试拿取锁,并不会柱塞其他拿取写锁的协程,当这个锁拿取完毕
	lock := rw.TryRLock()
	if lock {
		fmt.Println("拿到读锁")
		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
		fmt.Println("释放读锁", *i)
		rw.RUnlock()
	}
	wait.Done()
}

func Write(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	//等待读协程完毕之后才可以拿到写锁,并且其他写协程需要等待拿到写锁的协程释放锁才可以拿到锁
	rw.Lock()
	fmt.Println("拿到写锁")
	temp := *i
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	*i = temp + 1
	fmt.Println("释放写锁", *i)
	rw.Unlock()
	wait.Done()
}

提示

对于锁而言,不应该将其作为值传递和存储,应该使用指针。

🌟 条件变量

条件变量,与互斥锁一同出现和使用,所以有些人可能会误称为条件锁,但它并不是锁,是一种通讯机制。Go中的 sync.Cond对此提供了实现,而创建条件变量的函数签名如下:

func NewCond(l Locker) *Cond 

可以看到创建一个条件变量前提就是需要创建一个锁,sync.Cond提供了如下的方法以供使用

// 阻塞等待条件生效,自动调用互斥锁的 Unlock() 方法来释放锁,直到被唤醒,
func (c *Cond) Wait()

// 唤醒一个因条件阻塞的协程
func (c *Cond) Signal()

// 唤醒所有因条件阻塞的协程
func (c *Cond) Broadcast()

这里的关键点在于,条件变量的 Wait() 方法会自动调用互斥锁的 Unlock() 方法来释放锁,然后阻塞当前 goroutine 直到条件满足。当条件满足时,通过调用条件变量的 Broadcast()Signal() 方法唤醒阻塞的 goroutine,这些 goroutine会被重新调度,并自动重新获取之前释放的互斥锁。

下面是一个实列

var wait sync.WaitGroup
var count = 0

var rw sync.RWMutex

// 条件变量,这里使用rw.Rlocker获取当前锁
var cond = sync.NewCond(rw.RLocker())

func main() {
	wait.Add(12)
	// 读多写少
	go func() {
		for i := 0; i < 3; i++ {
			go Write(&count)
		}
		wait.Done()
	}()
	go func() {
		for i := 0; i < 7; i++ {
			go Read(&count)
		}
		wait.Done()
	}()
	// 等待子协程结束
	wait.Wait()
	fmt.Println("最终结果", count)
}

func Read(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
	rw.RLock()
	fmt.Println("拿到读锁")
	// 条件不满足就一直阻塞
	for *i < 3 {
		cond.Wait()
	}
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	fmt.Println("释放读锁", *i)
	rw.RUnlock()
	wait.Done()
}

func Write(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	rw.Lock()
	fmt.Println("拿到写锁")
	temp := *i
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	*i = temp + 1
	fmt.Println("释放写锁", *i)
	rw.Unlock()
	// 唤醒所有因条件变量阻塞的协程
	cond.Broadcast()
	wait.Done()
}

选择使用 RWMutex 的读锁部分 (RWLocker) 作为条件变量的锁,这是因为读锁可以被多个 goroutine 同时持有,这与读多写少的应用场景相匹配。读协程可以同时持有读锁,因此多个读协程可以同时阻塞在条件变量上,等待写协程更新共享资源。

然而,如果直接使用整个 RWMutex 作为条件变量的锁,当一个写协程获取写锁并更新共享资源后,它需要调用 Broadcast()Signal() 来唤醒阻塞的读协程。但问题在于,写协程在调用 Broadcast() 后需要调用 Unlock() 来释放写锁,以便读协程能够重新获取锁并继续执行。如果写协程在调用 Broadcast() 后立即调用 Unlock(),那么它可能会在读协程有机会重新获取锁之前,导致其他写协程获取写锁,从而阻止读协程获取锁并继续执行。这可能导致读协程永远无法获取锁,造成死锁。

因此,使用读锁作为条件变量的锁,可以确保在写协程释放写锁后,读协程可以立即重新获取读锁并继续执行,而不会被其他写协程抢占锁资源。

func (rw *RWMutex) RLocker() Locker {
   return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

可以看到 rlocker也只是把读写互斥锁的读锁操作封装了一下,实际上是同一个引用,依旧是同一个锁。读协程读取数据时,如果小于3就会一直阻塞等待,直到数据大于3,而写协程在更新数据后都会尝试唤醒所有因条件变量而阻塞的协程,所以最后的输出如下

拿到读锁
拿到读锁
拿到读锁
拿到读锁
拿到写锁
释放写锁 1
拿到读锁
拿到写锁
释放写锁 2
拿到读锁
拿到读锁
拿到写锁
释放写锁 3 // 第三个写协程执行完毕
释放读锁 3
释放读锁 3
释放读锁 3
释放读锁 3
释放读锁 3
释放读锁 3
释放读锁 3
最终结果 3

从结果中可以看到,当第三个写协程更新完数据后,七个因条件变量而阻塞的读协程都恢复了运行。

提示

对于条件变量,应该使用 for而不是 if,应该使用循环来判断条件是否满足,因为协程被唤醒时并不能保证当前条件就已经满足了。

for !condition {
	cond.Wait()
}

🌟 Sync包

Go中很大一部分的并发相关的工具都是 sync标准库提供的,上述已经介绍过了 sync.WaitGroupsync.Locker等,除此之外,sync包下还有一些其他的工具可以使用

🍅 Once

当在使用一些数据结构时,如果这些数据结构太过庞大,可以考虑采用懒加载的方式,即真正要用到它的时候才会初始化该数据结构

type MySlice []string

func (m *MySlice) Get(i int) (string, bool) {
    // 判断切片内是否有值
	if len(*m) > 0 {
		return "", false
	}
	// (*m)[i] 先获取指针,再获取值
	return (*m)[i], true
}
func (m *MySlice) Add(i string) {
	// 当真正用到切片的时候,才会考虑去初始化
	if len(*m) > 0 {
		*m = make([]string, 0, 10)
	}
	*m = append(*m, i)
}

那么问题就来了,如果只有一个协程使用肯定是没有任何问题的,但是如果有多个协程访问的话就可能会出现问题了。比如协程A和B同时调用了 Add方法,A执行的稍微快一些,已经初始化完毕了,并且将数据成功添加,随后协程B又初始化了一遍,这样一来将协程A添加的数据直接覆盖掉了,这就是问题所在。

下面这个列子演示了这个问题

func main() {
	var wg sync.WaitGroup
	slice := make(MySlice, 0, 10)
	wg.Add(4)
	go func(slice *MySlice) {
		slice.Add("one")
		wg.Done()
	}(&slice)
	go func(slice *MySlice) {
		slice.Add("two")
		wg.Done()
	}(&slice)
	go func(slice *MySlice) {
		slice.Add("three")
		wg.Done()
	}(&slice)
	go func(slice *MySlice) {
		slice.Add("four")
		wg.Done()
	}(&slice)
	wg.Wait()
	fmt.Println(slice)
}

输出结果

[two]

这个问题可以使用 sync.Once解决,sync.Once保证了在并发条件下指定操作只会执行一次。它的使用非常简单,只对外暴露了一个 Do方法,签名如下:

func (o *Once) Do(f func()) 

修改之前代码使用 Once

type MySlice struct {
	s []string
	o sync.Once
}

func (m *MySlice) Get(i int) (string, bool) {
	if len(m.s) > 0 {
		return "", false
	}
	// (*m)[i] 先获取指针,再获取值
	return m.s[i], true
}
func (m *MySlice) Add(i string) {
	// 当真正用到切片的时候,才会考虑去初始化,使用Once.Do表示只执行一次
	m.o.Do(func() {
		fmt.Println("初始化")
		if len(m.s) > 0 {
			m.s = make([]string, 0, 10)
		}
	})
	m.s = append(m.s, i)
}

func main() {
	var wg sync.WaitGroup
	var slice MySlice
	wg.Add(4)
	go func(slice *MySlice) {
		slice.Add("one")
		wg.Done()
	}(&slice)
	go func(slice *MySlice) {
		slice.Add("two")
		wg.Done()
	}(&slice)
	go func(slice *MySlice) {
		slice.Add("three")
		wg.Done()
	}(&slice)
	go func(slice *MySlice) {
		slice.Add("four")
		wg.Done()
	}(&slice)
	wg.Wait()
	fmt.Println(len(slice.s))
}

输出结果

初始化
4

从输出结果中可以看到,所有的数据等正常添加进切片,初始化操作只执行了一次。其实 sync.Once的实现相当简单,去除注释真正的代码逻辑只有16行,其原理就是锁+原子操作。源代码如下:

type Once struct {
    // 用于判断操作是否已经执行
	done uint32
	m    Mutex
}

func (o *Once) Do(f func()) {
	// 原子加载数据
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
    // 加锁
	o.m.Lock()
    // 解锁
	defer o.m.Unlock()
    // 判断是否执行
	if o.done == 0 {
        // 执行完毕后修改done
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

🍅 Pool

sync.Pool的设计目的是用于存储临时对象以便后续的复用,是一个临时的并发安全对象池,将暂时用不到的对象放入池中,在后续使用中就不需要再额外的创建对象可以直接复用,减少内存的分配与释放频率,最重要的一点就是降低GC压力。sync.Pool总共只有两个方法,如下:

// 申请一个对象
func (p *Pool) Get() any

// 放入一个对象
func (p *Pool) Put(x any)

并且 sync.Pool有一个对外暴露的 New字段,用于对象池在申请不到对象时初始化一个对象

New func() any

sync.PoolGet 方法并不会将存储在池中的所有实例放回。相反,Get 方法的作用是从池中取出一个空闲的对象实例供调用者使用。如果池中有可用的实例,Get 将返回池中的一个实例;如果没有可用实例,Get 会调用之前设置的 New 函数来创建一个新的实例。

先来看一段代码示例

var wait sync.WaitGroup

// 临时对象池
var pool sync.Pool

// 用于计数过程中总共创建了多少个对象
var numOfObject atomic.Int64

// BigMemData 假设这是一个占用内存很大的结构体
type BigMemData struct {
   M string
}

func main() {
    // 如果Get不到就是被GC回收了,重新使用New创建
   pool.New = func() any {
      numOfObject.Add(1)
      return BigMemData{"大内存"}
   }
   wait.Add(1000)
   // 这里开启1000个协程
   for i := 0; i < 1000; i++ {
      go func() {
         // 申请对象
         val := pool.Get()
         // 使用对象
         _ = val.(BigMemData)
         // 用完之后放入池中
         pool.Put(val)
         wait.Done()
      }()
   }
   wait.Wait()
   fmt.Println(numOfObject.Load())
}


// 执行后代码
4

也就说在1000个协程中这个对象只被创建了4次,很大的降低的GC的压力

有人会有疑惑我在 main中创建好对象直接传递给协程用可以吗?那样不是只创建了一次和销毁一次吗?比如下面这样

func main() {
	data := BigMemData{"大内存"}
	var wg sync.WaitGroup
	wg.Add(1000)
	for i := 0; i < 1000; i++ {
		go func(d BigMemData) {
			_ = d
		}(data)
	}
	wg.Wait()
}

看似是这样,但是其实由于协程是独立的栈空间,每次循环都会启动一个新的 goroutine,并且在该 goroutine 的栈上创建一个 BigMemData 的副本 d。即使 d 被初始化为 data 的值,它仍然是一个独立的实例

那如果传递的是指针呢?

func main() {
	data := BigMemData{"大内存"}
	var wg sync.WaitGroup
	wg.Add(1000)
	for i := 0; i < 1000; i++ {
		go func(d *BigMemData) {
			_ = d
		}(&data)
	}
	wg.Wait()
}

确实这样只创建了一次,在协程中并不会在创建,但是你要对这个数据进行锁和生命周期管理,否则就会发生数据竞争等问题,如果是直接使用 pool它提供了对象的按需创建和安全重用机制,sync.Pool 会管理对象的生命周期,确保每个 goroutine 在需要时都能获取到一个可用的对象实例,而不需要担心数据竞争或生命周期管理问题。

如果不采用对象池,那么1000个协程都需要各自实例化对象,并且这1000个实例化后的对象在使用完毕后都需要由GC来释放内存,如果有几十万个协程或者说创建该对象的成本十分的高昂,这种情况下就会占用很大的内存并且给GC带来非常大的压力。

采用对象池后,可以复用对象减少实例化的频率,如果不采用对象池的话1000个协程将会创建1000个对象,这种优化带来的提升是显而易见的,尤其是在并发量特别大和实例化对象成本特别高的时候更能体现出优势

⚠️ 注意点:

  • 临时对象:sync.Pool只适合存放临时对象,池中的对象可能会在没有任何通知的情况下被GC移除,所以并不建议将网络链接,数据库连接这类存入 sync.Pool中。
  • 不可预知:sync.Pool在申请对象时,无法预知这个对象是新创建的还是复用的,也无法知晓池中有几个对象
  • 并发安全:官方保证 sync.Pool一定是并发安全,但并不保证用于创建对象的 New函数就一定是并发安全的,New函数是由使用者传入的,所以 New函数的并发安全性要由使用者自己来维护,这也是为什么上例中对象计数要用到原子值的原因。

提示

最后需要注意的是,当使用完对象后,一定要释放回池中,如果用了不释放那么对象池的使用将毫无意义。

🍅 Map

sync.Map是官方提供的一种并发安全Map的实现,它在并发读写时不会触发 panic

⭐️ 和原生 map比较:sync.Map是经过特定场所进行优化的,在读多写少的情况下,读基本上使用原子操作,没有对锁的竞争所以性能相对高,但是在写的情况下,由于内部结构实现和锁的竞争性能会低一些

开箱即用,使用起来十分的简单,

下面是该结构体对外暴露的方法:

// 根据一个key读取值,返回值会返回对应的值和该值是否存在
func (m *Map) Load(key any) (value any, ok bool)

// 存储一个键值对
func (m *Map) Store(key, value any)

// 删除一个键值对
func (m *Map) Delete(key any)

// 如果该key已存在,就返回原有的值,否则将新的值存入并返回,当成功读取到值时,loaded为true,否则为false(避免覆盖)
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)

// 删除一个键值对,并返回其原有的值,loaded的值取决于key是否存在
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)

// 遍历Map,当f()返回false时,就会停止遍历
func (m *Map) Range(f func(key, value any) bool) 

简单使用

func main() {
	var syncMap sync.Map
	// 存储数据
	syncMap.Store("one", 1)
	// 读取数据
	fmt.Println(syncMap.Load("one"))
	// 存在则读取不存在则添加
	fmt.Println(syncMap.LoadOrStore("one", 1))
	fmt.Println(syncMap.LoadOrStore("two", 2))
	fmt.Println(syncMap.LoadOrStore("three", 3))
	// 读取并删除
	fmt.Println(syncMap.LoadAndDelete("three"))
	// 遍历map
	syncMap.Range(func(key, value any) bool {
		fmt.Println(key, value)
		return true
	})
}



// 输出
1 true
1 true 
2 false
3 false
3 true 
one 1  
two 2 

在并发场景中如果使用简单的 map

func main() {
	myMap := make(map[int]int, 10)
	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(n int) {
			for i := 0; i < 1000; i++ {
				myMap[n] = n
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
	fmt.Println(myMap)
}

这极大程度会引发并发报错 fatal error: concurrent map writes,因为会发送数据竞争和未定义行为

可以通过加锁或者 sync.Map来直接解决

func main() {
	var syncMap sync.Map
	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(n int) {
			for i := 0; i < 1000; i++ {
				syncMap.Store(n, n)
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
	syncMap.Range(func(key, value any) bool {
		fmt.Println(key, value)
		return true
	})
}

为了并发安全肯定需要做出一定的牺牲,sync.Map的性能要比map低10-100倍左右

⚠️ 注意点:

  1. sync.Map并不可以替代掉原生的 map,它只是在特定场景下进行了优化

  2. 它内部是由两个 map组成,分别是 read、write Map 因此增加了 GC压力

  3. 类型安全风险,在 sync.map存储了两个键值对的类型都不一样的数据,在使用 range的时候如果需要类型断言,则会发生panic,因为它将数据以 interface{}的类型来存储的,读出数据也是 空结接口类型

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    var m sync.Map
    func main() {
       m.Store("Kevin",90)
       m.Store(80,"Kevin")
    
       m.Range(func(key, value any) bool {
    	fmt.Printf("%s,%d\n",key.(string),value.(int))
    	return true
       })
    }
    
  4. 不可以传递和拷贝,因为锁是不可以拷贝的,sync.Map 使用了锁

🌟 原子

在计算机学科中,原子或原语操作,通常用于表述一些不可再细化分割的操作,由于这些操作无法再细化为更小的步骤,在执行完毕前,不会被其他的任何协程打断,所以执行结果要么成功要么失败,没有第三种情况可言,如果出现了其他情况,那么它就是不是原子操作。

🍅 原子操作的类型

好在大多情况下并不需要自行编写汇编,Go标准库 sync/atomic包下已经提供了原子操作相关的API,其提供了以下几种类型以供进行原子操作。

atomic.Bool{}
atomic.Pointer[]{}
atomic.Int32{}
atomic.Int64{}
atomic.Uint32{}
atomic.Uint64{}
atomic.Uintptr{}
atomic.Value{}

其中 Pointer原子类型支持泛型,Value类型支持存储任何类型,除此之外,还提供了许多函数来方便操作。因为原子操作的粒度过细,在大多数情况下,更适合处理这些基础的数据类型。

提示

atmoic包下原子操作只有函数签名,没有具体实现,具体的实现是由 plan9汇编编写

🍅 使用

每一个原子类型都会提供以下三个方法:

  • Load():原子的获取值
  • Swap(newVal type) (old type):原子的交换值,并且返回旧值
  • Store(val type):原子的存储值

不同的类型可能还会有其他的额外方法,比如整型类型都会提供 Add方法来实现原子加减操作。下面以一个 int64类型演示为例:

func main() {
	var aint64 atomic.Uint64
	// 存储值
	aint64.Store(64)
	// 交换值
	aint64.Swap(128)
	// 增加
	aint64.Add(112)
    // 加载值
	fmt.Println(aint64.Load())
}

或者也可以直接使用函数

func main() {
   var aint64 int64
   // 存储值
   atomic.StoreInt64(&aint64, 64)
   // 交换值
   atomic.SwapInt64(&aint64, 128)
   // 增加
   atomic.AddInt64(&aint64, 112)
   // 加载
   fmt.Println(atomic.LoadInt64(&aint64))
}

其他的类型的使用也是十分类似的,最终输出为:

240

🍅 CAS

atmoic包还提供了 CompareAndSwap操作,也就是 CAS,它是乐观锁的一种典型实现。乐观锁本身并不是锁,是一种并发条件下无锁化并发控制方式。之所以被称作乐观锁,是因为它总是乐观的假设共享数据不会被修改,仅当发现数据未被修改时才会去执行对应操作,而前面了解到的互斥量就是悲观锁,互斥量总是悲观的认为共享数据肯定会被修改,所以在操作时会加锁,操作完毕后就会解锁。由于无锁化实现的并发安全效率相对于锁要高一些,许多并发安全的数据结构都采用了 cAS来进行实现,不过真正的效率要结合具体使用场景来看。看下面的一个例子:

var lock sync.Mutex

var count int

func Add(num int) {
   lock.Lock()
   count += num
   lock.Unlock()
}

这是一个使用互斥锁的例子,每次增加数字前都会先上锁,执行完毕后就会解锁,过程中会导致其他协程阻塞,接下来使用 CAS改造一下:

var count int64

func Add(num int64) {
	for {
		expect := atomic.LoadInt64(&count)
		if atomic.CompareAndSwapInt64(&count, expect, expect+num) {
			break
		}
	}
}

对于 CAS而言,有三个参数,内存值,期望值,新值。执行时,CAS会将期望值与当前内存值进行比较,如果内存值与期望值相同,就会执行后续的操作,否则的话什么也不做。对于Go中 atomic包下的原子操作,CAS相关的函数则需要传入地址,期望值,新值,且会返回是否成功替换的布尔值。例如 int64类型的 CAS 操作函数签名如下:

func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

CAS的例子中,首先会通过 LoadInt64来获取期望值,随后使用 CompareAndSwapInt64来进行比较交换,如果不成功的话就不断循环,直到成功。这样无锁化的操作虽然不会导致协程阻塞,但是不断的循环对于CPU而言依旧是一个不小的开销,所以在一些实现中失败达到了一定次数可能会放弃操作。但是对于上面的操作而言,仅仅只是简单的数字相加,涉及到的操作并不复杂,所以完全可以考虑无锁化实现。

提示

大多数情况下,仅仅只是比较值是无法做到并发安全的,比如因 CAS引起ABA问题,就需要使用额外加入 version来解决问题。

🍅 Value

atomic.Value结构体,可以存储任意类型的值,结构体如下

type Value struct {
   // any类型
   v any
}

尽管可以存储任意类型,但是它不能存储 nil,并且前后存储的值类型应当一致,下面两个例子都无法通过编译

func main() {
   var val atomic.Value
   val.Store(nil)
   fmt.Println(val.Load())
}
// panic: sync/atomic: store of nil value into Value   
func main() {
   var val atomic.Value
   val.Store("hello world")
   val.Store(114154)
   fmt.Println(val.Load())
}
// panic: sync/atomic: store of inconsistently typed value into Value 

除此之外,它的使用与其他的原子类型并无太大的差别,并且需要注意的是,所有的原子类型都不应该复制值,而是应该使用它们的指针。