我目前正在研究我的学士论文,基本上我的任务是优化Go中的给定代码,即尽可能快地运行.首先,我优化了串行功能,然后尝试通过goroutines引入并行性.在互联网上进行研究之后,我现在了解了并发性和并行性之间的差异,这要归功于talks.golang的以下幻灯片.我参观了一些并行编程课程,我们在pthread / openmp的帮助下并行化了ac / c代码,因此我尝试将这些范例应用于走.也就是说,在这种特殊情况下,我正在优化一个函数,它计算长度为len:= n(window_size-1)的片段的moving average(它等于9393或10175),因此我们有n个窗口我们计算相应的算术平均值并在输出切片中正确保存.
请注意,此任务本质上是令人尴尬的并行.
我的优化尝试和结果
在moving_avg_concurrent2中,我将切片分成num_goroutines较小的片段,然后用一个goroutine运行.这个函数用一个goroutine执行,出于某种原因(虽然还没找到原因,但我们在这里得到切线),比moving_avg_serial4好,但是有多个goroutine它开始表现比moving_avg_serial4差.
在moving_avg_concurrent3中,我采用了master / worker范例.使用一个goroutine时,性能比moving_avg_serial4差.在这里,我们至少在增加num_goroutines时获得了更好的表现,但仍然不比moving_avg_serial4好.
为了比较moving_avg_serial4,moving_avg_concurrent2和moving_avg_concurrent3的性能,我写了一个基准测试,并将结果列表:
fct & num_goroutines | timing in ns/op | percentage
---------------------------------------------------------------------
serial4 | 4357893 | 100.00%
concur2_1 | 5174818 | 118.75%
concur2_4 | 9986386 | 229.16%
concur2_8 | 18973443 | 435.38%
concur2_32 | 75602438 | 1734.84%
concur3_1 | 32423150 | 744.01%
concur3_4 | 21083897 | 483.81%
concur3_8 | 16427430 | 376.96%
concur3_32 | 15157314 | 347.81%
题
因为如上所述,这个问题令人尴尬地平行,我期待看到性能的巨大提升但事实并非如此.
为什么moving_avg_concurrent2根本没有缩放?
为什么moving_avg_concurrent3比moving_avg_serial4慢得多?
我知道goroutines很便宜,但仍然不是免费的,但这是否有可能产生这么大的开销,以至于我们甚至比moving_avg_serial4慢?
码
功能:
// returns a slice containing the moving average of the input (given, i.e. not optimised)
func moving_avg_serial(input []float64, window_size int) []float64 {
first_time := true
var output = make([]float64, len(input))
if len(input) > 0 {
var buffer = make([]float64, window_size)
// initialise buffer with NaN
for i := range buffer {
buffer[i] = math.NaN()
}
for i, val := range input {
old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
buffer[int((math.Mod(float64(i), float64(window_size))))] = val
if !NaN_in_slice(buffer) && first_time {
sum := 0.0
for _, entry := range buffer {
sum += entry
}
output[i] = sum / float64(window_size)
first_time = false
} else if i > 0 && !math.IsNaN(output[i-1]) && !NaN_in_slice(buffer) {
output[i] = output[i-1] + (val-old_val)/float64(window_size) // solution without loop
} else {
output[i] = math.NaN()
}
}
} else { // empty input
fmt.Println("moving_avg is panicking!")
panic(fmt.Sprintf("%v", input))
}
return output
}
// returns a slice containing the moving average of the input
// reordering the control structures to exploid the short-circuit evaluation
func moving_avg_serial4(input []float64, window_size int) []float64 {
first_time := true
var output = make([]float64, len(input))
if len(input) > 0 {
var buffer = make([]float64, window_size)
// initialise buffer with NaN
for i := range buffer {
buffer[i] = math.NaN()
}
for i := range input {
// fmt.Printf("in mvg_avg4: i=%v\n", i)
old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i]
if first_time && !NaN_in_slice(buffer) {
sum := 0.0
for j := range buffer {
sum += buffer[j]
}
output[i] = sum / float64(window_size)
first_time = false
} else if i > 0 && !math.IsNaN(output[i-1]) /* && !NaN_in_slice(buffer)*/ {
output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // solution without loop
} else {
output[i] = math.NaN()
}
}
} else { // empty input
fmt.Println("moving_avg is panicking!")
panic(fmt.Sprintf("%v", input))
}
return output
}
// returns a slice containing the moving average of the input
// splitting up slice into smaller pieces for the goroutines but without using the serial version, i.e. we only have NaN's in the beginning, thus hope to reduce some overhead
// still does not scale (decreasing performance with increasing size and num_goroutines)
func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 {
var output = make([]float64, window_size-1, len(input))
for i := 0; i < window_size-1; i++ {
output[i] = math.NaN()
}
if len(input) > 0 {
num_items := len(input) - (window_size - 1)
var barrier_wg sync.WaitGroup
n := num_items / num_goroutines
go_avg := make([][]float64, num_goroutines)
for i := 0; i < num_goroutines; i++ {
go_avg[i] = make([]float64, 0, num_goroutines)
}
for i := 0; i < num_goroutines; i++ {
barrier_wg.Add(1)
go func(go_id int) {
defer barrier_wg.Done()
// computing boundaries
var start, stop int
start = go_id*int(n) + (window_size - 1) // starting index
// ending index
if go_id != (num_goroutines - 1) {
stop = start + n // Ending index
} else {
stop = num_items + (window_size - 1) // Ending index
}
loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size)
loc_avg = make([]float64, stop-start)
current_sum := 0.0
for i := start - (window_size - 1); i < start+1; i++ {
current_sum += input[i]
}
loc_avg[0] = current_sum / float64(window_size)
idx := 1
for i := start + 1; i < stop; i++ {
loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size)
idx++
}
go_avg[go_id] = append(go_avg[go_id], loc_avg...)
}(i)
}
barrier_wg.Wait()
for i := 0; i < num_goroutines; i++ {
output = append(output, go_avg[i]...)
}
} else { // empty input
fmt.Println("moving_avg is panicking!")
panic(fmt.Sprintf("%v", input))
}
return output
}
// returns a slice containing the moving average of the input
// change of paradigm, we opt for a master worker pattern and spawn all windows which each will be computed by a goroutine
func compute_window_avg(input, output []float64, start, end int) {
sum := 0.0
size := end - start
for _, val := range input[start:end] {
sum += val
}
output[end-1] = sum / float64(size)
}
func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 {
var output = make([]float64, window_size-1, len(input))
for i := 0; i < window_size-1; i++ {
output[i] = math.NaN()
}
if len(input) > 0 {
num_windows := len(input) - (window_size - 1)
var output = make([]float64, len(input))
for i := 0; i < window_size-1; i++ {
output[i] = math.NaN()
}
pending := make(chan *Work)
done := make(chan *Work)
// creating work
go func() {
for i := 0; i < num_windows; i++ {
pending <- NewWork(compute_window_avg, input, output, i, i+window_size)
}
}()
// start goroutines which work through pending till there is nothing left
for i := 0; i < num_goroutines; i++ {
go func() {
Worker(pending, done)
}()
}
// wait till every work is done
for i := 0; i < num_windows; i++ {
<-done
}
return output
} else { // empty input
fmt.Println("moving_avg is panicking!")
panic(fmt.Sprintf("%v", input))
}
return output
}
基准:
//############### BENCHMARKS ###############
var import_data_res11 []float64
func benchmarkMoving_avg_serial(b *testing.B, window int) {
var r []float64
for n := 0; n < b.N; n++ {
r = moving_avg_serial(BackTest_res.F["Trading DrawDowns"], window)
}
import_data_res11 = r
}
var import_data_res14 []float64
func benchmarkMoving_avg_serial4(b *testing.B, window int) {
var r []float64
for n := 0; n < b.N; n++ {
r = moving_avg_serial4(BackTest_res.F["Trading DrawDowns"], window)
}
import_data_res14 = r
}
var import_data_res16 []float64
func benchmarkMoving_avg_concurrent2(b *testing.B, window, num_goroutines int) {
var r []float64
for n := 0; n < b.N; n++ {
r = moving_avg_concurrent2(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
}
import_data_res16 = r
}
var import_data_res17 []float64
func benchmarkMoving_avg_concurrent3(b *testing.B, window, num_goroutines int) {
var r []float64
for n := 0; n < b.N; n++ {
r = moving_avg_concurrent3(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
}
import_data_res17 = r
}
func BenchmarkMoving_avg_serial_261x10(b *testing.B) {
benchmarkMoving_avg_serial(b, 261*10)
}
func BenchmarkMoving_avg_serial4_261x10(b *testing.B) {
benchmarkMoving_avg_serial4(b, 261*10)
}
func BenchmarkMoving_avg_concurrent2_261x10_1(b *testing.B) {
benchmarkMoving_avg_concurrent2(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent2_261x10_8(b *testing.B) {
benchmarkMoving_avg_concurrent2(b, 261*10, 8)
}
func BenchmarkMoving_avg_concurrent3_261x10_1(b *testing.B) {
benchmarkMoving_avg_concurrent3(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent3_261x10_8(b *testing.B) {
benchmarkMoving_avg_concurrent3(b, 261*10, 8)
}
//############### BENCHMARKS end ###############
备注:
这是我的第一篇文章,我还在学习,所以任何建设性的批评也是受欢迎的.
为什么?
一个“错误的”SLOC可能会使性能超过37%
或者可以提高性能,花费不到基线处理时间的-57%
51.151µs on MA(200) [10000]float64 ~ 22.017µs on MA(200) [10000]int 70.325µs on MA(200) [10000]float64
为什么[] int-s?
您可以在上面看到它 – 这是HPC / fintech高效子[我们]处理策略的基础和黄油(我们仍然只谈[SERIAL]流程调度).
This one may test on any scale – but rather test first ( here ) your own implementations, on the very the same scale – MA(200) [10000]float64 setup – and post your baseline durations in [us] to view the initial process performance and to compare apples-to-apples, having the posted 51.2 [us] threshold to compare against.
接下来是更难的部分:
事实#1:这项任务并不是令人尴尬的平行
是的,人们可以去实施移动平均线计算,这样它确实可以使用一些故意灌输的“正义” – [并行]处理方法(无论是由于某种错误,某些权威的“建议”)来处理大量数据. “,专业盲目或仅仅来自双重苏格拉底公平的无知”,这显然并不意味着存在于移动平均数学公式中的卷积流处理的本质已经忘记了纯粹的[SERIAL]过程,只是由于试图强制执行它,在某种程度的“正义” – [CONCURRENT]处理中进行计算.
(顺便说一句,硬计算机科学家和双域书呆子也会在这里反对,Go语言是设计使用最好的Rob Pike的技能,有一个并发协程的框架,而不是任何真正的[PARALLEL]进程调度,甚至虽然Hoare的CSP工具,在语言概念中可用,可能会添加一些盐和胡椒,并引入一种停止块类型的进程间通信工具,这将阻止“只是” – [CONCURRENT]代码段进入一些硬连线的CSP-p2p – 同步.)
事实#2:仅在结束时分发(对于任何类型的加速)
在[SERIAL]中表现不佳并没有设定任何标准.在单线程中进行合理数量的性能调整,只有一个人可以从分发中受益(仍然需要支付额外的连续成本,这使得Amdahl Law(相当于Overhead-strict-Amdahl Law)进入游戏).
如果可以引入如此低水平的额外设置开销并仍然实现任何显着的并行性,则扩展到处理的非[SEQ]部分,那里只有有机会增加过程的有效性能.
除了获得这一点并不困难之外,所以总是将纯 – [SEQ]与非[SEQ] / N [PAR] _过程理论上的开销加速之间的潜在权衡进行比较,其中一个是将支付所有附加 – [SEQ] – 开销的总和的费用,因此当且仅当:
( pure-[SEQ]_processing [ns]
+ add-on-[SEQ]-setup-overheads [ns]
+ ( non-[SEQ]_processing [ns] / N[PAR]_processes )
) << ( pure-[SEQ]_processing [ns]
+ ( non-[SEQ]_processing [ns] / 1 )
)
没有这种喷气式战斗机的优势,包括盈余高度和太阳落后于你,永远不会尝试进行任何类型的HPC /并行化尝试 – 他们永远不会为自己买单而不是显着<<比智能[SEQ]过程更好.
Epilogue: on overhead-strict Amdahl’s Law interactive experiment UI
一个动画价值百万字.
An interactive animation even better:
所以,
假设一个被测过程,它有一个[SERIAL]和[PARALLEL]部分的过程表.
令p为进程持续时间的[PARALLEL]分数〜(0.0 .. 1.0),因此[SERIAL]部分的持续时间不会超过(1-p),对吧?
因此,让我们从这样的测试用例开始交互式实验,其中p == 1.0,意味着所有这样的过程持续时间仅用于[PARALLEL]部分,以及过程流的初始序列和终止部分(主要始终是[SERIAL])具有零持续时间((1-p)== 0.)
假设系统没有特别的魔力,因此需要花费一些实际步骤来对每个[PARALLEL]部分进行初始化,以便在不同的处理器((1),2,..,N)上运行它,所以让我们如果要求重新组织流程流并编组分发所有必要的指令和数据,则添加一些开销,以便现在可以在N个处理器上并行启动和运行预期的流程.
这些成本被称为o(这里最初假设为简单,对于N来说只是常数和不变,实际情况并非总是如此,在硅片/ NUMA /分布式基础设施上).
通过单击上面的Epilogue标题,可以打开一个交互式环境,并且可以自由进行实验.
用p == 1.&& o == 0.&& N> 1性能急剧增长到目前可实现的[PARALLEL] – 硬件O / S限制仍然单一的O / S代码执行(MPI-和类似的工作单元的类似模式分布仍然没有额外的分配成本(其中)我们必须立即添加大量[ms],而我们迄今为止最好的[SERIAL]实现显然完成了整个工作,而不仅仅是~22.1 [us])).
但除了这种人为乐观的情况之外,这项工作看起来并不那么便宜,无法实现并行化.
>尝试不是零,但只是约0.01%的设置开销成本o,并且该行开始显示开销感知缩放的一些非常不同的性质,即使是最极端的[PARALLEL]情况(仍然p = = 1.0),并且在最初的超理想线性加速情况的近一半附近具有潜在的加速.
>现在,将p转向更接近现实的东西,某些人工设定的地方比最初的超理想情况= = 1.00 – > {0.99,0.98,0.95}和……宾果,这是现实,应该测试和预验证过程调度.
那是什么意思?
例如,如果开销(启动最终加入协同程序池)将花费超过实际[PARALLEL]处理部分持续时间的约0.1%,则不会有更大的4倍加速(大约1/4的原始持续时间)对于5个协程(具有p~0.95),对于20个协同程序不超过10倍(持续时间快10倍)(假设系统具有5个CPU核心,分别为20个CPU核心)免费且可用且准备就绪(最好具有O / S级CPU核心亲和性映射进程/线程),以便在其整个生命周期内不间断地服务所有这些协同程序,从而实现任何高于预期的加速.
没有这么多的硬件资源可用于所有那些用于实现进程调度的[PARALLEL]部分的任务单元,阻塞/等待状态将引入额外的绝对等待状态和最终性能将这些新的[[SERIAL] – 阻塞/等待部分添加到整个过程持续时间中,并且最初希望的加速突然停止存在并且性能因子远低于<<< 1.00(意味着有效运行时间是由于阻塞状态比非并行化的 - [SERIAL]工作流程更慢). 对于新的敏锐的实验者来说,这可能听起来很复杂,但是我们可能会反过来看.鉴于整个分发过程,已知的[PARALLEL]任务池已知不会比大约10 [us]更短,严格限制图表显示,需要至少约1000 x 10 [美国] [PARALLEL]部分内的非阻塞计算密集处理,以免破坏并行处理的效率. 如果没有足够的“肥胖”加工,那么间接成本(显着高于上述阈值~0.1%)就会残酷地破坏成功并行化处理的净效率(但是已经在这样做了设置的不合理的高相对成本与任何N处理器的有限净效应,如可用的实时图所示. 分布式计算书呆子并不奇怪,开销o还伴随着额外的依赖关系 – 在N(流程越多,分配工作包的工作量越多),编组数据-BLOBs的大小( BLOB越大,MEM- / IO设备保持阻塞的时间越长,在服务下一个进程以在每个目标第2..N个接收进程的这种设备/资源上接收分布式BLOB之前,在避免/ CSP上-signalled,通道介导的进程间协调(称为额外的每事件阻塞,进一步减少p,并进一步低于最终的理想1). 因此,现实世界的现实与最初的理想化,美好和有前途的p == 1.0,(1 – p)== 0.0和o == 0.0相当远. 从一开始就很明显,尝试击败22.1 [us] [SERIAL]阈值,而不是试图击败它,而越来越糟,如果使用已经表现不佳的方法进行实际开销和缩放的[PARALLEL] ,一点都不帮助.
