Go の並行プログラミング

「Go 言語で学ぶ並行プログラミング」で紹介されていたパターンを記録しています。

SRC モデル(共有メモリモデル)

共有データへのアクセスを制御し、メモリの整合性を保つ手法。

Mutex

排他制御

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    num := 0
    mutex := sync.Mutex{}

    go add(&num, &mutex)
    go sub(&num, &mutex)

    time.Sleep(3 * time.Second)
    fmt.Println("num: ", num)
}

func add(num *int, mutex *sync.Mutex) {
    for range 1_000_000 {
        mutex.Lock()
        *num += 1
        mutex.Unlock()
    }
}

func sub(num *int, mutex *sync.Mutex) {
    for range 1_000_000 {
        mutex.Lock()
        *num -= 1
        mutex.Unlock()
    }
}

num: 0

type Mutex

Reader Writer Mutex

読込優先/書込専用ロック。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    start := time.Now()
    mutex := sync.RWMutex{}

    go writer(start, &mutex)
    for range 10 {
        go reader(&mutex)
    }

    time.Sleep(3 * time.Second)
}

func writer(start time.Time, mutex *sync.RWMutex) {
    for range 10 {
        mutex.Lock()
        time.Sleep(100 * time.Millisecond)
        mutex.Unlock()
    }
    duration := time.Since(start)
    fmt.Println("duration(millisecond):", duration.Milliseconds())  // reader 側の lock に影響されず 1000 ミリ秒強の実行時間になる
}

func reader(mutex *sync.RWMutex) {
    mutex.RLock()
    time.Sleep(100 * time.Millisecond)
    mutex.RUnlock()
}

duration(millisecond): 1207

type RWMutex

条件変数と Monitor

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    num := 0
    mutex := sync.Mutex{}
    cond := sync.NewCond(&mutex)

    go add(&num, cond)
    go subIfPositive(&num, cond)

    time.Sleep(5 * time.Second)
    fmt.Println("num", num) // 「1 の 500,000 回の加算」と「5 の 100,000 回の減算」なので 0 となる
}

func add(num *int, cond *sync.Cond) {
    for range 500_000 {
        cond.L.Lock()
        *num += 1
        cond.Signal()
        cond.L.Unlock()
    }
}

func subIfPositive(num *int, cond *sync.Cond) {
    for range 100_000 {
        cond.L.Lock()
        for *num < 5 {
            cond.Wait()
        }
        *num -= 5
        cond.L.Unlock()
    }
}

num 0

type Cond

type Cond struct {

    // L is held while observing or changing the condition
    L Locker
    // contains filtered or unexported fields
}

Cond は、イベントの発生を待機または通知する goroutine の待ち合わせ場所である条件変数を実装します。

各 Cond には関連付けられた Locker L (多くの場合MutexまたはRWMutex ) があり、条件を変更するときやCond.Waitメソッドを呼び出すときにはこれを保持する必要があります。

Cond は、最初に使用した後にコピーすることはできません。

Go メモリ モデル の用語では、Cond は、 Cond.BroadcastまたはCond.Signalへの呼び出しが、ブロックを解除する Wait 呼び出しの前に「同期」するように調整します。

多くの単純な使用例では、Cond よりもチャネルを使用する方がユーザーにとって便利です (Broadcast はチャネルを閉じることに相当し、Signal はチャネル上で送信することに相当します)。

Semaphore

リソース数制限。

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/semaphore"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    semaphore := semaphore.NewWeighted(2) // maximum 2 の semaphore をつくる

    for range 12 {
        go getNow(ctx, semaphore)
    }

    time.Sleep(10 * time.Second)
}

func getNow(ctx context.Context, semaphore *semaphore.Weighted) {
    if err := semaphore.Acquire(ctx, 1); err != nil {
        fmt.Printf("Failed to acquire semaphore: %v\n", err) // 最後の 2 件はタイムアウト
        return
    }

    fmt.Println(time.Now().Format(time.DateTime)) // 1 秒ごとに 2 件ずつ表示される
    time.Sleep(1 * time.Second)

    semaphore.Release(1)
}

2026-01-07 00:37:10
2026-01-07 00:37:10
2026-01-07 00:37:11
2026-01-07 00:37:11
2026-01-07 00:37:12
2026-01-07 00:37:12
2026-01-07 00:37:13
2026-01-07 00:37:13
2026-01-07 00:37:14
2026-01-07 00:37:14
Failed to acquire semaphore: context deadline exceeded
Failed to acquire semaphore: context deadline exceeded

golang.org/x/sync/semaphore

WaitGroup

