1package worker
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "sync"
8 "time"
9)
10
// Task is a unit of work that a TaskPool executes repeatedly. Start receives
// the pool's context; implementations should watch it and return once it is
// done.
type Task interface {
	Start(context.Context) error
}

// Work pairs a named Task with the pause applied between consecutive runs.
type Work struct {
	Name string        // label used in log output
	Task Task          // the job to execute
	wait time.Duration // delay between the end of one run and the next
}

// TaskPool holds the Work items registered through AddTask.
type TaskPool struct {
	tasks []*Work
}
27
// format is the time layout used for the timestamp at the start of every log
// line.
const format = "2006.01.02 15:04:05"
31
32func NewTaskPool() *TaskPool {
33 return &TaskPool{}
34}
35
36func (w *Work) run(ctx context.Context) error {
37 // first time fire from the get go
38 timer := time.NewTimer(time.Nanosecond)
39
40 for {
41 select {
42 case <-ctx.Done():
43 return ctx.Err()
44 case <-timer.C:
45 now := time.Now()
46 fmt.Printf("[%s] Process starting: %s\n", time.Now().Format(format), w.Name)
47 if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
48 since := time.Since(now)
49 fmt.Printf("[%s] Process errored (%s): %s\n", time.Now().Format(format), since.String(), w.Name)
50 return err
51 } else {
52 since := time.Since(now)
53 fmt.Printf("[%s] Process done (%s): %s\n", time.Now().Format(format), since.String(), w.Name)
54 }
55 }
56 timer.Reset(w.wait)
57 }
58}
59
60func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
61 self.tasks = append(self.tasks, &Work{
62 Name: name,
63 Task: task,
64 wait: wait,
65 })
66}
67
68func (self *TaskPool) Start(ctx context.Context) {
69 var wg sync.WaitGroup
70
71 wg.Add(len(self.tasks))
72
73 for _, w := range self.tasks {
74 go func(w *Work) {
75 _ = w.run(ctx)
76 wg.Done()
77 }(w)
78 }
79
80 wg.Wait()
81}