Go 语言的一大卖点可以说是并发编程。作为一门非常年轻的语言(诞生于2006年),在Google的培育下,为了充分利用多核机器资源的并发优势,从底层原生支持并发。
实现并发很“简单”Go 语言通过协程(Goroutine)实现并发。启动一个协程只需要在函数调用前加上关键字go
即可,协程将会以异步方式执行,不会阻塞。
举个例子,我们实现一个商城商品的接口,这个接口会返回商品价格和下单量,这两部分数据由两个不同的RPC服务提供。最简单的做法是同步去分别调用:
func GetItemPrice(itemId int64) int64 {
// 模拟Item RPC 调用
time.Sleep(1 * time.Second)
return 100
}
func GetOrderCount(itemId int64) int64 {
// 模拟Order RPC 调用
time.Sleep(1 * time.Second)
return 200
}
type ItemInfo struct {
ItemPrice int64
OrderCount int64
}
func GetItem(itemId int64) ItemInfo {
// 内部实现是 item rpc 调用
itemPrice := GetItemPrice(itemId)
// 内部实现是 order rpc 调用
orderCount := GetOrderCount(itemId)
return ItemInfo{
ItemPrice: itemPrice,
OrderCount: orderCount,
}
}
上面这种实现的缺点显而易见:第二个RPC请求需要等到第一个Rpc返回后再开始调用,这个接口的耗时与调用的RPC接口量成正比。当然这是一个错位的示范,我们可以使用协程来解决这个问题:
func GetItemByGoroutine(itemId int64) ItemInfo {
var itemPrice, orderCount int64
// 协程方式执行
go func() {
itemPrice = GetItemPrice(itemId)
}()
go func() {
orderCount = GetOrderCount(itemId)
}()
// 等待协程执行完
time.Sleep(1050 * time.Millisecond)
return ItemInfo{
ItemPrice: itemPrice,
OrderCount: orderCount,
}
}
我们来比较一下两个的区别:
startAt := time.Now().UnixMilli()
itemInfo := GetItem(1)
endAt := time.Now().UnixMilli()
fmt.Printf("get itemInfo: %v, cost: %dms\n", itemInfo, endAt-startAt)
startAt = time.Now().UnixMilli()
itemInfo = GetItemByGoroutine(1)
endAt = time.Now().UnixMilli()
fmt.Printf("get itemInfo by groutine: %v, cost: %dms\n", itemInfo, endAt-startAt)
/*
output:
get itemInfo: {100 200}, cost: 2000ms
get itemInfo by groutine: {100 200}, cost: 1050ms
*/
可以看到,通过使用协程,我们可以并发地执行两个请求,缩短整体的执行时间。但是现在的实现并不优雅,因为协程是异步执行,不阻塞主流程的,我加了一行 time.Sleep(1050 * time.Millisecond)
来强制主流程等待1.05s,这时我认为两个协程应该都执行完了,然后去做后面的数据组装。这里 sleep 的时间有点异想天开了,因为我们设想每个rpc接口的耗时都是1s,而实际情况肯定不会这么幸运:如果突发的网络延迟到时接口响应时间变成了1.5s,那岂不是GG。如果每个协程能够告诉我们他已经执行完成了,我在他们都完成时再去打包数据不就好了?
这也就引出了协程间是如何通信的?,在 Go 语言里,协程通过 channel 实现通信,channel 是 Go 语言特有的一种数据类型,声明方式很简单
a := make(chan int)
b := make(chan bool)
c := make(chan map[int]string)
向 channel 中写入数据的类型必须和 channel 声明时的类型一样,写入方式:
a <- 1
b <- true
c <- map[int]string{1: "1"}
读取方式为:
aValue := <- a
bValue := <- b
cVaule := <- c
如果当前 channel 内没有数据,会一直阻塞在这里。通过这个特性,我们就可以实现到上面讲的:让协程告诉我们它是否执行结束:
func GetItemByGoroutineWithChan(itemId int64) ItemInfo {
var itemPrice, orderCount int64
itemCallDone := make(chan bool)
orderCallDone := make(chan bool)
go func() {
itemPrice = GetItemPrice(itemId)
itemCallDone <- true
}()
go func() {
orderCount = GetOrderCount(itemId)
orderCallDone <- true
}()
<-itemCallDone
<-orderCallDone
return ItemInfo{
ItemPrice: itemPrice,
OrderCount: orderCount,
}
}
来看一下效果,果然使用了 channel 效果更好
get itemInfo: {100 200}, cost: 2000ms
get itemInfo by groutine: {100 200}, cost: 1051ms
get itemInfo by groutine with chan: {100 200}, cost: 1001ms
假设因为网络原因,订单服务的接口延迟变成了3s,商品服务的接口延迟变成了2s
func GetItemPrice(itemId int64) int64 {
// 模拟Item RPC 调用延迟
time.Sleep(2 * time.Second)
return 100
}
func GetOrderCount(itemId int64) int64 {
// 模拟Order RPC 调用延迟
time.Sleep(3 * time.Second)
return 200
}
再看一下效果,同步调用方式的耗时变成了5秒,只使用协程已无法取到正确数据,通过 channel 方式可以取到数据,并且耗时3秒。
get itemInfo: {100 200}, cost: 5000ms
get itemInfo by groutine: {0 0}, cost: 1050ms
get itemInfo by groutine with chan: {100 200}, cost: 3001ms
到时间了,准备交卷
实际上,我们的接口响应是有一定时间要求的,比如:2s,现象一下:当我们看到网页一直在转圈时,我们会怎么做?刷新一下,重新发起请求,第一次的请求在3s时返回数据就没有意义了。所以,我们需要对接口调用有一定的超时控制,如果超过约定时间,服务返回兜底数据或直接报错(这依赖于业务上决策)。
因此,我们要在这个订单接口实现超时控制,如果到了规定时间,我们就直接返回,不再等待还没完成的请求了:
func GetItemWithTimeout(itemId int64, timeOut int64) ItemInfo {
var itemPrice, orderCount int64
itemCallDone := make(chan bool)
orderCallDone := make(chan bool)
go func() {
itemPrice = GetItemPrice(itemId)
itemCallDone <- true
}()
go func() {
orderCount = GetOrderCount(itemId)
orderCallDone <- true
}()
// 这是一个闹钟,到点后 channel time.C 会有数据
ticker := time.NewTicker(time.Duration(timeOut) * time.Second)
defer ticker.Stop()
// 计数有没有完成
doneCnt := 0
// 标记是否超时
isTimeOut := false
for {
select {
case <-itemCallDone:
doneCnt += 1
break
case <-orderCallDone:
doneCnt += 1
break
case <-ticker.C:
isTimeOut = true
break
}
if doneCnt == 2 {
break
}
if isTimeOut {
fmt.Printf("timeout\n")
break
}
}
return ItemInfo{
ItemPrice: itemPrice,
OrderCount: orderCount,
}
}
我们原来运行对比一下:
func main() {
startAt := time.Now().UnixMilli()
itemInfo := GetItem(1)
endAt := time.Now().UnixMilli()
fmt.Printf("get itemInfo: %v, cost: %dms\n", itemInfo, endAt-startAt)
startAt = time.Now().UnixMilli()
itemInfo = GetItemByGoroutine(1)
endAt = time.Now().UnixMilli()
fmt.Printf("get itemInfo by groutine: %v, cost: %dms\n", itemInfo, endAt-startAt)
startAt = time.Now().UnixMilli()
itemInfo = GetItemByGoroutineWithChan(1)
endAt = time.Now().UnixMilli()
fmt.Printf("get itemInfo by groutine with chan: %v, cost: %dms\n", itemInfo, endAt-startAt)
startAt = time.Now().UnixMilli()
itemInfo = GetItemWithTimeout(1, 2)
endAt = time.Now().UnixMilli()
fmt.Printf("get itemInfo by groutine with timeout: %v, cost: %dms\n", itemInfo, endAt-startAt)
}
```output
get itemInfo: {100 200}, cost: 5001ms
get itemInfo by groutine: {0 0}, cost: 1051ms
get itemInfo by groutine with chan: {100 200}, cost: 3001ms
timeout
get itemInfo by groutine with timeout: {100 0}, cost: 2001ms
订单服务的接口延迟是3s,商品服务的接口延迟是2s。函数传入的超时时间是2s,所以只取到了商品服务的接口数据后就返回了。
这就是一个简单的并发编程模式,相似的场景都可以套用。
等等,还有坑通过协程的并发编程好是好,但它还是有坑的,使用是要多加注意
- 避免死锁
使用 channel 通信时,特别是使用无缓冲区类型的 channel 时,一定要主要注意:
- 千万不要自己写完自己读
func main() {
ch := make(chan int, 0)
ch <- 666
x := <- ch
fmt.Println(x)
}
- 确认协程可以执行的起来,比如下面的协程就起不起来
func main() {
ch := make(chan int,0)
ch <- 666
go func() {
<- ch
}()
}
- channel 读写时,逻辑上不要发生死锁
func main() {
// 男孩要女孩先说再见,女孩要男孩先说
chBoy := make(chan int,0)
chGirl := make(chan int,0)
go func() {
select {
case <- chGirl:
chBoy<-888
}
}()
select {
case <- chBoy:
chGirl <- 888
}
}
- 避免协程泄露
主程序退出时,协程有可能还没退出,如果一直不退出,那就有可能是协程泄露了。常见的协程泄露常见有:
- 协程中因为 channel 阻塞,无法退出
func goroutineLeak() {
p := make(chan int)
go func() {
time.Sleep(2 * time.Second)
// 注意这里,当到达1秒后程序后自动返回,没有人去读channel的数据,就会一直阻塞到这里
p <- 1
fmt.Println("finished...")
}()
select {
case a := <-p:
fmt.Printf("got a %v", a)
case <-time.After(1 * time.Second):
return
}
}
func main() {
for i := 0; i <= 5; i++ {
goroutineLeak()
}
fmt.Println(runtime.NumGoroutine())
}
/* output:
7
*/
所以说,上面实现的方法是有可能出现协程泄露的,那应该如果处理呢,一种方式是使用有缓存channel,另一种是使用WaitGroup
来代替。
func GetItemWithTimeoutV2(itemId int64, timeOut int64) ItemInfo {
var itemPrice, orderCount int64
var wg sync.WaitGroup
wg.Add(2)
go func() {
itemPrice = GetItemPrice(itemId)
wg.Done()
fmt.Println("111")
}()
go func() {
orderCount = GetOrderCount(itemId)
wg.Done()
fmt.Println("222")
}()
ticker := time.NewTicker(time.Duration(timeOut) * time.Second)
defer ticker.Stop()
done := make(chan bool)
go func() {
defer close(done)
wg.Wait()
}()
select {
case <-done:
case <-ticker.C:
fmt.Println("timeout")
}
return ItemInfo{
ItemPrice: itemPrice,
OrderCount: orderCount,
}
}
运行效果:
func main() {
startAt := time.Now().UnixMilli()
itemInfo := GetItemWithTimeoutV2(1, 2)
endAt := time.Now().UnixMilli()
time.Sleep(time.Second * 2)
fmt.Printf("get itemInfo by groutine with tiemeoutv2: %v, cost: %dms, goroutines:%d\n", itemInfo, endAt-startAt, runtime.NumGoroutine())
}
/* output
timeout
111
222
get itemInfo by groutine with tiemeoutv2: {100 0}, cost: 2001ms, goroutines:1
/
【本文来自:台湾服务器 http://www.558idc.com/tw.html 复制请保留原URL】