海量编程文章、技术教程与实战案例

网站首页 > 技术文章 正文

go语言并发原语RWMutex实现原理及闭坑指南

yimeika 2025-05-15 22:56:41 技术文章 7 ℃

1.RWMutex常用方法

  • Lock/Unlock
  • RLock/RUnlock
  • RLocker 为读操作返回一个Locker接 口的对象

2. RWMutex使用方法

 func main() {
 var counter Counter
 for i := 0; i < 10; i++ { // 10个reader
 go func() {
 for {
 counter.Count() // 计数器读操作
 time.Sleep(time.Millisecond)
 }
 }()
 }
 for { // 一个writer
 counter.Incr() // 计数器写操作
 time.Sleep(time.Second)
 }
 }
 // 一个线程安全的计数器
 type Counter struct {
 mu sync.RWMutex
 count uint64
 }
 // 使用写锁保护
 func (c *Counter) Incr() {
 c.mu.Lock()
 c.count++
 c.mu.Unlock()
 }
 / 使用读锁保护
 func (c *Counter) Count() uint64 {
 c.mu.RLock()
 defer c.mu.RUnlock()
 return c.count
 }

在读多写少的场景下,使用RWMutex性能要更好些

3.实现原理

RWMutex是基于Mutex实现的,以写优先来实现的

 type RWMutex struct {
     w Mutex // 互斥锁解决多个writer的竞争
     writerSem uint32 // writer信号量
     readerSem uint32 // reader信号量
     readerCount int32 // reader的数量
     readerWait int32 // writer等待完成的reader的数量(记录写锁来的时候前面还需要等待多少个读锁,其实也是在某一时刻 readerCount 的复制值)
 }
 const rwmutexMaxReaders = 1 << 30

说明:

  • 这里的常量 rwmutexMaxReaders,定义了最大的 reader 数量。
  • 字段 w:为 writer 的竞争锁而设计;
  • 字段 readerCount:记录当前 reader 的数量(以及是否有 writer 竞争锁);
  • readerWait:记录 writer 请求锁时需要等待 read 完成的 reader 的数量;
  • writerSem 和 readerSem:都是为了阻塞设计的信号量。

3.1 RLock/RUnlock的实现

 func (rw *RWMutex) RLock() {
     
      // 每次goroutine获得读锁,readerCount+1
     // 这里分两种情况:
     // 1. 当判断大于等于0, 证明当前没有写锁, 那么可以上读锁, 并且readerCount原子加1(读锁可重入[只要匹配了释放次数就行])
     // 2. 当判断小于0, 证明当前有写锁(Lock时会readerCount-rwmutexMaxReaders, 因此会小于0), 所以通过readerSem读信号量, 使读操作睡眠等待
 if atomic.AddInt32(&rw.readerCount, 1) < 0 {
 // rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先
     runtime_SemacquireMutex(&rw.readerSem, false, 0)
     }
 }
 func (rw *RWMutex) RUnlock() {
     
     // 这里分两种情况:
     // 释放读锁, readerCount减1
     // 1.若readerCount大于0, 证明当前还有读锁, 直接结束本次操作
     // 2.若readerCount小于等于0, 证明已经没有读锁, 可以唤醒写锁(若有)
 if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
     rw.rUnlockSlow(r) // 有等待的writer
     }
 }
 func (rw *RWMutex) rUnlockSlow(r int32) {
     // 1. 本来就没读锁, 调用RUnlock就发生panic
     // 2. 超过读锁的最大限制就发生panic
     if r+1 == 0 || r+1 == -rwmutexMaxReaders {
         race.Enable()
         throw("sync: RUnlock of unlocked RWMutex")
     }
     // readerWait--操作,如果readerWait--操作之后的值为0,说明,写锁之前,已经没有读锁了
     // 通过writerSem信号量,唤醒队列中第一个阻塞的写锁
 if atomic.AddInt32(&rw.readerWait, -1) == 0 {
 // 最后一个reader了,writer终于有机会获得锁了
 runtime_Semrelease(&rw.writerSem, false, 1)
 }

3.2 Lock的实现

 func (rw *RWMutex) Lock() {
 // 首先解决其他writer竞争问题
 rw.w.Lock()
  //// 先readerCount-rwmutexMaxReaders<0,标识有写锁来了,让后续来的读锁堵塞无法拿到锁
     // 再加回rwmutexMaxReaders得到当前读锁的个数
 // 反转readerCount,告诉reader有writer竞争锁
 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxRead
 // 如果当前有reader持有锁,那么需要等待
 //// 读锁个数不为0的时候,写锁阻塞,把当前的读锁个数readerCount赋值给readerWait
     // 用于标识写锁前面还需要等待多少个读锁
 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
 runtime_SemacquireMutex(&rw.writerSem, false, 0)
 }
 }

3.3 Unlock的实现

 func (rw *RWMutex) Unlock() {
 // 告诉reader没有活跃的writer了
 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
 // 唤醒阻塞的reader们
 for i := 0; i < int(r); i++ {
 runtime
 _
 Semrelease(&rw.readerSem, false, 0)
 }
 // 释放内部的互斥锁
 rw.w.Unlock()
 }

4 使用RWMutex的注意点

  • 不可复制
  • 重入导致死锁
  • 释放未加锁的RWMutex,会产生panic

Tags:

最近发表
标签列表