type WaitGroup struct {
wg sync.WaitGroup
pc panics.Catcher}// Go spawns a new goroutine in the WaitGroup.
func (h *WaitGroup) Go(f func()){
h.wg.Add(1)
go func(){
defer h.wg.Done()
h.pc.Try(f)}()}
func (h *WaitGroup) Wait(){
h.wg.Wait()// Propagate a panic if we caught one from a child goroutine.
h.pc.Repanic()}
func (h *WaitGroup) WaitAndRecover()*panics.RecoveredPanic{
h.wg.Wait()// Return a recovered panic if we caught one from a child goroutine.
return h.pc.Recovered()}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
2. ForEach 与 Map
高级语言很多的基操,在 go 里面很奢侈,只能写很多繁琐代码。conc封装了泛型版本的 iterator 和 mapper
// Iterator is also safe for reuse and concurrent use.
type Iterator[T any] struct {// MaxGoroutines controls the maximum number of goroutines
// to use on this Iterator's methods. // // If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0). MaxGoroutines int}
// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func ForEachIdx[T any](input []T, f func(int,*T)){ Iterator[T]{}.ForEachIdx(input, f)}
// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func (iter Iterator[T]) ForEachIdx(input []T, f func(int,*T)){
......
var idx atomic.Int64//Create the task outside the loop to avoid extra closure allocations.
task := func(){
i :=int(idx.Add(1)-1)
for ; i < numInput; i =int(idx.Add(1)-1){
f(i,&input[i])}}
var wg conc.WaitGroup
for i :=0; i < iter.MaxGoroutines; i++{
wg.Go(task)}
wg.Wait()}
func (p *Pool) Go(f func()){
p.init()
if p.limiter== nil {// No limiton the number of goroutines.
select{
case p.tasks<- f:// A goroutine was available to handle the task.
default:// No goroutine was available to handle the task.
// Spawn a new one and send it the task.
p.handle.Go(p.worker)
p.tasks<- f
}}
......
}
func (p *Pool) worker(){// The only time this matters is if the task panics.
// This makes it possible to spin up new workers in that case.
defer p.limiter.release()
for f := range p.tasks{
f()}}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
复用方式很巧妙,如果处理速度足够快,没必要过多创建 goroutine
Stream 用于并发处理 goroutine, 但是返回结果保持顺序
type Stream struct {
pool pool.Pool
callbackerHandle conc.WaitGroup
queue chan callbackCh
initOnce sync.Once}