1package worker
2
3import (
4 "context"
5 "errors"
6 "sync"
7
8 "github.com/sirupsen/logrus"
9)
10
11type (
12
13 // A simple worker to deal with list.
14 ChanProcessor[T any] interface {
15 Query(context.Context) (<-chan T, error)
16 Process(context.Context, T) error
17 }
18
19 BatchProcessor[T any] interface {
20 Query(context.Context) ([]T, error)
21 Process(context.Context, T) error
22 }
23
24 chanProcessorWorker[T any] struct {
25 chanProcessor ChanProcessor[T]
26 logrus *logrus.Entry
27 scheduler *Scheduler
28 }
29
30 batchProcessorWorker[T any] struct {
31 batchProcessor BatchProcessor[T]
32 logrus *logrus.Entry
33 scheduler *Scheduler
34 }
35)
36
37func NewWorkerFromBatchProcessor[T any](
38 batchProcessor BatchProcessor[T],
39 scheduler *Scheduler,
40 logrus *logrus.Entry,
41) Worker {
42 return &batchProcessorWorker[T]{
43 batchProcessor: batchProcessor,
44 scheduler: scheduler,
45 logrus: logrus,
46 }
47}
48
49func NewWorkerFromChanProcessor[T any](
50 chanProcessor ChanProcessor[T],
51 scheduler *Scheduler,
52 logrus *logrus.Entry,
53) Worker {
54 return &chanProcessorWorker[T]{
55 chanProcessor: chanProcessor,
56 scheduler: scheduler,
57 logrus: logrus,
58 }
59}
60
61func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
62 for {
63 values, err := l.batchProcessor.Query(ctx)
64 if err != nil {
65 return err
66 }
67
68 select {
69 case <-ctx.Done():
70 return ctx.Err()
71 default:
72 }
73
74 if len(values) == 0 {
75 return nil
76 }
77 var wg sync.WaitGroup
78
79 for _, v := range values {
80 wg.Add(1)
81 l.scheduler.Take()
82 go func(v T) {
83 defer l.scheduler.Return()
84 defer wg.Done()
85 if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
86 l.logrus.WithError(err).Error("Error processing batch")
87 }
88 }(v)
89 }
90
91 wg.Wait()
92 }
93}
94
95func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
96 c, err := l.chanProcessor.Query(ctx)
97 if err != nil {
98 return err
99 }
100
101 for {
102 select {
103 case <-ctx.Done():
104 return ctx.Err()
105 case v, ok := <-c:
106 if !ok {
107 return nil
108 }
109
110 l.scheduler.Take()
111 go func(v T) {
112 defer l.scheduler.Return()
113 if err := l.chanProcessor.Process(ctx, v); err != nil {
114 l.logrus.WithError(err).Error("Error processing chan")
115 }
116 }(v)
117 }
118 }
119}