完了待ち。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    wg := sync.WaitGroup{}
    maximum := 8

    for range maximum {
        wg.Add(1) // ゴルーチンの数だけ wait group を count up
        go worker(&wg)
    }
    wg.Wait() // wait group が 0 になるのを待つ

    // Go メソッドで代替できる
    // for range maximum {
    //     wg.Go(func() {
    //         _worker()
    //     })
    // }
    // wg.Wait()

    fmt.Printf("%d worker Done.\n", maximum)
}

func worker(wg *sync.WaitGroup) {
    i := rand.Intn(10)
    time.Sleep(time.Duration(i) * time.Second)
    fmt.Println("worker done: ", time.Now().Format(time.DateTime))

    wg.Done() // wait group に終了を通知(count down)
}

func _worker() {
    i := rand.Intn(10)
    time.Sleep(time.Duration(i) * time.Second)
    fmt.Println("worker done: ", time.Now().Format(time.DateTime))
}

type WaitGroup

Barrier

同期してからの開始。

go の標準・準標準ライブラリには Barrier にあたるものはないらしく、条件変数を利用した実装例が紹介されていました。

package main

import (
    "fmt"
    "sync"
    "time"
)

type Barrier struct {
    size      int // barrier に到達すべきゴルーチンの合計数
    waitCount int // 現在待ち状態のゴルーチンの数
    cond      *sync.Cond
}

func NewBarrier(size int) *Barrier {
    condVar := sync.NewCond(&sync.Mutex{})
    return &Barrier{size, 0, condVar}
}

func (b *Barrier) Wait() {
    b.cond.L.Lock()
    b.waitCount += 1

    if b.waitCount == b.size { // 待ち状態のゴルーチン数が一致した場合に broadcast
        b.waitCount = 0
        b.cond.Broadcast()
    } else { // 待ちゴルーチン数が足りない場合には wait
        b.cond.Wait()
    }
    b.cond.L.Unlock()
}

func main() {
    barrier := NewBarrier(2)

    go worker("AAA", 2, barrier)
    go worker("BBB", 5, barrier)

    time.Sleep(10 * time.Second)
}

func worker(name string, waitSecond int, barrier *Barrier) {
    fmt.Printf("%s is starting: %s\n", name, time.Now().Format(time.DateTime))

    time.Sleep(time.Duration(waitSecond) * time.Second)
    fmt.Printf("%s is waiting: %s\n", name, time.Now().Format(time.DateTime))
    barrier.Wait()

    fmt.Printf("%s is finish: %s\n", name, time.Now().Format(time.DateTime))
}

BBB is starting: 2026-01-08 01:49:48
AAA is starting: 2026-01-08 01:49:48
AAA is waiting: 2026-01-08 01:49:50
BBB is waiting: 2026-01-08 01:49:53
BBB is finish: 2026-01-08 01:49:53
AAA is finish: 2026-01-08 01:49:53

CSP (Communicating Sequential Processes)

メモリを共有せず、通信によって共有する手法。

quit channel

channel の終了を通知。

package main

import "fmt"

func main() {
    numbers := make(chan int)
    quit := make(chan int)
    printNumber(numbers, quit)
    next := 0
    for range 20 {
        select {
        case numbers <- next: // number チャネルに数値を送信する
        case <-quit: // for 文が 20 回周る前に quit チャネルからの受信があるためクローズする
            fmt.Println("Quitting")
            return
        }
        next += 1
    }
}

func printNumber(numbers <-chan int, quit chan int) {
    go func() {
        for range 10 {
            fmt.Println(<-numbers)
        }
        close(quit) // 数値を 10 個受け取ったら quit チャネルを閉じる
    }()
}

0
1
2
3
4
5
6
7
8
9
Quitting

Pipeline

package main

import "fmt"

func main() {
    quit := make(chan int)
    defer close(quit)

    numbers := []int{1, 2, 3}
    fmt.Println("[start]", numbers)

    // パイプラインのステップにあたる関数をネストして呼ぶ
    results := triple(quit, double(quit, numbers))

    // range はチャネルが閉じるまで for を回してくれる
    for result := range results {
        fmt.Println("[end]", result)
    }
}

func double(quit <-chan int, numbers []int) <-chan int {
    outChan := make(chan int)
    go func() {
        defer close(outChan)

        for _, number := range numbers {
            out := number * 2
            fmt.Printf("[double] %d -> %d\n", number, out)

            select {
            case outChan <- out: // 次のステップに値を渡す
            case <-quit:
                return
            }
        }
    }()
    return outChan
}

func triple(quit <-chan int, inChan <-chan int) <-chan int {
    outChan := make(chan int)
    go func() {
        defer close(outChan)

        for number := range inChan {
            out := number * 3
            fmt.Printf("[triple] %d -> %d\n", number, out)

            select {
            case outChan <- out: // 次のステップに値を渡す
            case <-quit:
                return
            }

        }
    }()
    return outChan
}

