type token struct {
value interface{}
err error
}
type Token chan token
func NewToken() Token {
return make(Token,1)}
func (t Token) Done(value interface{}, err error){
t <- token{value: value, err: err}}
func (t Token) Wait(timeout time.Duration)(value interface{}, err error){
if timeout <=0{
tk :=<-t
return tk.value, tk.err}select{
case tk :=<-t:
return tk.value, tk.err
case <-time.After(timeout):
return nil, ErrTokenTimeout
}}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
其次,定义队列和其他参数:
type DataSource struct {
paramCh chan param
readTimeout time.Duration
concurrency int
step int}
type param struct {
cuuid string
token Token
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
替换掉原来 ListClusterEndpoints 的实现:
func (p *DataSource) ListClusterEndpoints(ctx context.Context, cuuid string)([]ptypes.Endpoint, error){
req := param{
cuuid: cuuid,
token: NewToken(),}select{
case p.paramCh<- req:
default:
return nil, fmt.Errorf("list cluster endpoints write channel failed")}
value, err := req.token.Wait(p.readTimeout)
if err != nil {
return nil, err
}
eps, ok := value.([]ptypes.Endpoint)
if !ok {
return nil, fmt.Errorf("value is not endpoints")}
return endpoints, nil
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
再起几个协程来处理任务:
func (p *DataSource) startListClusterEndpointsLoop(){
for i :=0; i < p.concurrency; i++{
go func(){
for {
reqs := p.getListClusterEndpointsReqFromChan()
p.doBatchListClusterEndpoints(reqs)}}()}}