序
本文主要研究一下tunny的workerWrapper
workerWrapper
type workerWrapper struct { worker Worker interruptChan chan struct{} // reqChan is NOT owned by this type, it is used to send requests for work. reqChan chan<- workRequest // closeChan can be closed in order to cleanly shutdown this worker. closeChan chan struct{} // closedChan is closed by the run() goroutine when it exits. closedChan chan struct{} } func newWorkerWrapper( reqChan chan<- workRequest, worker Worker, ) *workerWrapper { w := workerWrapper{ worker: worker, interruptChan: make(chan struct{}), reqChan: reqChan, closeChan: make(chan struct{}), closedChan: make(chan struct{}), } go w.run() return &w }
workerWrapper包装了worker,定义了interruptChan、reqChan、closeChan、closedChan属性
interrupt
func (w *workerWrapper) interrupt() { close(w.interruptChan) w.worker.Interrupt() }
interrupt方法关闭w.interruptChan,执行w.worker.Interrupt()
run
func (w *workerWrapper) run() { jobChan, retChan := make(chan interface{}), make(chan interface{}) defer func() { w.worker.Terminate() close(retChan) close(w.closedChan) }() for { // NOTE: Blocking here will prevent the worker from closing down. w.worker.BlockUntilReady() select { case w.reqChan <- workRequest{ jobChan: jobChan, retChan: retChan, interruptFunc: w.interrupt, }: select { case payload := <-jobChan: result := w.worker.Process(payload) select { case retChan <- result: case <-w.interruptChan: w.interruptChan = make(chan struct{}) } case _, _ = <-w.interruptChan: w.interruptChan = make(chan struct{}) } case <-w.closeChan: return } } }
run首先创建jobChan、retChan,然后for循环执行select读取reqChan,之后读取jobChan的payload,进行处理,然后写入到retChan
stop
func (w *workerWrapper) stop() { close(w.closeChan) }
stop方法关闭w.closeChan
join
func (w *workerWrapper) join() { <-w.closedChan }
join方法则等待w.closedChan
小结
tunny的workerWrapper包装了worker,定义了interruptChan、reqChan、closeChan、closedChan属性,它提供了interrupt、run、stop、join方法。
doc
tunny