「Go 言語で学ぶ並行プログラミング」で紹介されていたパターンを記録しています。
SRC モデル(共有メモリモデル)
共有データへのアクセスを制御し、メモリの整合性を保つ手法。
Mutex
排他制御。
package main import ( "fmt" "sync" "time" ) func main() { num := 0 mutex := sync.Mutex{} go add(&num, &mutex) go sub(&num, &mutex) time.Sleep(3 * time.Second) fmt.Println("num: ", num) } func add(num *int, mutex *sync.Mutex) { for range 1_000_000 { mutex.Lock() *num += 1 mutex.Unlock() } } func sub(num *int, mutex *sync.Mutex) { for range 1_000_000 { mutex.Lock() *num -= 1 mutex.Unlock() } }
num: 0
Reader Writer Mutex
読込優先/書込専用ロック。
package main import ( "fmt" "sync" "time" ) func main() { start := time.Now() mutex := sync.RWMutex{} go writer(start, &mutex) for range 10 { go reader(&mutex) } time.Sleep(3 * time.Second) } func writer(start time.Time, mutex *sync.RWMutex) { for range 10 { mutex.Lock() time.Sleep(100 * time.Millisecond) mutex.Unlock() } duration := time.Since(start) fmt.Println("duration(millisecond):", duration.Milliseconds()) // reader 側の lock に影響されず 1000 ミリ秒強の実行時間になる } func reader(mutex *sync.RWMutex) { mutex.RLock() time.Sleep(100 * time.Millisecond) mutex.RUnlock() }
duration(millisecond): 1207
条件変数と Monitor
package main import ( "fmt" "sync" "time" ) func main() { num := 0 mutex := sync.Mutex{} cond := sync.NewCond(&mutex) go add(&num, cond) go subIfPositive(&num, cond) time.Sleep(5 * time.Second) fmt.Println("num", num) // 「1 の 500,000 回の加算」と「5 の 100,000 回の減算」なので 0 となる } func add(num *int, cond *sync.Cond) { for range 500_000 { cond.L.Lock() *num += 1 cond.Signal() cond.L.Unlock() } } func subIfPositive(num *int, cond *sync.Cond) { for range 100_000 { cond.L.Lock() for *num < 5 { cond.Wait() } *num -= 5 cond.L.Unlock() } }
num 0
type Cond struct { // L is held while observing or changing the condition L Locker // contains filtered or unexported fields }
Cond は、イベントの発生を待機または通知する goroutine の待ち合わせ場所である条件変数を実装します。
各 Cond には関連付けられた Locker L (多くの場合MutexまたはRWMutex ) があり、条件を変更するときやCond.Waitメソッドを呼び出すときにはこれを保持する必要があります。
Cond は、最初に使用した後にコピーすることはできません。
Go メモリ モデル の用語では、Cond は、 Cond.BroadcastまたはCond.Signalへの呼び出しが、ブロックを解除する Wait 呼び出しの前に「同期」するように調整します。
多くの単純な使用例では、Cond よりもチャネルを使用する方がユーザーにとって便利です (Broadcast はチャネルを閉じることに相当し、Signal はチャネル上で送信することに相当します)。
Semaphore
リソース数制限。
package main import ( "context" "fmt" "time" "golang.org/x/sync/semaphore" ) func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() semaphore := semaphore.NewWeighted(2) // maximum 2 の semaphore をつくる for range 12 { go getNow(ctx, semaphore) } time.Sleep(10 * time.Second) } func getNow(ctx context.Context, semaphore *semaphore.Weighted) { if err := semaphore.Acquire(ctx, 1); err != nil { fmt.Printf("Failed to acquire semaphore: %v\n", err) // 最後の 2 件はタイムアウト return } fmt.Println(time.Now().Format(time.DateTime)) // 1 秒ごとに 2 件ずつ表示される time.Sleep(1 * time.Second) semaphore.Release(1) }
2026-01-07 00:37:10
2026-01-07 00:37:10
2026-01-07 00:37:11
2026-01-07 00:37:11
2026-01-07 00:37:12
2026-01-07 00:37:12
2026-01-07 00:37:13
2026-01-07 00:37:13
2026-01-07 00:37:14
2026-01-07 00:37:14
Failed to acquire semaphore: context deadline exceeded
Failed to acquire semaphore: context deadline exceeded
WaitGroup
完了待ち。
package main import ( "fmt" "math/rand" "sync" "time" ) func main() { wg := sync.WaitGroup{} maximum := 8 for range maximum { wg.Add(1) // ゴルーチンの数だけ wait group を count up go worker(&wg) } wg.Wait() // wait group が 0 になるのを待つ // Go メソッドで代替できる // for range maximum { // wg.Go(func() { // _worker() // }) // } // wg.Wait() fmt.Printf("%d worker Done.\n", maximum) } func worker(wg *sync.WaitGroup) { i := rand.Intn(10) time.Sleep(time.Duration(i) * time.Second) fmt.Println("worker done: ", time.Now().Format(time.DateTime)) wg.Done() // wait group に終了を通知(count down) } func _worker() { i := rand.Intn(10) time.Sleep(time.Duration(i) * time.Second) fmt.Println("worker done: ", time.Now().Format(time.DateTime)) }
Barrier
同期してからの開始。
go の標準・準標準ライブラリには Barrier にあたるものはないらしく、条件変数を利用した実装例が紹介されていました。
package main import ( "fmt" "sync" "time" ) type Barrier struct { size int // barrier に到達すべきゴルーチンの合計数 waitCount int // 現在待ち状態のゴルーチンの数 cond *sync.Cond } func NewBarrier(size int) *Barrier { condVar := sync.NewCond(&sync.Mutex{}) return &Barrier{size, 0, condVar} } func (b *Barrier) Wait() { b.cond.L.Lock() b.waitCount += 1 if b.waitCount == b.size { // 待ち状態のゴルーチン数が一致した場合に broadcast b.waitCount = 0 b.cond.Broadcast() } else { // 待ちゴルーチン数が足りない場合には wait b.cond.Wait() } b.cond.L.Unlock() } func main() { barrier := NewBarrier(2) go worker("AAA", 2, barrier) go worker("BBB", 5, barrier) time.Sleep(10 * time.Second) } func worker(name string, waitSecond int, barrier *Barrier) { fmt.Printf("%s is starting: %s\n", name, time.Now().Format(time.DateTime)) time.Sleep(time.Duration(waitSecond) * time.Second) fmt.Printf("%s is waiting: %s\n", name, time.Now().Format(time.DateTime)) barrier.Wait() fmt.Printf("%s is finish: %s\n", name, time.Now().Format(time.DateTime)) }
BBB is starting: 2026-01-08 01:49:48
AAA is starting: 2026-01-08 01:49:48
AAA is waiting: 2026-01-08 01:49:50
BBB is waiting: 2026-01-08 01:49:53
BBB is finish: 2026-01-08 01:49:53
AAA is finish: 2026-01-08 01:49:53
CSP (Communicating Sequential Processes)
メモリを共有せず、通信によって共有する手法。
quit channel
channel の終了を通知。
package main import "fmt" func main() { numbers := make(chan int) quit := make(chan int) printNumber(numbers, quit) next := 0 for range 20 { select { case numbers <- next: // number チャネルに数値を送信する case <-quit: // for 文が 20 回周る前に quit チャネルからの受信があるためクローズする fmt.Println("Quitting") return } next += 1 } } func printNumber(numbers <-chan int, quit chan int) { go func() { for range 10 { fmt.Println(<-numbers) } close(quit) // 数値を 10 個受け取ったら quit チャネルを閉じる }() }
0
1
2
3
4
5
6
7
8
9
Quitting
Pipeline
package main import "fmt" func main() { quit := make(chan int) defer close(quit) numbers := []int{1, 2, 3} fmt.Println("[start]", numbers) // パイプラインのステップにあたる関数をネストして呼ぶ results := triple(quit, double(quit, numbers)) // range はチャネルが閉じるまで for を回してくれる for result := range results { fmt.Println("[end]", result) } } func double(quit <-chan int, numbers []int) <-chan int { outChan := make(chan int) go func() { defer close(outChan) for _, number := range numbers { out := number * 2 fmt.Printf("[double] %d -> %d\n", number, out) select { case outChan <- out: // 次のステップに値を渡す case <-quit: return } } }() return outChan } func triple(quit <-chan int, inChan <-chan int) <-chan int { outChan := make(chan int) go func() { defer close(outChan) for number := range inChan { out := number * 3 fmt.Printf("[triple] %d -> %d\n", number, out) select { case outChan <- out: // 次のステップに値を渡す case <-quit: return } } }() return outChan }
[start] [1 2 3]
[double] 1 -> 2
[double] 2 -> 4
[triple] 2 -> 6
[triple] 4 -> 12
[double] 3 -> 6
[end] 6
[end] 12
[triple] 6 -> 18
[end] 18
Fan-out / Fan-in
負荷分散と集約。
package main import ( "fmt" "sync" ) func main() { quit := make(chan int) defer close(quit) numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} fmt.Println("[start]", numbers) doubledChan := double(quit, numbers) // fan-out 部分:複数の triple ワーカーを立ち上げる // 各ワーカーは同じチャネルを共有する numWorkers := 3 workers := make([]<-chan int, numWorkers) for i := 0; i < numWorkers; i++ { workers[i] = triple(quit, doubledChan, i) } // fan-in 部分:複数のワーカーのチャネルを束ねる results := fanIn(quit, workers...) for result := range results { fmt.Println("[end]", result) } } // pipeline のパターンと変化なし func double(quit <-chan int, numbers []int) <-chan int { outChan := make(chan int) go func() { defer close(outChan) for _, number := range numbers { out := number * 2 fmt.Printf("[double] %d -> %d\n", number, out) select { case outChan <- out: case <-quit: return } } }() return outChan } // pipeline のパターンと比較して、id を引数に追加しただけ func triple(quit <-chan int, inChan <-chan int, id int) <-chan int { outChan := make(chan int) go func() { defer close(outChan) for number := range inChan { out := number * 3 fmt.Printf("[triple] (%d) %d -> %d\n", id, number, out) select { case outChan <- out: case <-quit: return } } }() return outChan } func fanIn[K any](quit <-chan int, allChannels ...<-chan K) chan K { // 束ねたい入力 channel と同数の wait group を作る wg := sync.WaitGroup{} wg.Add(len(allChannels)) // 出力用 channel output := make(chan K) // 入力 channel 毎に受信し、出力 channel へ転送するゴルーチンを用意する for _, c := range allChannels { go func(channel <-chan K) { defer wg.Done() for i := range channel { select { case output <- i: case <-quit: return } } }(c) } // 全ての入力 channel が閉じてから、出力 channel を閉じる go func() { wg.Wait() close(output) }() return output }
[start] [1 2 3 4 5 6 7 8 9 10]
[double] 1 -> 2
[triple] (0) 2 -> 6
[double] 2 -> 4
[double] 3 -> 6
[double] 4 -> 8
[double] 5 -> 10
[triple] (0) 8 -> 24
[triple] (0) 10 -> 30
[double] 6 -> 12
[triple] (1) 6 -> 18
[triple] (1) 12 -> 36
[double] 7 -> 14
[end] 6
[end] 24
[end] 18
[end] 36
[end] 30
[triple] (1) 14 -> 42
[end] 42
[double] 8 -> 16
[double] 9 -> 18
[triple] (1) 16 -> 48
[triple] (1) 18 -> 54
[double] 10 -> 20
[triple] (2) 4 -> 12
[end] 48
[end] 54
[end] 12
[triple] (0) 20 -> 60
[end] 60
Broadcast
通知の拡散。
package main import ( "fmt" "sync" ) func main() { quit := make(chan int) defer close(quit) // 入力用の channel を用意 inputChan := make(chan string) go func() { defer close(inputChan) inputChan <- "world" }() // broadcast 処理に入力 channel を渡して、ブロードキャスト数を伝える outputs := broadcast(quit, inputChan, 2) // 各パイプラインの完了を待機する WaitGroup var wg sync.WaitGroup wg.Add(2) // パイプライン1: broadcast 処理 より値を受け取る hello 処理 go func() { defer wg.Done() fmt.Println(<-hello(quit, outputs[0])) }() // パイプライン2: broadcast 処理 より値を受け取る goodbye 処理 go func() { defer wg.Done() fmt.Println(<-goodbye(quit, outputs[1])) }() wg.Wait() } func broadcast[K any](quit <-chan int, inputChan <-chan K, n int) []chan K { // broadcast 向けに必要数分の channel を用意 outChannels := createAll[K](n) go func() { defer closeAll(outChannels...) for val := range inputChan { // 全ワーカーへの配送保証 var wg sync.WaitGroup wg.Add(len(outChannels)) // 用意した channel に値を配信 for _, ch := range outChannels { go func(ch chan K, val K) { defer wg.Done() select { case ch <- val: case <-quit: return } }(ch, val) } // すべての channel に値を配信終わるまで待つ wg.Wait() } }() return outChannels } func createAll[K any](n int) []chan K { channels := make([]chan K, n) for i := range channels { channels[i] = make(chan K) } return channels } func closeAll[K any](channels ...chan K) { for _, channel := range channels { close(channel) } } func hello(quit <-chan int, inChan <-chan string) <-chan string { outChan := make(chan string) go func() { defer close(outChan) for input := range inChan { msg := "hello, " + input select { case outChan <- msg: case <-quit: return } } }() return outChan } func goodbye(quit <-chan int, inChan <-chan string) <-chan string { outChan := make(chan string) go func() { defer close(outChan) for input := range inChan { msg := "goodbye, " + input select { case outChan <- msg: case <-quit: return } } }() return outChan }
goodbye, world
hello, world
fork / join
入力を分割処理して集約。
package main import ( "fmt" "sync" ) type result struct { index int value int } func main() { input := []int{1, 2, 3, 4, 5, 6} joinedResults := make([]int, len(input)) quit := make(chan int) defer close(quit) fmt.Println("[start]", input) // fork stage と join stage 間で値を授受する channel resultChan := make(chan result) // 完了通知用の channel done := make(chan struct{}) // --- fork stage --- numOfWorkers := 3 chunkSize := len(input) / numOfWorkers // fork するゴルーチン分の wait group を用意 var wg sync.WaitGroup for i := range numOfWorkers { wg.Add(1) start := i * chunkSize end := start + chunkSize // result channel 配信用のゴルーチンを用意 go func(start, end int) { defer wg.Done() for j := start; j < end; j++ { fmt.Printf("[fork (%d)] index: %d, input: %d\n", i, j, input[j]) select { case resultChan <- result{index: j, value: input[j] * 2}: case <-quit: return } } }(start, end) } // --- join stage --- // fork stage の全ゴルーチンが終了したら channel を閉じる go func() { wg.Wait() close(resultChan) }() // fork stage からの結果を集約する go func() { for res := range resultChan { fmt.Printf("[join] index: %d, input: %d, result.value: %d\n", res.index, input[res.index], res.value) joinedResults[res.index] = res.value } close(done) }() // 完了通知が来るまで待機 <-done fmt.Println("[end]", joinedResults) }
[start] [1 2 3 4 5 6]
[fork (1)] index: 2, input: 3
[fork (1)] index: 3, input: 4
[join] index: 2, input: 3, result.value: 6
[join] index: 3, input: 4, result.value: 8
[fork (0)] index: 0, input: 1
[fork (0)] index: 1, input: 2
[join] index: 0, input: 1, result.value: 2
[join] index: 1, input: 2, result.value: 4
[fork (2)] index: 4, input: 5
[fork (2)] index: 5, input: 6
[join] index: 4, input: 5, result.value: 10
[join] index: 5, input: 6, result.value: 12
worker pool
package main import ( "fmt" "sync" "time" ) func main() { inputChan := make(chan int) var wg sync.WaitGroup // ワーカーを起動 numOfWorker := 3 startWorker(numOfWorker, inputChan, &wg) // インプットを投入 for i := range 10 { inputChan <- i } close(inputChan) // ワーカーの終了を待機 wg.Wait() fmt.Println("done") } func startWorker(numOfWorker int, inputChan <-chan int, wg *sync.WaitGroup) { for workerId := range numOfWorker { wg.Add(1) go func(workerId int) { defer wg.Done() for input := range inputChan { doWork(workerId, input) } }(workerId) } } func doWork(workerId int, input int) { fmt.Printf("[workerId (%d)] %d\n", workerId, input) time.Sleep(1 * time.Second) }
[workerId (2)] 0
[workerId (1)] 1
[workerId (0)] 2
[workerId (1)] 3
[workerId (0)] 5
[workerId (2)] 4
[workerId (2)] 6
[workerId (1)] 7
[workerId (0)] 8
[workerId (0)] 9
done
