cerrado @ 8a2461aa05895cc7828bc9619b50fa5dee5ed1f4

 1package worker
 2
 3import (
 4	"context"
 5	"errors"
 6	"log/slog"
 7	"time"
 8
 9	"golang.org/x/sync/errgroup"
10)
11
// Task is anything that can be started with a context and report an error.
type Task interface {
	Start(context.Context) error
}

// Work couples a named Task with the delay used between its executions.
type Work struct {
	// Name identifies the work item in log output.
	Name string
	// Task is the unit that gets executed on every iteration.
	Task Task
	// wait is the duration the timer is re-armed with after each execution.
	wait time.Duration
}

// TaskPool keeps the list of Work items registered through AddTask.
type TaskPool struct {
	tasks []*Work
}
27
// format is the time layout used for the "time" attribute of log records.
const format = "2006.01.02 15:04:05"
31
32func NewTaskPool() *TaskPool {
33	return &TaskPool{}
34}
35
36func (w *Work) run(ctx context.Context) error {
37	// first time fire from the get go
38	timer := time.NewTimer(time.Nanosecond)
39
40	for {
41		select {
42		case <-ctx.Done():
43			return ctx.Err()
44		case <-timer.C:
45			if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
46				return err
47			}
48		}
49		timer.Reset(w.wait)
50	}
51}
52
53func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
54	self.tasks = append(self.tasks, &Work{
55		Name: name,
56		Task: task,
57		wait: wait,
58	})
59}
60
61func (self *TaskPool) Start(ctx context.Context) error {
62	var g errgroup.Group
63
64	for _, w := range self.tasks {
65		g.Go(func(w *Work) func() error {
66			return func() error {
67				slog.Info("Process starting", "time", time.Now().Format(format), "name", w.Name)
68				now := time.Now()
69				if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) {
70					since := time.Since(now)
71					slog.Error(
72						"Process erred",
73						"time", time.Now().Format(format),
74						"name", w.Name,
75						"error", err,
76						"duration", since,
77					)
78					return err
79				} else {
80					since := time.Since(now)
81					slog.Info(
82						"Process ended",
83						"time", time.Now().Format(format),
84						"name", w.Name,
85						"duration", since,
86					)
87				}
88				return nil
89			}
90		}(w))
91	}
92
93	return g.Wait()
94}