src/pkg/sync/waitgroup.go - The Go Programming Language

Golang

Source file src/pkg/sync/waitgroup.go

     1	// Copyright 2011 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	package sync
     6	
     7	import "sync/atomic"
     8	
     9	// A WaitGroup waits for a collection of goroutines to finish.
    10	// The main goroutine calls Add to set the number of
    11	// goroutines to wait for.  Then each of the goroutines
    12	// runs and calls Done when finished.  At the same time,
    13	// Wait can be used to block until all goroutines have finished.
    14	type WaitGroup struct {
    15		m       Mutex
    16		counter int32
    17		waiters int32
    18		sema    *uint32
    19	}
    20	
    21	// WaitGroup creates a new semaphore each time the old semaphore
    22	// is released. This is to avoid the following race:
    23	//
    24	// G1: Add(1)
    25	// G1: go G2()
    26	// G1: Wait() // Context switch after Unlock() and before Semacquire().
    27	// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
    28	// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
    29	// G3: Add(1) // Makes counter == 1, waiters == 0.
    30	// G3: go G4()
    31	// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.
    32	
    33	// Add adds delta, which may be negative, to the WaitGroup counter.
    34	// If the counter becomes zero, all goroutines blocked on Wait() are released.
    35	func (wg *WaitGroup) Add(delta int) {
    36		v := atomic.AddInt32(&wg.counter, int32(delta))
    37		if v < 0 {
    38			panic("sync: negative WaitGroup count")
    39		}
    40		if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
    41			return
    42		}
    43		wg.m.Lock()
    44		for i := int32(0); i < wg.waiters; i++ {
    45			runtime_Semrelease(wg.sema)
    46		}
    47		wg.waiters = 0
    48		wg.sema = nil
    49		wg.m.Unlock()
    50	}
    51	
    52	// Done decrements the WaitGroup counter.
    53	func (wg *WaitGroup) Done() {
    54		wg.Add(-1)
    55	}
    56	
    57	// Wait blocks until the WaitGroup counter is zero.
    58	func (wg *WaitGroup) Wait() {
    59		if atomic.LoadInt32(&wg.counter) == 0 {
    60			return
    61		}
    62		wg.m.Lock()
    63		atomic.AddInt32(&wg.waiters, 1)
    64		// This code is racing with the unlocked path in Add above.
    65		// The code above modifies counter and then reads waiters.
    66		// We must modify waiters and then read counter (the opposite order)
    67		// to avoid missing an Add.
    68		if atomic.LoadInt32(&wg.counter) == 0 {
    69			atomic.AddInt32(&wg.waiters, -1)
    70			wg.m.Unlock()
    71			return
    72		}
    73		if wg.sema == nil {
    74			wg.sema = new(uint32)
    75		}
    76		s := wg.sema
    77		wg.m.Unlock()
    78		runtime_Semacquire(s)
    79	}