1package worker
2
3import (
4 "context"
5 "errors"
6 "sync"
7)
8
9type (
10
11 // A simple worker to deal with list.
12 ChanProcessor[T any] interface {
13 Query(context.Context) (<-chan T, error)
14 Process(context.Context, T) error
15 }
16
17 ListProcessor[T any] interface {
18 Query(context.Context) ([]T, error)
19 Process(context.Context, T) error
20 }
21
22 chanProcessorWorker[T any] struct {
23 chanProcessor ChanProcessor[T]
24 scheduler *Scheduler
25 }
26
27 listProcessorWorker[T any] struct {
28 listProcessor ListProcessor[T]
29 scheduler *Scheduler
30 }
31)
32
33func NewWorkerFromListProcessor[T any](
34 listProcessor ListProcessor[T],
35 scheduler *Scheduler,
36) Worker {
37 return &listProcessorWorker[T]{
38 listProcessor: listProcessor,
39 scheduler: scheduler,
40 }
41}
42
43func NewWorkerFromChanProcessor[T any](
44 listProcessor ChanProcessor[T],
45 scheduler *Scheduler,
46) Worker {
47 return &chanProcessorWorker[T]{
48 chanProcessor: listProcessor,
49 scheduler: scheduler,
50 }
51}
52
53func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
54 for {
55 values, err := l.listProcessor.Query(ctx)
56 if err != nil {
57 return err
58 }
59
60 select {
61 case <-ctx.Done():
62 return ctx.Err()
63 default:
64 }
65
66 if len(values) == 0 {
67 return nil
68 }
69 var wg sync.WaitGroup
70
71 for _, v := range values {
72 wg.Add(1)
73 l.scheduler.Take()
74 go func(v T) {
75 defer l.scheduler.Return()
76 defer wg.Done()
77 if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
78 println("Err", err.Error())
79 }
80 }(v)
81 }
82
83 wg.Wait()
84 }
85}
86
87func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
88 c, err := l.chanProcessor.Query(ctx)
89 if err != nil {
90 return err
91 }
92
93 for {
94 select {
95 case <-ctx.Done():
96 return ctx.Err()
97 case v, ok := <-c:
98 if !ok {
99 return nil
100 }
101
102 if err := l.chanProcessor.Process(ctx, v); err != nil {
103 return err
104 }
105 }
106 }
107}