Go语言主要通过 goroutine 和 channel 实现高效并发,辅以 sync 包和 context 提供同步和任务管理。本文总结Go的所有并发编程技巧,并通过案例说明。
一、核心概念
- Goroutine:轻量级线程,由Go运行时管理,创建成本低(初始栈2KB,动态扩展)。
- Channel:goroutine间通信机制,支持阻塞式数据传递,避免数据竞争。
- Sync包:提供锁、等待组等原语,保护共享资源。
- Context:管理goroutine生命周期,支持取消和超时。
- Select:处理多channel操作,支持非阻塞和超时。
- 并发模式:如生产者-消费者、工作者池、管道等。
二、并发编程技巧
1. 使用Goroutine实现轻量并发
- 技巧:用
go关键字启动goroutine,需配合同步机制。 - 注意:goroutine执行顺序不定。
2. 使用Channel进行安全通信
- 技巧:优先用channel传递数据,关闭channel通知结束。
- 规则:发送方关闭channel,接收方用
for range。
3. 使用Select处理多Channel
- 技巧:用
select实现多路复用或超时控制。
4. 使用Context管理任务
- 技巧:通过
context实现取消和超时,确保资源清理。
5. 使用Sync包保护共享资源
- 技巧:
sync.Mutex/sync.RWMutex:保护临界区。sync.WaitGroup:等待goroutine完成。sync.ErrGroup: 并行任务,捕获第一个错误。sync.Once:确保单次执行。sync.Pool:对象复用,降低GC压力。sync.Cond: 构造条件变量,后续通过 .Broadcast() 广播通知所有等待的 goroutine
6. 原子操作优化性能
- 技巧:用
sync/atomic实现无锁操作,适合计数器等。
7. 并发模式
- 工作者池:限制goroutine数量。
- 扇入扇出:并行处理后汇总。
- 管道:channel串联处理阶段。
三、案例分析
案例1:生产者-消费者
场景:一个生产者生成数据,多个消费者并行处理。
package main
import (
"fmt"
"time"
)
func producer(ch chan<- int) {
for i := 1; i <= 5; i++ {
ch <- i
time.Sleep(time.Millisecond * 500)
}
close(ch)
}
func consumer(id int, ch <-chan int) {
for num := range ch {
fmt.Printf("Consumer %d received: %d\n", id, num)
}
}
func main() {
ch := make(chan int, 2)
go producer(ch)
go consumer(1, ch)
go consumer(2, ch)
time.Sleep(time.Second * 3)
}
输出:
Consumer 1 received: 1
Consumer 2 received: 2
Consumer 1 received: 3
Consumer 2 received: 4
Consumer 1 received: 5
案例2:Select实现超时
场景:任务超时退出。
package main
import (
"fmt"
"time"
)
func process(ch chan string, data string) {
time.Sleep(time.Second * 2)
ch <- data
}
func main() {
ch := make(chan string)
go process(ch, "result")
select {
case res := <-ch:
fmt.Println("Received:", res)
case <-time.After(time.Second * 1):
fmt.Println("Timeout")
}
}
输出:
Timeout
案例3:Context实现任务取消
场景:HTTP请求超时取消。
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context, ch chan<- string) {
select {
case <-time.After(time.Second * 2):
ch <- "Task completed"
case <-ctx.Done():
fmt.Println("Task cancelled:", ctx.Err())
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
ch := make(chan string)
go longRunningTask(ctx, ch)
select {
case res := <-ch:
fmt.Println(res)
case <-ctx.Done():
fmt.Println("Main:", ctx.Err())
}
}
输出:
Task cancelled: context deadline exceeded
Main: context deadline exceeded
案例4:工作者池与WaitGroup
场景:并行处理任务,等待完成。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, tasks <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
fmt.Printf("Worker %d processing task %d\n", id, task)
time.Sleep(time.Millisecond * 500)
}
}
func main() {
tasks := make(chan int, 10)
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, tasks, &wg)
}
for i := 1; i <= 5; i++ {
tasks <- i
}
close(tasks)
wg.Wait()
fmt.Println("All tasks completed")
}
输出:
Worker 1 processing task 1
Worker 2 processing task 2
Worker 3 processing task 3
Worker 1 processing task 4
Worker 2 processing task 5
All tasks completed
案例5:Mutex保护共享资源
场景:多goroutine安全累加计数器。
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
count int
}
func (c *Counter) Inc() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
func main() {
counter := Counter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
counter.Inc()
wg.Done()
}()
}
wg.Wait()
fmt.Println("Final count:", counter.Value())
}
输出:
Final count: 1000
案例6:原子操作优化计数
场景:高性能计数器。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var count int32
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
atomic.AddInt32(&count, 1)
wg.Done()
}()
}
wg.Wait()
fmt.Println("Final count:", atomic.LoadInt32(&count))
}
输出:
Final count: 1000
案例7:errgroup管理goroutine和错误
场景:并行任务,捕获第一个错误。
package main
import (
"fmt"
"golang.org/x/sync/errgroup"
)
func main() {
var g errgroup.Group
g.Go(func() error {
return fmt.Errorf("task 1 failed")
})
g.Go(func() error {
return nil
})
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("All tasks succeeded")
}
}
输出:
Error: task 1 failed
案例8:使用 sync.cond 模拟车辆停止通过红路灯
场景:构造需要条件 + 多协程协调 + 精确控制唤醒时机的机制。
import (
"fmt"
"sync"
"time"
)
var (
greenLight = false // 红绿灯状态
mu = sync.Mutex{} // 互斥锁
cond = sync.NewCond(&mu) // 条件变量
)
func car(id int) {
cond.L.Lock()
for !greenLight {
fmt.Printf("🚗 车辆 %d 等待红灯...\n", id)
cond.Wait()
}
fmt.Printf("✅ 车辆 %d 通过绿灯!\n", id)
cond.L.Unlock()
}
func main() {
// 启动5辆车(5个等待的 goroutine)
for i := 1; i <= 5; i++ {
go car(i)
}
// 等待红灯 3 秒
time.Sleep(3 * time.Second)
// 改变红绿灯状态为绿灯
cond.L.Lock()
fmt.Println("🟢 绿灯了!通知所有车辆通行!")
greenLight = true
cond.Broadcast() // 广播通知所有等待的 goroutine
cond.L.Unlock()
// 等待所有 goroutine 打印完
time.Sleep(2 * time.Second)
}
输出:
🚗 车辆 5 等待红灯...
🚗 车辆 1 等待红灯...
🚗 车辆 2 等待红灯...
🚗 车辆 3 等待红灯...
🚗 车辆 4 等待红灯...
🟢 绿灯了!通知所有车辆通行!
✅ 车辆 4 通过绿灯!
✅ 车辆 5 通过绿灯!
✅ 车辆 1 通过绿灯!
✅ 车辆 2 通过绿灯!
✅ 车辆 3 通过绿灯!
四、最佳实践
-
避免goroutine泄漏:用 context 或关闭channel确保goroutine退出。
-
Channel规范:发送方关闭channel,接收方用 for range。
-
锁粒度:最小化锁范围,读多写少用 RWMutex。
-
性能优化:用带缓冲channel、atomic、或 sync.Pool。