Tags

  • 技术

Go语言主要通过 goroutinechannel 实现高效并发,辅以 sync 包和 context 提供同步和任务管理。本文总结Go的所有并发编程技巧,并通过案例说明。

一、核心概念

  1. Goroutine:轻量级线程,由Go运行时管理,创建成本低(初始栈2KB,动态扩展)。
  2. Channel:goroutine间通信机制,支持阻塞式数据传递,避免数据竞争。
  3. Sync包:提供锁、等待组等原语,保护共享资源。
  4. Context:管理goroutine生命周期,支持取消和超时。
  5. Select:处理多channel操作,支持非阻塞和超时。
  6. 并发模式:如生产者-消费者、工作者池、管道等。

二、并发编程技巧

1. 使用Goroutine实现轻量并发

  • 技巧:用 go 关键字启动goroutine,需配合同步机制。
  • 注意:goroutine执行顺序不定。

2. 使用Channel进行安全通信

  • 技巧:优先用channel传递数据,关闭channel通知结束。
  • 规则:发送方关闭channel,接收方用 for range

3. 使用Select处理多Channel

  • 技巧:用 select 实现多路复用或超时控制。

4. 使用Context管理任务

  • 技巧:通过 context 实现取消和超时,确保资源清理。

5. 使用Sync包保护共享资源

  • 技巧
    • sync.Mutex / sync.RWMutex:保护临界区。
    • sync.WaitGroup:等待goroutine完成。
    • sync.ErrGroup: 并行任务,捕获第一个错误。
    • sync.Once:确保单次执行。
    • sync.Pool:对象复用,降低GC压力。
    • sync.Cond: 构造条件变量,后续通过 .Broadcast() 广播通知所有等待的 goroutine

6. 原子操作优化性能

  • 技巧:用 sync/atomic 实现无锁操作,适合计数器等。

7. 并发模式

  • 工作者池:限制goroutine数量。
  • 扇入扇出:并行处理后汇总。
  • 管道:channel串联处理阶段。

三、案例分析

案例1:生产者-消费者

场景:一个生产者生成数据,多个消费者并行处理。

package main

import (
    "fmt"
    "time"
)

func producer(ch chan<- int) {
    for i := 1; i <= 5; i++ {
        ch <- i
        time.Sleep(time.Millisecond * 500)
    }
    close(ch)
}

func consumer(id int, ch <-chan int) {
    for num := range ch {
        fmt.Printf("Consumer %d received: %d\n", id, num)
    }
}

func main() {
    ch := make(chan int, 2)
    go producer(ch)
    go consumer(1, ch)
    go consumer(2, ch)
    time.Sleep(time.Second * 3)
}
输出:
Consumer 1 received: 1
Consumer 2 received: 2
Consumer 1 received: 3
Consumer 2 received: 4
Consumer 1 received: 5

案例2:Select实现超时

场景:任务超时退出。

package main

import (
    "fmt"
    "time"
)

func process(ch chan string, data string) {
    time.Sleep(time.Second * 2)
    ch <- data
}

func main() {
    ch := make(chan string)
    go process(ch, "result")
    select {
    case res := <-ch:
        fmt.Println("Received:", res)
    case <-time.After(time.Second * 1):
        fmt.Println("Timeout")
    }
}
输出:
Timeout

案例3:Context实现任务取消

场景:HTTP请求超时取消。

package main

import (
    "context"
    "fmt"
    "time"
)

func longRunningTask(ctx context.Context, ch chan<- string) {
    select {
    case <-time.After(time.Second * 2):
        ch <- "Task completed"
    case <-ctx.Done():
        fmt.Println("Task cancelled:", ctx.Err())
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
    defer cancel()
    ch := make(chan string)
    go longRunningTask(ctx, ch)
    select {
    case res := <-ch:
        fmt.Println(res)
    case <-ctx.Done():
        fmt.Println("Main:", ctx.Err())
    }
}
输出:
Task cancelled: context deadline exceeded
Main: context deadline exceeded

案例4:工作者池与WaitGroup

场景:并行处理任务,等待完成。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, tasks <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for task := range tasks {
        fmt.Printf("Worker %d processing task %d\n", id, task)
        time.Sleep(time.Millisecond * 500)
    }
}

func main() {
    tasks := make(chan int, 10)
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, tasks, &wg)
    }
    for i := 1; i <= 5; i++ {
        tasks <- i
    }
    close(tasks)
    wg.Wait()
    fmt.Println("All tasks completed")
}
输出:
Worker 1 processing task 1
Worker 2 processing task 2
Worker 3 processing task 3
Worker 1 processing task 4
Worker 2 processing task 5
All tasks completed

案例5:Mutex保护共享资源

场景:多goroutine安全累加计数器。

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.Mutex
    count int
}

func (c *Counter) Inc() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func main() {
    counter := Counter{}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            counter.Inc()
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println("Final count:", counter.Value())
}
输出:
Final count: 1000

案例6:原子操作优化计数

场景:高性能计数器。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var count int32
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            atomic.AddInt32(&count, 1)
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println("Final count:", atomic.LoadInt32(&count))
}
输出:
Final count: 1000

案例7:errgroup管理goroutine和错误

场景:并行任务,捕获第一个错误。

package main

import (
    "fmt"
    "golang.org/x/sync/errgroup"
)

func main() {
    var g errgroup.Group
    g.Go(func() error {
        return fmt.Errorf("task 1 failed")
    })
    g.Go(func() error {
        return nil
    })
    if err := g.Wait(); err != nil {
        fmt.Println("Error:", err)
    } else {
        fmt.Println("All tasks succeeded")
    }
}
输出:
Error: task 1 failed

案例8:使用 sync.cond 模拟车辆停止通过红路灯

场景:构造需要条件 + 多协程协调 + 精确控制唤醒时机的机制。


import (
	"fmt"
	"sync"
	"time"
)

var (
	greenLight = false             // 红绿灯状态
	mu         = sync.Mutex{}      // 互斥锁
	cond       = sync.NewCond(&mu) // 条件变量
)

func car(id int) {
	cond.L.Lock()
	for !greenLight {
		fmt.Printf("🚗 车辆 %d 等待红灯...\n", id)
		cond.Wait()
	}
	fmt.Printf("✅ 车辆 %d 通过绿灯!\n", id)
	cond.L.Unlock()
}

func main() {
	// 启动5辆车(5个等待的 goroutine)
	for i := 1; i <= 5; i++ {
		go car(i)
	}

	// 等待红灯 3 秒
	time.Sleep(3 * time.Second)

	// 改变红绿灯状态为绿灯
	cond.L.Lock()
	fmt.Println("🟢 绿灯了!通知所有车辆通行!")
	greenLight = true
	cond.Broadcast() // 广播通知所有等待的 goroutine
	cond.L.Unlock()

	// 等待所有 goroutine 打印完
	time.Sleep(2 * time.Second)
}

输出:
🚗 车辆 5 等待红灯...
🚗 车辆 1 等待红灯...
🚗 车辆 2 等待红灯...
🚗 车辆 3 等待红灯...
🚗 车辆 4 等待红灯...
🟢 绿灯了通知所有车辆通行
 车辆 4 通过绿灯
 车辆 5 通过绿灯
 车辆 1 通过绿灯
 车辆 2 通过绿灯
 车辆 3 通过绿灯

四、最佳实践

  • 避免goroutine泄漏:用 context 或关闭channel确保goroutine退出。

  • Channel规范:发送方关闭channel,接收方用 for range。

  • 锁粒度:最小化锁范围,读多写少用 RWMutex。

  • 性能优化:用带缓冲channel、atomic、或 sync.Pool。