lens @ 4bc07694269c17f6d915ae084aba1b0814e02dff

 1package worker
 2
 3import (
 4	"context"
 5	"errors"
 6	"fmt"
 7	"sync"
 8	"time"
 9)
10
// Task is a job that can be started with a context.
// Implementations should watch the context passed to Start.
type Task interface {
	Start(context.Context) error
}

// Work couples a named Task with the delay used when re-arming its timer.
type Work struct {
	// Name labels the work item.
	Name string
	// Task is the job to invoke.
	Task Task
	// wait is the pause applied between consecutive invocations.
	wait time.Duration
}

// TaskPool holds the registered Work items.
type TaskPool struct {
	tasks []*Work
}
27
// format is the time layout used for the timestamp prefix of log lines.
const format = "2006.01.02 15:04:05"
31
32func NewTaskPool() *TaskPool {
33	return &TaskPool{}
34}
35
36func (w *Work) run(ctx context.Context) error {
37	// first time fire from the get go
38	timer := time.NewTimer(time.Nanosecond)
39
40	for {
41		select {
42		case <-ctx.Done():
43			return ctx.Err()
44		case <-timer.C:
45			now := time.Now()
46			fmt.Printf("[%s] Process starting: %s\n", time.Now().Format(format), w.Name)
47			if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
48				since := time.Since(now)
49				fmt.Printf("[%s] Process errored (%s): %s\n", time.Now().Format(format), since.String(), w.Name)
50				return err
51			} else {
52				since := time.Since(now)
53				fmt.Printf("[%s] Process done (%s): %s\n", time.Now().Format(format), since.String(), w.Name)
54			}
55		}
56		timer.Reset(w.wait)
57	}
58}
59
60func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
61	self.tasks = append(self.tasks, &Work{
62		Name: name,
63		Task: task,
64		wait: wait,
65	})
66}
67
68func (self *TaskPool) Start(ctx context.Context) {
69	var wg sync.WaitGroup
70
71	wg.Add(len(self.tasks))
72
73	for _, w := range self.tasks {
74		go func(w *Work) {
75			_ = w.run(ctx)
76			wg.Done()
77		}(w)
78	}
79
80	wg.Wait()
81}