Recoverable worker pool

A go worker pool pattern with multiple recoverable workers.

Recoverable worker pool

This is a simple implementation for a Fanout channel model. The task is to compute the division of an array of ordered pairs and recover if the division is invalid and causes a panic.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

type Input struct {
	first  int
	second int
}

var errors uint64
var done uint64

func worker(inputs <-chan Input, results chan int, finished chan int) {

	defer func() {

		if r := recover(); r != nil {
			fmt.Printf("recovering from panic: %v\n", r)
			atomic.AddUint64(&errors, 1)
			go worker(inputs, results, finished)
		} else {
			finished <- 0
		}

	}()

	for in := range inputs {
		// output could be sent to different channels
		results <- in.first / in.second
		atomic.AddUint64(&done, 1)
	}

}

func aggregator(finished chan int, results chan int, wg *sync.WaitGroup) {

	index := 1
	WorkerNum := cap(finished)

	for {
		select {
		case res := <-results:
			fmt.Printf("[%d] output: %v\n", index, res)
			index++

		case <-finished:
			WorkerNum--

		default:
			if WorkerNum <= 0 {
				wg.Done()
				return
			}
		}
	}
}

func main() {

	rand.Seed(time.Now().Unix())

	JobsNum := 100
	WorkerNum := 5

	testIns := []Input{}
	for i := 0; i < JobsNum; i++ {
		tmp := Input{
			first:  rand.Intn(5),
			second: rand.Intn(5),
		}
		testIns = append(testIns, tmp)
	}

	inputs := make(chan Input)
	results := make(chan int, 5)
	finished := make(chan int, WorkerNum)

	for i := 0; i < WorkerNum; i++ {
		go worker(inputs, results, finished)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go aggregator(finished, results, &wg)

	for _, tstin := range testIns {
		inputs <- tstin
		time.Sleep(time.Duration(rand.Intn(4)) * time.Second)
	}
	close(inputs)

	wg.Wait()

	fmt.Printf("Number of jobs:[%d]\nNumber of errors:[%d]\nNumber of successfully done tasks:[%d]\n",
		JobsNum, errors, done)

}