func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer func() {
                // recover panic
                if err := recover(); err != nil {
                    fmt.Println(err)
                }
            }()
            // do something
            handle()
        }()
    }
    wg.Wait()
}
type RecoveredPanic struct {
    // The original value of the panic.
    Value any

    // The caller list as returned by runtime.Callers when the panic was
    // recovered. Can be used to produce a more detailed stack information with
    // runtime.CallersFrames.
    Callers []uintptr

    // The formatted stacktrace from the goroutine where the panic was recovered.
    // Easier to use than Callers.
    Stack []byte
}
Catcher provides a Try method for executing a function; only the information from the first goroutine to panic is recorded:
func (p *Catcher) Try(f func()) {
    defer p.tryRecover()
    f()
}

func (p *Catcher) tryRecover() {
    if val := recover(); val != nil {
        // Only the first panic's goroutine information is recorded.
        rp := NewRecoveredPanic(1, val)
        p.recovered.CompareAndSwap(nil, &rp)
    }
}
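Used on its own, Catcher works roughly as in this minimal sketch (assuming the github.com/sourcegraph/conc/panics import path; the panic values are made up for illustration):

package main

import (
    "fmt"

    "github.com/sourcegraph/conc/panics"
)

func main() {
    var pc panics.Catcher

    // Each Try call recovers any panic raised by the function it runs.
    pc.Try(func() { panic("first") })
    pc.Try(func() { panic("second") })

    // Only the first recovered panic is kept (CompareAndSwap from nil).
    if rp := pc.Recovered(); rp != nil {
        fmt.Println(rp.Value) // first
    }
}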
A Repanic() method is also provided to re-raise the captured panic:
func (p *Catcher) Repanic() {
    if val := p.Recovered(); val != nil {
        panic(val)
    }
}

func (p *Catcher) Recovered() *RecoveredPanic {
    return p.recovered.Load()
}
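A minimal sketch of how Repanic is typically used, catching a panic in one place and re-raising it later (again assuming the conc/panics import path):

package main

import "github.com/sourcegraph/conc/panics"

func main() {
    var pc panics.Catcher
    pc.Try(func() { panic("boom") })

    // Re-raises the captured panic on the current goroutine; the value passed
    // to panic() is the *RecoveredPanic, which still carries the original stack.
    pc.Repanic()
}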
Building on this, WaitGroup provides both a Wait() and a WaitAndRecover() method:
func (h *WaitGroup) Wait() {
    h.wg.Wait()

    // Propagate a panic if we caught one from a child goroutine.
    h.pc.Repanic()
}

func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
    h.wg.Wait()

    // Return a recovered panic if we caught one from a child goroutine.
    return h.pc.Recovered()
}
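Compared with the sync.WaitGroup version at the top of this section, the same logic with conc.WaitGroup shrinks to a short sketch like this (assuming the github.com/sourcegraph/conc import path; the panicking task body stands in for handle()):

package main

import (
    "fmt"

    "github.com/sourcegraph/conc"
)

func main() {
    var wg conc.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Go(func() {
            panic("do something went wrong") // stands in for handle() panicking
        })
    }

    // WaitAndRecover hands back the first captured panic instead of re-raising
    // it, so the caller decides how to handle it; Wait() would repanic instead.
    if rp := wg.WaitAndRecover(); rp != nil {
        fmt.Println(rp.Value)
    }
}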
import "github.com/sourcegraph/conc/pool"

func ExampleContextPool_WithCancelOnError() {
    p := pool.New().
        WithMaxGoroutines(4).
        WithContext(context.Background()).
        WithCancelOnError()
    for i := 0; i < 3; i++ {
        i := i
        p.Go(func(ctx context.Context) error {
            if i == 2 {
                return errors.New("I will cancel all other tasks!")
            }
            <-ctx.Done()
            return nil
        })
    }
    err := p.Wait()
    fmt.Println(err)
    // Output:
    // I will cancel all other tasks!
}
The following methods can be called when creating a pool (a short usage sketch follows the list):
p.WithMaxGoroutines(): configures the maximum number of goroutines in the pool
p.WithErrors(): configures the pool's tasks to return errors
p.WithContext(ctx): runs the pool's tasks under ctx; combined with WithCancelOnError (as in the example above), the first error cancels the remaining tasks
p.WithFirstError(): configures the pool's tasks to return only the first error
p.WithCollectErrored(): configures the pool's tasks to collect all errors
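For example, an error-returning pool with a bounded number of workers might look like this minimal sketch (the task bodies and error message are made up; Wait is assumed to aggregate the collected errors):

package main

import (
    "errors"
    "fmt"

    "github.com/sourcegraph/conc/pool"
)

func main() {
    // At most 2 goroutines, and tasks that may return an error.
    p := pool.New().
        WithMaxGoroutines(2).
        WithErrors()

    for i := 0; i < 3; i++ {
        i := i
        p.Go(func() error {
            if i == 1 {
                return errors.New("task 1 failed")
            }
            return nil
        })
    }

    // Wait blocks until all tasks finish and returns the collected errors;
    // with WithFirstError it would return only the first one.
    err := p.Wait()
    fmt.Println(err)
}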
The basic structure of Pool is as follows:
type Pool struct {
    handle   conc.WaitGroup
    limiter  limiter
    tasks    chan func()
    initOnce sync.Once
}
limiter is the throttle: a channel is used to cap the number of goroutines:
type limiter chan struct{}

func (l limiter) limit() int {
    return cap(l)
}

func (l limiter) release() {
    if l != nil {
        <-l
    }
}
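This is the classic buffered-channel semaphore: the channel's capacity is the limit, a send acquires a slot, and a receive (release) frees it. A standalone sketch of the same pattern, independent of conc:

package main

import (
    "fmt"
    "sync"
)

func main() {
    const limit = 2
    sem := make(chan struct{}, limit) // cap(sem) plays the role of limiter.limit()

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        sem <- struct{}{} // blocks once `limit` tasks are already in flight
        go func(i int) {
            defer wg.Done()
            defer func() { <-sem }() // free a slot, like limiter.release()
            fmt.Println("running task", i)
        }(i)
    }
    wg.Wait()
}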
func (p *Pool) Go(f func()) {
    p.init()

    if p.limiter == nil {
        // No limit on the number of goroutines.
        select {
        case p.tasks <- f:
            // A goroutine was available to handle the task.
        default:
            // No goroutine was available to handle the task.
            // Spawn a new one and send it the task.
            p.handle.Go(p.worker)
            p.tasks <- f
        }
    } else {
        select {
        case p.limiter <- struct{}{}:
            // If we are below our limit, spawn a new worker rather
            // than waiting for one to become available.
            p.handle.Go(p.worker)

            // We know there is at least one worker running, so wait
            // for it to become available. This ensures we never spawn
            // more workers than the number of tasks.
            p.tasks <- f
        case p.tasks <- f:
            // A worker is available and has accepted the task.
            return
        }
    }
}
func ExampleStream() {
    times := []int{20, 52, 16, 45, 4, 80}

    stream := stream2.New()
    for _, millis := range times {
        dur := time.Duration(millis) * time.Millisecond
        stream.Go(func() stream2.Callback {
            time.Sleep(dur)
            // This will print in the order the tasks were submitted
            return func() { fmt.Println(dur) }
        })
    }
    stream.Wait()

    // Output:
    // 20ms
    // 52ms
    // 16ms
    // 45ms
    // 4ms
    // 80ms
}
The structure of Stream is as follows:
type Stream struct {
    pool             pool.Pool
    callbackerHandle conc.WaitGroup
    queue            chan callbackCh
    initOnce         sync.Once
}
func (s *Stream) Go(f Task) {
    s.init()

    // Get a channel from the cache.
    ch := getCh()

    // Queue the channel for the callbacker.
    s.queue <- ch

    // Submit the task for execution.
    s.pool.Go(func() {
        defer func() {
            // In the case of a panic from f, we don't want the callbacker to
            // starve waiting for a callback from this channel, so give it an
            // empty callback.
            if r := recover(); r != nil {
                ch <- func() {}
                panic(r)
            }
        }()

        // Run the task, sending its callback down this task's channel.
        callback := f()
        ch <- callback
    })
}
var callbackChPool = sync.Pool{
    New: func() any {
        return make(callbackCh, 1)
    },
}

func getCh() callbackCh {
    return callbackChPool.Get().(callbackCh)
}

func putCh(ch callbackCh) {
    callbackChPool.Put(ch)
}
type Iterator[T any] struct {
    MaxGoroutines int
}

func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
    if iter.MaxGoroutines == 0 {
        // iter is a value receiver and is hence safe to mutate
        iter.MaxGoroutines = defaultMaxGoroutines()
    }

    numInput := len(input)
    if iter.MaxGoroutines > numInput {
        // No more concurrent tasks than the number of input items.
        iter.MaxGoroutines = numInput
    }

    // An atomic counter hands out indices, so a single shared task closure is
    // created instead of one closure per element.
    var idx atomic.Int64
    task := func() {
        i := int(idx.Add(1) - 1)
        for ; i < numInput; i = int(idx.Add(1) - 1) {
            f(i, &input[i])
        }
    }

    var wg conc.WaitGroup
    for i := 0; i < iter.MaxGoroutines; i++ {
        wg.Go(task)
    }
    wg.Wait()
}
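A minimal usage sketch of the iter package built on top of this (the input slice and callbacks are made up; ForEach is the index-free convenience wrapper around the same machinery):

package main

import (
    "fmt"

    "github.com/sourcegraph/conc/iter"
)

func main() {
    input := []int{1, 2, 3, 4}

    // Process every element concurrently; f receives a pointer, so it can
    // mutate the slice in place.
    iter.ForEach(input, func(v *int) { *v *= 2 })
    fmt.Println(input) // [2 4 6 8]

    // Or cap the concurrency explicitly with a custom Iterator.
    it := iter.Iterator[int]{MaxGoroutines: 2}
    it.ForEachIdx(input, func(i int, v *int) {
        fmt.Println(i, *v)
    })
}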