[start] [1 2 3]
[double] 1 -> 2
[double] 2 -> 4
[triple] 2 -> 6
[triple] 4 -> 12
[double] 3 -> 6
[end] 6
[end] 12
[triple] 6 -> 18
[end] 18

Fan-out / Fan-in

負荷分散と集約。

package main

import (
    "fmt"
    "sync"
)

func main() {
    quit := make(chan int)
    defer close(quit)

    numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    fmt.Println("[start]", numbers)

    doubledChan := double(quit, numbers)

    // fan-out 部分:複数の triple ワーカーを立ち上げる
    // 各ワーカーは同じチャネルを共有する
    numWorkers := 3
    workers := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        workers[i] = triple(quit, doubledChan, i)
    }

    // fan-in 部分:複数のワーカーのチャネルを束ねる
    results := fanIn(quit, workers...)

    for result := range results {
        fmt.Println("[end]", result)
    }
}

// pipeline のパターンと変化なし
func double(quit <-chan int, numbers []int) <-chan int {
    outChan := make(chan int)
    go func() {
        defer close(outChan)

        for _, number := range numbers {
            out := number * 2
            fmt.Printf("[double] %d -> %d\n", number, out)

            select {
            case outChan <- out:
            case <-quit:
                return
            }
        }
    }()
    return outChan
}

// pipeline のパターンと比較して、id を引数に追加しただけ
func triple(quit <-chan int, inChan <-chan int, id int) <-chan int {
    outChan := make(chan int)
    go func() {
        defer close(outChan)

        for number := range inChan {
            out := number * 3
            fmt.Printf("[triple] (%d)  %d -> %d\n", id, number, out)

            select {
            case outChan <- out:
            case <-quit:
                return
            }

        }
    }()
    return outChan
}

func fanIn[K any](quit <-chan int, allChannels ...<-chan K) chan K {
    // 束ねたい入力 channel と同数の wait group を作る
    wg := sync.WaitGroup{}
    wg.Add(len(allChannels))

    // 出力用 channel
    output := make(chan K)

    // 入力 channel 毎に受信し、出力 channel へ転送するゴルーチンを用意する
    for _, c := range allChannels {
        go func(channel <-chan K) {
            defer wg.Done()
            for i := range channel {
                select {
                case output <- i:
                case <-quit:
                    return
                }
            }
        }(c)
    }

    // 全ての入力 channel が閉じてから、出力 channel を閉じる
    go func() {
        wg.Wait()
        close(output)
    }()
    return output
}

[start] [1 2 3 4 5 6 7 8 9 10]
[double] 1 -> 2
[triple] (0) 2 -> 6
[double] 2 -> 4
[double] 3 -> 6
[double] 4 -> 8
[double] 5 -> 10
[triple] (0) 8 -> 24
[triple] (0) 10 -> 30
[double] 6 -> 12
[triple] (1) 6 -> 18
[triple] (1) 12 -> 36
[double] 7 -> 14
[end] 6
[end] 24
[end] 18
[end] 36
[end] 30
[triple] (1) 14 -> 42
[end] 42
[double] 8 -> 16
[double] 9 -> 18
[triple] (1) 16 -> 48
[triple] (1) 18 -> 54
[double] 10 -> 20
[triple] (2) 4 -> 12
[end] 48
[end] 54
[end] 12
[triple] (0) 20 -> 60
[end] 60

Broadcast

通知の拡散。

package main

import (
    "fmt"
    "sync"
)

func main() {
    quit := make(chan int)
    defer close(quit)

    // 入力用の channel を用意
    inputChan := make(chan string)
    go func() {
        defer close(inputChan)
        inputChan <- "world"
    }()

    // broadcast 処理に入力 channel を渡して、ブロードキャスト数を伝える
    outputs := broadcast(quit, inputChan, 2)

    // 各パイプラインの完了を待機する WaitGroup
    var wg sync.WaitGroup
    wg.Add(2)

    // パイプライン1: broadcast 処理 より値を受け取る hello 処理
    go func() {
        defer wg.Done()
        fmt.Println(<-hello(quit, outputs[0]))
    }()

    // パイプライン2: broadcast 処理 より値を受け取る goodbye 処理
    go func() {
        defer wg.Done()
        fmt.Println(<-goodbye(quit, outputs[1]))
    }()

    wg.Wait()
}

func broadcast[K any](quit <-chan int, inputChan <-chan K, n int) []chan K {
    // broadcast 向けに必要数分の channel を用意
    outChannels := createAll[K](n)

    go func() {
        defer closeAll(outChannels...)

        for val := range inputChan {
            // 全ワーカーへの配送保証
            var wg sync.WaitGroup
            wg.Add(len(outChannels))

            // 用意した channel に値を配信
            for _, ch := range outChannels {
                go func(ch chan K, val K) {
                    defer wg.Done()
                    select {
                    case ch <- val:
                    case <-quit:
                        return
                    }
                }(ch, val)
            }
            // すべての channel に値を配信終わるまで待つ
            wg.Wait()
        }
    }()
    return outChannels
}

