1package worker
2
3import (
4 "context"
5 "errors"
6 "sync"
7
8 "github.com/sirupsen/logrus"
9)
10
11type (
12
13 // A simple worker to deal with list.
14 ChanProcessor[T any] interface {
15 Query(context.Context) (<-chan T, error)
16 Process(context.Context, T) error
17 }
18
19 BatchProcessor[T any] interface {
20 Query(context.Context) ([]T, error)
21 Process(context.Context, T) error
22 }
23
24 chanProcessorWorker[T any] struct {
25 chanProcessor ChanProcessor[T]
26 scheduler *Scheduler
27 }
28
29 batchProcessorWorker[T any] struct {
30 batchProcessor BatchProcessor[T]
31 logrus *logrus.Entry
32 scheduler *Scheduler
33 }
34)
35
36func NewWorkerFromBatchProcessor[T any](
37 batchProcessor BatchProcessor[T],
38 scheduler *Scheduler,
39 logrus *logrus.Entry,
40) Worker {
41 return &batchProcessorWorker[T]{
42 batchProcessor: batchProcessor,
43 scheduler: scheduler,
44 logrus: logrus,
45 }
46}
47
48func NewWorkerFromChanProcessor[T any](
49 chanProcessor ChanProcessor[T],
50 scheduler *Scheduler,
51) Worker {
52 return &chanProcessorWorker[T]{
53 chanProcessor: chanProcessor,
54 scheduler: scheduler,
55 }
56}
57
58func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
59 for {
60 values, err := l.batchProcessor.Query(ctx)
61 if err != nil {
62 return err
63 }
64
65 select {
66 case <-ctx.Done():
67 return ctx.Err()
68 default:
69 }
70
71 if len(values) == 0 {
72 return nil
73 }
74 var wg sync.WaitGroup
75
76 for _, v := range values {
77 wg.Add(1)
78 l.scheduler.Take()
79 go func(v T) {
80 defer l.scheduler.Return()
81 defer wg.Done()
82 if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
83 l.logrus.WithError(err).Error("Error processing batch")
84 }
85 }(v)
86 }
87
88 wg.Wait()
89 }
90}
91
92func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
93 c, err := l.chanProcessor.Query(ctx)
94 if err != nil {
95 return err
96 }
97
98 for {
99 select {
100 case <-ctx.Done():
101 return ctx.Err()
102 case v, ok := <-c:
103 if !ok {
104 return nil
105 }
106
107 if err := l.chanProcessor.Process(ctx, v); err != nil {
108 return err
109 }
110 }
111 }
112}