1package worker
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "sync"
8)
9
// Worker is the unit of work a WorkerPool runs. Implementations are expected
// to watch the context handed to Start.
type Worker interface {
	Start(context.Context) error
}

// Work pairs a Worker with the name used when its outcome is printed.
type Work struct {
	Name   string
	Worker Worker
}

// WorkerPool holds the registered work items together with the WaitGroup
// used to wait for the goroutines that run them.
type WorkerPool struct {
	workers []*Work
	wg      sync.WaitGroup
}
26
27func NewWorkerPool() *WorkerPool {
28 return &WorkerPool{}
29}
30
31func (self *WorkerPool) AddWorker(name string, worker Worker) {
32 self.workers = append(self.workers, &Work{
33 Name: name,
34 Worker: worker,
35 })
36}
37
38func (self *WorkerPool) Start(ctx context.Context) {
39 self.wg.Add(len(self.workers))
40 for _, w := range self.workers {
41 go func(w *Work) {
42 defer self.wg.Done()
43 if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
44 fmt.Println("Processes finished, error", w.Name, err.Error())
45 } else {
46 fmt.Println(w.Name, "done")
47 }
48 }(w)
49 }
50}
51
52func (self *WorkerPool) Wait() {
53 self.wg.Wait()
54}