func createAll[K any](n int) []chan K {
    channels := make([]chan K, n)
    for i := range channels {
        channels[i] = make(chan K)
    }
    return channels
}

func closeAll[K any](channels ...chan K) {
    for _, channel := range channels {
        close(channel)
    }
}

func hello(quit <-chan int, inChan <-chan string) <-chan string {
    outChan := make(chan string)
    go func() {
        defer close(outChan)

        for input := range inChan {
            msg := "hello, " + input
            select {
            case outChan <- msg:
            case <-quit:
                return
            }
        }
    }()
    return outChan
}

func goodbye(quit <-chan int, inChan <-chan string) <-chan string {
    outChan := make(chan string)
    go func() {
        defer close(outChan)

        for input := range inChan {
            msg := "goodbye, " + input
            select {
            case outChan <- msg:
            case <-quit:
                return
            }
        }
    }()
    return outChan
}

goodbye, world
hello, world

fork / join

入力を分割処理して集約。

package main

import (
    "fmt"
    "sync"
)

type result struct {
    index int
    value int
}

func main() {
    input := []int{1, 2, 3, 4, 5, 6}
    joinedResults := make([]int, len(input))

    quit := make(chan int)
    defer close(quit)

    fmt.Println("[start]", input)

    // fork stage と join stage 間で値を授受する channel
    resultChan := make(chan result)

    // 完了通知用の channel
    done := make(chan struct{})

    // --- fork stage ---
    numOfWorkers := 3
    chunkSize := len(input) / numOfWorkers

    // fork するゴルーチン分の wait group を用意
    var wg sync.WaitGroup

    for i := range numOfWorkers {
        wg.Add(1)
        start := i * chunkSize
        end := start + chunkSize

        // result channel 配信用のゴルーチンを用意
        go func(start, end int) {
            defer wg.Done()
            for j := start; j < end; j++ {

                fmt.Printf("[fork (%d)] index: %d, input: %d\n", i, j, input[j])
                select {
                case resultChan <- result{index: j, value: input[j] * 2}:
                case <-quit:
                    return
                }
            }
        }(start, end)
    }

    // --- join stage ---
    // fork stage の全ゴルーチンが終了したら channel を閉じる
    go func() {
        wg.Wait()
        close(resultChan)
    }()

    // fork stage からの結果を集約する
    go func() {
        for res := range resultChan {
            fmt.Printf("[join] index: %d, input: %d, result.value: %d\n", res.index, input[res.index], res.value)
            joinedResults[res.index] = res.value
        }
        close(done)
    }()

    // 完了通知が来るまで待機
    <-done
    fmt.Println("[end]", joinedResults)
}

[start] [1 2 3 4 5 6]
[fork (1)] index: 2, input: 3
[fork (1)] index: 3, input: 4
[join] index: 2, input: 3, result.value: 6
[join] index: 3, input: 4, result.value: 8
[fork (0)] index: 0, input: 1
[fork (0)] index: 1, input: 2
[join] index: 0, input: 1, result.value: 2
[join] index: 1, input: 2, result.value: 4
[fork (2)] index: 4, input: 5
[fork (2)] index: 5, input: 6
[join] index: 4, input: 5, result.value: 10
[join] index: 5, input: 6, result.value: 12

worker pool

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    inputChan := make(chan int)
    var wg sync.WaitGroup

    // ワーカーを起動
    numOfWorker := 3
    startWorker(numOfWorker, inputChan, &wg)

    // インプットを投入
    for i := range 10 {
        inputChan <- i
    }
    close(inputChan)

    // ワーカーの終了を待機
    wg.Wait()
    fmt.Println("done")
}

func startWorker(numOfWorker int, inputChan <-chan int, wg *sync.WaitGroup) {
    for workerId := range numOfWorker {
        wg.Add(1)
        go func(workerId int) {
            defer wg.Done()
            for input := range inputChan {
                doWork(workerId, input)
            }
        }(workerId)
    }
}

func doWork(workerId int, input int) {
    fmt.Printf("[workerId (%d)] %d\n", workerId, input)
    time.Sleep(1 * time.Second)
}

[workerId (2)] 0
[workerId (1)] 1
[workerId (0)] 2
[workerId (1)] 3
[workerId (0)] 5
[workerId (2)] 4
[workerId (2)] 6
[workerId (1)] 7
[workerId (0)] 8
[workerId (0)] 9
done