lens @ 2857052a38c9ad2c5946f6101bf7d37d63235df8

 1package worker
 2
 3import (
 4	"context"
 5	"errors"
 6	"fmt"
 7	"sync"
 8	"time"
 9)
10
11type (
12	// Task should watch for context
13	Task interface {
14		Start(context.Context) error
15	}
16
17	Work struct {
18		Name string
19		Task Task
20		wait time.Duration
21	}
22
23	TaskPool struct {
24		tasks []*Work
25	}
26)
27
28func NewTaskPool() *TaskPool {
29	return &TaskPool{}
30}
31
32func (w *Work) run(ctx context.Context) error {
33	// first time fire from the get go
34	timer := time.NewTimer(time.Nanosecond)
35
36	for {
37		select {
38		case <-ctx.Done():
39			return ctx.Err()
40		case <-timer.C:
41			fmt.Println("Process starting: ", w.Name)
42			if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
43				fmt.Println("Process errored: ", w.Name, err.Error())
44				return err
45			} else {
46				fmt.Println("Process done: ", w.Name)
47			}
48		}
49		timer.Reset(w.wait)
50	}
51}
52
53func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
54	self.tasks = append(self.tasks, &Work{
55		Name: name,
56		Task: task,
57		wait: wait,
58	})
59}
60
61func (self *TaskPool) Start(ctx context.Context) {
62	var wg sync.WaitGroup
63
64	wg.Add(len(self.tasks))
65
66	for _, w := range self.tasks {
67		go func(w *Work) {
68			_ = w.run(ctx)
69			wg.Done()
70		}(w)
71	}
72
73	wg.Wait()
74}