条件轮询(Mutex and Cond)

条件轮询(Mutex and Cond)

条件变量 #

sync.Cond是传统的条件变量原语概念在 Go 语言中的实现。我们可以把一个条件变量理解为一个容器,这个容器中存放着一个或一组等待着某个条件成立的 Goroutine。当条件成立后,这些处于等待状态的 Goroutine 将得到通知,并被唤醒继续进行后续的工作。这与百米飞人大战赛场上,各位运动员等待裁判员的发令枪声的情形十分类似。

条件变量是同步原语的一种,如果没有条件变量,开发人员可能需要在 Goroutine 中通过连续轮询的方式,检查某条件是否为真,这种连续轮询非常消耗资源,因为 Goroutine 在这个过程中是处于活动状态的,但它的工作又没有进展。

这里我们先看一个用sync.Mutex 实现对条件轮询等待的例子:

type signal struct{}

var ready bool

func worker(i int) {
  fmt.Printf("worker %d: is working...\n", i)
  time.Sleep(1 * time.Second)
  fmt.Printf("worker %d: works done\n", i)
}

func spawnGroup(f func(i int), num int, mu *sync.Mutex) <-chan signal {
  c := make(chan signal)
  var wg sync.WaitGroup

  for i := 0; i < num; i++ {
    wg.Add(1)
    go func(i int) {
      for {
        mu.Lock()
        if !ready {
          mu.Unlock()
          time.Sleep(100 * time.Millisecond)
          continue
        }
        mu.Unlock()
        fmt.Printf("worker %d: start to work...\n", i)
        f(i)
        wg.Done()
        return
      }
    }(i + 1)
  }

  go func() {
    wg.Wait()
    c <- signal(struct{}{})
  }()
  return c
}

func main() {
  fmt.Println("start a group of workers...")
  mu := &sync.Mutex{}
  c := spawnGroup(worker, 5, mu)

  time.Sleep(5 * time.Second) // 模拟ready前的准备工作
  fmt.Println("the group of workers start to work...")

  mu.Lock()
  ready = true
  mu.Unlock()

  <-c
  fmt.Println("the group of workers work done!")
}

轮询的方式开销大,轮询间隔设置的不同,条件检查的及时性也会受到影响。

sync.Cond为 Goroutine 在这个场景下提供了另一种可选的、资源消耗更小、使用体验更佳的同步方式。使用条件变量原语,我们可以在实现相同目标的同时,避免对条件的轮询。我们用sync.Cond对上面的例子进行改造,改造后的代码如下:

type signal struct{}

var ready bool

func worker(i int) {
  fmt.Printf("worker %d: is working...\n", i)
  time.Sleep(1 * time.Second)
  fmt.Printf("worker %d: works done\n", i)
}

func spawnGroup(f func(i int), num int, groupSignal *sync.Cond) <-chan signal {
  c := make(chan signal)
  var wg sync.WaitGroup

  for i := 0; i < num; i++ {
    wg.Add(1)
    go func(i int) {
      groupSignal.L.Lock()
      for !ready {
        groupSignal.Wait()
      }
      groupSignal.L.Unlock()
      fmt.Printf("worker %d: start to work...\n", i)
      f(i)
      wg.Done()
    }(i + 1)
  }

  go func() {
    wg.Wait()
    c <- signal(struct{}{})
  }()
  return c
}

func main() {
  fmt.Println("start a group of workers...")
  groupSignal := sync.NewCond(&sync.Mutex{})
  c := spawnGroup(worker, 5, groupSignal)

  time.Sleep(5 * time.Second) // 模拟ready前的准备工作
  fmt.Println("the group of workers start to work...")

  groupSignal.L.Lock()
  ready = true
  groupSignal.Broadcast()
  groupSignal.L.Unlock()

  <-c
  fmt.Println("the group of workers work done!")
}

我们运行这个示例程序,得到:

start a group of workers...
the group of workers start to work...
worker 2: start to work...
worker 2: is working...
worker 3: start to work...
worker 3: is working...
worker 1: start to work...
worker 1: is working...
worker 4: start to work...
worker 5: start to work...
worker 5: is working...
worker 4: is working...
worker 4: works done
worker 2: works done
worker 3: works done
worker 1: works done
worker 5: works done
the group of workers work done!

我们看到,sync.Cond实例的初始化,需要一个满足实现了sync.Locker接口的类型实例,通常我们使用sync.Mutex。

条件变量需要这个互斥锁来同步临界区,保护用作条件的数据。加锁后,各个等待条件成立的 Goroutine 判断条件是否成立,如果不成立,则调用sync.Cond的 Wait 方法进入等待状态。Wait 方法在 Goroutine 挂起前会进行 Unlock 操作。

当 main goroutine 将ready置为 true,并调用sync.Cond的 Broadcast 方法后,各个阻塞的 Goroutine 将被唤醒,并从 Wait 方法中返回。Wait 方法返回前, Wait 方法会再次加锁让 Goroutine 进入临界区。接下来 Goroutine 会再次对条件数据进行判定,如果条件成立,就会解锁并进入下一个工作阶段;如果条件依旧不成立,那么会再次进入循环体,并调用 Wait 方法挂起等待。

和sync.Mutex 、sync.RWMutex等相比,sync.Cond 应用的场景更为有限,只有在需要“等待某个条件成立”的场景下,Cond 才有用武之地。

Viewpoint #

From #

34|并发:如何使用共享变量?