Single recoverable worker

A go worker pattern with a single recoverable worker.

Single recoverable worker

This is a simple implementation for a TurnOut channel model. The task is to compute the division of an array of ordered pairs and recover if the division is invalid and causes a panic. Note that the anti-pattern in closing the task input channel prevents it from scaling beyond one worker goroutine.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

type Input struct {
	first  int
	second int
}

var errors uint64
var done uint64

func FanOut(inputs <-chan Input, results chan int) {

	defer func() {

		if r := recover(); r != nil {
			fmt.Printf("recovering from panic: %v\n", r)
			atomic.AddUint64(&errors, 1)
			go FanOut(inputs, results)
		} else {
			close(results)
		}

	}()

	for in := range inputs {
		// output could be sent to different channels
		results <- in.first / in.second
		atomic.AddUint64(&done, 1)
	}

}

func TurnOut(quit <-chan int, inputs chan Input, results chan int) {

	defer func() {

		if r := recover(); r != nil {
			fmt.Printf("recovering from panic: %v\n", r)
			atomic.AddUint64(&errors, 1)
			go TurnOut(quit, inputs, results)
		}

	}()

	for {
		// inputs could be received from different input channels. 
		// Here is just one

		select {
		case in := <-inputs:
			results <- in.first / in.second
			atomic.AddUint64(&done, 1)
		case <-quit:
			close(inputs)
			// flush the remaining tasks
			go FanOut(inputs, results)
			return
		}
	}

}

func aggregator(results chan int, wg *sync.WaitGroup) {

	index := 1
	for res := range results {
		fmt.Printf("[%d] output: %v\n", index, res)
		index++
	}
	wg.Done()
}

func main() {

	rand.Seed(time.Now().Unix())
	JobsNum := 200

	testIns := []Input{}
	for i := 0; i < JobsNum; i++ {
		tmp := Input{
			first:  rand.Intn(5),
			second: rand.Intn(5),
		}
		testIns = append(testIns, tmp)
	}

	inputs := make(chan Input)
	results := make(chan int, 5)
	quit := make(chan int)

	go TurnOut(quit, inputs, results)

	var wg sync.WaitGroup
	wg.Add(1)
	go aggregator(results, &wg)

	for _, tstin := range testIns {
		inputs <- tstin
		time.Sleep(time.Duration(rand.Intn(4)) * time.Second)
	}
	close(quit)

	wg.Wait()

	fmt.Printf("Number of jobs:[%d]\nNumber of errors:[%d]\nNumber of successfully done tasks:[%d]\n",
		JobsNum, errors, done)

}