1package worker
2
3import (
4 "context"
5 "errors"
6 "log/slog"
7 "time"
8
9 "golang.org/x/sync/errgroup"
10)
11
// Task is a long-running core component of the application.
type Task interface {
	// Start runs the component.
	//
	// Implementations MUST watch the context and return once it is
	// canceled.
	//
	// Start MUST return an error only for unhandled, catastrophic failures,
	// because a returned error causes the application to halt.
	//
	// A context-canceled error is ignored rather than reported as a
	// failure, since cancellation is triggered by OS signals or by another
	// component erring. Implementations SHOULD still handle cancellation
	// and return the context's error.
	Start(context.Context) error
}

// TaskPool manages the life-cycle of a pool of tasks.
type TaskPool struct {
	// tasks holds the registered tasks in the order they were added.
	tasks []*work
}

// work wraps a Task with the metadata the pool needs to run it.
type work struct {
	// Name identifies the task in log messages.
	Name string
	// Task is the component being run.
	Task Task
	// wait is the pause between one execution of Task returning and the
	// next execution starting.
	wait time.Duration
}
42
43func NewTaskPool() *TaskPool {
44 return &TaskPool{}
45}
46
47func (w *work) run(ctx context.Context) error {
48 // first time fire the task from the get go
49 timer := time.NewTimer(time.Nanosecond)
50
51 for {
52 select {
53 case <-ctx.Done():
54 return ctx.Err()
55 case <-timer.C:
56 if err := w.Task.Start(ctx); err != nil {
57 return err
58 }
59 }
60 timer.Reset(w.wait)
61 }
62}
63
64func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
65 self.tasks = append(self.tasks, &work{
66 Name: name,
67 Task: task,
68 wait: wait,
69 })
70}
71
72func (self *TaskPool) Start(ctx context.Context) error {
73 var g errgroup.Group
74 ctx, cancel := context.WithCancel(ctx)
75 defer cancel()
76
77 for _, w := range self.tasks {
78 g.Go(func(w *work) func() error {
79 return func() error {
80 slog.Info("Process starting", "name", w.Name)
81 now := time.Now()
82 if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) {
83 since := time.Since(now)
84 slog.Error(
85 "Process erred",
86 "name", w.Name,
87 "error", err,
88 "duration", since,
89 )
90 cancel()
91 return err
92 } else {
93 since := time.Since(now)
94 slog.Info(
95 "Process ended",
96 "name", w.Name,
97 "duration", since,
98 )
99 }
100 return nil
101 }
102 }(w))
103 }
104
105 return g.Wait()
106}