1package worker
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "sync"
8 "time"
9)
10
11type (
12 // Task should watch for context
13 Task interface {
14 Start(context.Context) error
15 }
16
17 Work struct {
18 Name string
19 Task Task
20 wait time.Duration
21 }
22
23 TaskPool struct {
24 tasks []*Work
25 }
26)
27
28func NewTaskPool() *TaskPool {
29 return &TaskPool{}
30}
31
32func (w *Work) run(ctx context.Context) error {
33 // first time fire from the get go
34 timer := time.NewTimer(time.Nanosecond)
35
36 for {
37 select {
38 case <-ctx.Done():
39 return ctx.Err()
40 case <-timer.C:
41 fmt.Println("Process starting: ", w.Name)
42 if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
43 fmt.Println("Process errored: ", w.Name, err.Error())
44 return err
45 } else {
46 fmt.Println("Process done: ", w.Name)
47 }
48 }
49 timer.Reset(w.wait)
50 }
51}
52
53func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
54 self.tasks = append(self.tasks, &Work{
55 Name: name,
56 Task: task,
57 wait: wait,
58 })
59}
60
61func (self *TaskPool) Start(ctx context.Context) {
62 var wg sync.WaitGroup
63
64 wg.Add(len(self.tasks))
65
66 for _, w := range self.tasks {
67 go func(w *Work) {
68 _ = w.run(ctx)
69 wg.Done()
70 }(w)
71 }
72
73 wg.Wait()
74}