cerrado @ 2dd4cf35aab8324608a83d337459fd8354521b92

  1package worker
  2
  3import (
  4	"context"
  5	"errors"
  6	"log/slog"
  7	"time"
  8
  9	"golang.org/x/sync/errgroup"
 10)
 11
 12type (
 13
 14	// Task defines a long running core component of the application.
 15	Task interface {
 16		// Start defines when the component will be started.
 17		// The task MUST watch for the context and return when the context is
 18		// canceled.
 19		//
 20		// Start MUST only error on unhandled catastrophic errors, since
 21		// returning an error will cause the application to halt.
 22		//
 23		// Context canceled error is ignored and not reported as error as they
 24		// will be trigger by OS signals or some other component erring. But in
 25		// any case the task SHOULD handle and return context cancellation
 26		// error.
 27		Start(context.Context) error
 28	}
 29
 30	// TaskPool manages the life-cycle of a pool of tasks.
 31	TaskPool struct {
 32		tasks []*work
 33	}
 34
 35	// work is wrapper around task to add metadata to it.
 36	work struct {
 37		Name string
 38		Task Task
 39		wait time.Duration
 40	}
 41)
 42
 43func NewTaskPool() *TaskPool {
 44	return &TaskPool{}
 45}
 46
 47func (w *work) run(ctx context.Context) error {
 48	// first time fire the task from the get go
 49	timer := time.NewTimer(time.Nanosecond)
 50
 51	for {
 52		select {
 53		case <-ctx.Done():
 54			return ctx.Err()
 55		case <-timer.C:
 56			if err := w.Task.Start(ctx); err != nil {
 57				return err
 58			}
 59		}
 60		timer.Reset(w.wait)
 61	}
 62}
 63
 64func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
 65	self.tasks = append(self.tasks, &work{
 66		Name: name,
 67		Task: task,
 68		wait: wait,
 69	})
 70}
 71
 72func (self *TaskPool) Start(ctx context.Context) error {
 73	var g errgroup.Group
 74	ctx, cancel := context.WithCancel(ctx)
 75	defer cancel()
 76
 77	for _, w := range self.tasks {
 78		g.Go(func(w *work) func() error {
 79			return func() error {
 80				slog.Info("Process starting", "name", w.Name)
 81				now := time.Now()
 82				if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) {
 83					since := time.Since(now)
 84					slog.Error(
 85						"Process erred",
 86						"name", w.Name,
 87						"error", err,
 88						"duration", since,
 89					)
 90					cancel()
 91					return err
 92				} else {
 93					since := time.Since(now)
 94					slog.Info(
 95						"Process ended",
 96						"name", w.Name,
 97						"duration", since,
 98					)
 99				}
100				return nil
101			}
102		}(w))
103	}
104
105	return g.Wait()
106}