Single recoverable worker
A go worker pattern with a single recoverable worker.
This is a simple implementation for a TurnOut channel model. The task is to compute the division of an array of ordered pairs and recover if the division is invalid and causes a panic. Note that the anti-pattern in closing the task input channel prevents it from scaling beyond one worker goroutine.
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
type Input struct {
first int
second int
}
var errors uint64
var done uint64
func FanOut(inputs <-chan Input, results chan int) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("recovering from panic: %v\n", r)
atomic.AddUint64(&errors, 1)
go FanOut(inputs, results)
} else {
close(results)
}
}()
for in := range inputs {
// output could be sent to different channels
results <- in.first / in.second
atomic.AddUint64(&done, 1)
}
}
func TurnOut(quit <-chan int, inputs chan Input, results chan int) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("recovering from panic: %v\n", r)
atomic.AddUint64(&errors, 1)
go TurnOut(quit, inputs, results)
}
}()
for {
// inputs could be received from different input channels.
// Here is just one
select {
case in := <-inputs:
results <- in.first / in.second
atomic.AddUint64(&done, 1)
case <-quit:
close(inputs)
// flush the remaining tasks
go FanOut(inputs, results)
return
}
}
}
func aggregator(results chan int, wg *sync.WaitGroup) {
index := 1
for res := range results {
fmt.Printf("[%d] output: %v\n", index, res)
index++
}
wg.Done()
}
func main() {
rand.Seed(time.Now().Unix())
JobsNum := 200
testIns := []Input{}
for i := 0; i < JobsNum; i++ {
tmp := Input{
first: rand.Intn(5),
second: rand.Intn(5),
}
testIns = append(testIns, tmp)
}
inputs := make(chan Input)
results := make(chan int, 5)
quit := make(chan int)
go TurnOut(quit, inputs, results)
var wg sync.WaitGroup
wg.Add(1)
go aggregator(results, &wg)
for _, tstin := range testIns {
inputs <- tstin
time.Sleep(time.Duration(rand.Intn(4)) * time.Second)
}
close(quit)
wg.Wait()
fmt.Printf("Number of jobs:[%d]\nNumber of errors:[%d]\nNumber of successfully done tasks:[%d]\n",
JobsNum, errors, done)
}