Recoverable worker pool
A Go worker pool pattern with multiple recoverable workers.
This is a simple implementation of the fan-out channel model. The task is to divide the first element of each ordered pair in an array by its second element, and to recover whenever a division is invalid (for example, division by zero) and causes a panic.
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Input is one job for a worker: an ordered pair whose first element is
// divided by its second.
type Input struct {
	first, second int // dividend and divisor, in that order
}
// Job counters shared by all workers; workers update them through
// sync/atomic.
var (
	errors uint64 // jobs whose processing panicked and was recovered
	done   uint64 // jobs whose result was delivered
)
// worker consumes jobs from inputs until the channel is closed, sending the
// integer quotient first/second of each job to results and incrementing the
// done counter after each successful send.
//
// If processing a job panics (for example, integer division by zero when
// second is 0), the deferred handler logs the panic value, increments the
// errors counter and starts a replacement worker on the same channels. The
// job that caused the panic is not retried. On a normal exit the worker
// sends a single value on finished.
func worker(inputs <-chan Input, results chan int, finished chan int) {
	defer func() {
		r := recover()
		if r == nil {
			// Clean exit: inputs was closed and fully consumed.
			finished <- 0
			return
		}
		fmt.Printf("recovering from panic: %v\n", r)
		atomic.AddUint64(&errors, 1)
		// This goroutine is unwinding; hand the remaining jobs to a fresh one.
		go worker(inputs, results, finished)
	}()

	for job := range inputs {
		// output could be sent to different channels
		quotient := job.first / job.second
		results <- quotient
		atomic.AddUint64(&done, 1)
	}
}
// aggregator prints every value received on results, prefixed with a 1-based
// sequence number, and counts the completion signals received on finished.
//
// The number of signals to wait for is taken from cap(finished), so callers
// must size that channel to the number of workers. Once that many signals
// have been received, any results still buffered are printed, wg.Done is
// called and the function returns.
func aggregator(finished chan int, results chan int, wg *sync.WaitGroup) {
	defer wg.Done()

	index := 1
	report := func(res int) {
		fmt.Printf("[%d] output: %v\n", index, res)
		index++
	}

	// Block until a result or a completion signal is ready. There is
	// deliberately no default branch: an empty default would turn this loop
	// into a busy-wait that burns a CPU core while the workers are idle.
	for remaining := cap(finished); remaining > 0; {
		select {
		case res := <-results:
			report(res)
		case <-finished:
			remaining--
		}
	}

	// The last completion signal may have been picked while results were
	// still buffered; print those without blocking, then stop.
	for {
		select {
		case res := <-results:
			report(res)
		default:
			return
		}
	}
}
// main builds a batch of random ordered pairs, fans them out to a fixed set
// of recoverable workers, waits for the aggregator to report that every
// worker has finished, and prints the job, error and success counts.
func main() {
	rand.Seed(time.Now().Unix())

	const (
		jobCount    = 100
		workerCount = 5
	)

	// Both elements are drawn from [0, 5), so some divisors are zero and
	// exercise the workers' recovery path.
	pairs := make([]Input, jobCount)
	for i := range pairs {
		pairs[i].first = rand.Intn(5)
		pairs[i].second = rand.Intn(5)
	}

	var (
		inputs   = make(chan Input)
		results  = make(chan int, 5)
		finished = make(chan int, workerCount) // its capacity tells the aggregator how many workers to wait for
		wg       sync.WaitGroup
	)

	for started := 0; started < workerCount; started++ {
		go worker(inputs, results, finished)
	}

	wg.Add(1)
	go aggregator(finished, results, &wg)

	// Feed the jobs one at a time, pausing 0-3 seconds after each.
	for _, pair := range pairs {
		inputs <- pair
		pause := time.Duration(rand.Intn(4)) * time.Second
		time.Sleep(pause)
	}
	close(inputs) // lets each worker's range loop end

	wg.Wait()
	fmt.Printf("Number of jobs:[%d]\nNumber of errors:[%d]\nNumber of successfully done tasks:[%d]\n",
		jobCount, errors, done)
}