1package worker
2
3import (
4 "context"
5 "errors"
6 "log/slog"
7 "time"
8
9 "golang.org/x/sync/errgroup"
10)
11
// Task is a unit of work that can be started with a context and reports
// failure through its returned error.
type Task interface {
	Start(context.Context) error
}

// Work couples a Task with the name used when logging about it and the
// duration its run loop waits before invoking the Task again.
type Work struct {
	Name string
	Task Task
	wait time.Duration
}

// TaskPool holds the Work entries registered through AddTask, in the order
// they were added.
type TaskPool struct {
	tasks []*Work
}
27
// format is the time layout applied to the "time" attribute of the log
// records emitted by this package.
const format = "2006.01.02 15:04:05"
31
32func NewTaskPool() *TaskPool {
33 return &TaskPool{}
34}
35
36func (w *Work) run(ctx context.Context) error {
37 // first time fire from the get go
38 timer := time.NewTimer(time.Nanosecond)
39
40 for {
41 select {
42 case <-ctx.Done():
43 return ctx.Err()
44 case <-timer.C:
45 if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
46 return err
47 }
48 }
49 timer.Reset(w.wait)
50 }
51}
52
53func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
54 self.tasks = append(self.tasks, &Work{
55 Name: name,
56 Task: task,
57 wait: wait,
58 })
59}
60
61func (self *TaskPool) Start(ctx context.Context) error {
62 var g errgroup.Group
63
64 for _, w := range self.tasks {
65 g.Go(func(w *Work) func() error {
66 return func() error {
67 slog.Info("Process starting", "time", time.Now().Format(format), "name", w.Name)
68 now := time.Now()
69 if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) {
70 since := time.Since(now)
71 slog.Error(
72 "Process erred",
73 "time", time.Now().Format(format),
74 "name", w.Name,
75 "error", err,
76 "duration", since,
77 )
78 return err
79 } else {
80 since := time.Since(now)
81 slog.Info(
82 "Process ended",
83 "time", time.Now().Format(format),
84 "name", w.Name,
85 "duration", since,
86 )
87 }
88 return nil
89 }
90 }(w))
91 }
92
93 return g.Wait()
94}