本文主要是鸟窝《深入理解go并发编程》中的读书速记以及一些并发库的使用例子集合
常用的并发库使用
sourcegraph conc
waitgroup
创建一组协程并等待完成:
标准库
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// crashes on panic!
doSomething()
}()
}
wg.Wait()
}
conc
func main() {
var wg conc.WaitGroup
for i := 0; i < 10; i++ {
wg.Go(doSomething)
}
wg.Wait()
}
下面是一个官网博客的例子:
写一个函数,给定用户的名字,通过网络获取姓氏
func fetchLastName(ctx context.Context, firstName string) (string, error) { req, err := http.NewRequestWithContext( ctx, "GET", fmt.Sprintf("https://myexampleapp.com/users/%s/last_name", firstName), nil, ) if err != nil { return "", err } resp, err := http.DefaultClient.Do(req) if err != nil { return "", err } b, err := io.ReadAll(resp.Body) return string(b), err }
pool
处理静态goroutine池中每个steam的元素
标准库
func process(stream chan int) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for elem := range stream {
handle(elem)
}
}()
}
wg.Wait()
}
conc
func process(stream chan int) {
p := pool.New().WithMaxGoroutines(10)
for elem := range stream {
elem := elem
p.Go(func() {
handle(elem)
})
}
p.Wait()
}
书接上面官网的例子:
如果我们有一个名字列表,并且希望有效地获取每个名字的姓氏,我们可以使用
conc
的pool
来完成此操作,如下所示。func fetchLastNames_pool(ctx context.Context, firstNames []string) ([]string, error) { p := pool.NewWithResults[string]().WithContext(ctx) for _, firstName := range firstNames { firstName := firstName p.Go(func(ctx context.Context) (string, error) { return fetchLastName(ctx, firstName) }) } return p.Wait() }
iter
处理静态gouroutine池中切片的每个元素
标准库
func process(values []int) {
feeder := make(chan int, 8)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for elem := range feeder {
handle(elem)
}
}()
}
for _, value := range values {
feeder <- value
}
close(feeder)
wg.Wait()
}
conc
func process(values []int) {
iter.ForEach(values, handle)
}
还是说到上面官网获取姓氏的那个例子:
使用iter则可以这样写:
func fetchLastNames2(ctx context.Context, firstNames []string) ([]string, error) { return iter.MapErr(firstNames, func(firstName *string) (string, error) { return fetchLastName(ctx, *firstName) }) }
对切片进行map操作
标准库
func concMap(
input []int,
f func(int) int,
) []int {
res := make([]int, len(input))
var idx atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
i := int(idx.Add(1) - 1)
if i >= len(input) {
return
}
res[i] = f(input[i])
}
}()
}
wg.Wait()
return res
}
conc
func concMap(
input []int,
f func(*int) int,
) []int {
return iter.Map(input, f)
}
stream
并发处理有序流
标准库
func mapStream(
in chan int,
out chan int,
f func(int) int,
) {
tasks := make(chan func())
taskResults := make(chan chan int)
// Worker goroutines
var workerWg sync.WaitGroup
for i := 0; i < 10; i++ {
workerWg.Add(1)
go func() {
defer workerWg.Done()
for task := range tasks {
task()
}
}()
}
// Ordered reader goroutines
var readerWg sync.WaitGroup
readerWg.Add(1)
go func() {
defer readerWg.Done()
for result := range taskResults {
item := <-result
out <- item
}
}()
// Feed the workers with tasks
for elem := range in {
resultCh := make(chan int, 1)
taskResults <- resultCh
tasks <- func() {
resultCh <- f(elem)
}
}
// We've exhausted input.
// Wait for everything to finish
close(tasks)
workerWg.Wait()
close(taskResults)
readerWg.Wait()
}
conc
func mapStream(
in chan int,
out chan int,
f func(int) int,
) {
s := stream.New().WithMaxGoroutines(10)
for elem := range in {
elem := elem
s.Go(func() stream.Callback {
res := f(elem)
return func() { out <- res }
})
}
s.Wait()
}
再举一个官网文章的例子:
在 Sourcegraph,我们对有序流进行大量并行处理。在搜索大量代码时,我们通常会得到需要后处理的结果流。流中的每个结果可能需要网络请求,例如,查找存储库上的权限或获取搜索结果的完整文件内容。
为此,我们始终希望:
- 并行执行网络请求。
- 尽快向用户展示结果。
- 保持流的顺序(因为我们已经对结果进行了排名)。
同时满足所有这三个要求是很困难的,因此我在编写
conc
的 Stream 包时的目标之一是尽可能多地抽象出该工作流程的复杂性。现在我可以使用类似于下面示例的代码一次获取多个文件的内容。这样可以高效、安全地获取每个文件的内容,同时仍然保持流的原始顺序。
func streamFileContents(ctx context.Context, fileNames <-chan string, fileContents chan<- string) { s := stream.New() for fileName := range fileNames { fileName := fileName s.Go(func() stream.Callback { contents := fetchFileContents(ctx, fileName) return func() { fileContents <- contents } }) } s.Wait() }