1package worker
2
3import (
4 "context"
5 "errors"
6 "sync"
7
8 "github.com/sirupsen/logrus"
9)
10
11type (
12
13 // A simple worker to deal with list.
14 ChanProcessor[T any] interface {
15 Query(context.Context) (<-chan T, error)
16 Process(context.Context, T) error
17 }
18
19 OnFail[T any] interface {
20 OnFail(context.Context, T, error)
21 }
22
23 BatchProcessor[T any] interface {
24 Query(context.Context) ([]T, error)
25 Process(context.Context, T) error
26 }
27
28 chanProcessorWorker[T any] struct {
29 chanProcessor ChanProcessor[T]
30 logrus *logrus.Entry
31 scheduler *Scheduler
32 }
33
34 batchProcessorWorker[T any] struct {
35 batchProcessor BatchProcessor[T]
36 logrus *logrus.Entry
37 scheduler *Scheduler
38 }
39)
40
41func NewWorkerFromBatchProcessor[T any](
42 batchProcessor BatchProcessor[T],
43 scheduler *Scheduler,
44 logrus *logrus.Entry,
45) Worker {
46 return &batchProcessorWorker[T]{
47 batchProcessor: batchProcessor,
48 scheduler: scheduler,
49 logrus: logrus,
50 }
51}
52
53func NewWorkerFromChanProcessor[T any](
54 chanProcessor ChanProcessor[T],
55 scheduler *Scheduler,
56 logrus *logrus.Entry,
57) Worker {
58 return &chanProcessorWorker[T]{
59 chanProcessor: chanProcessor,
60 scheduler: scheduler,
61 logrus: logrus,
62 }
63}
64
65func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
66 for {
67 values, err := l.batchProcessor.Query(ctx)
68 if err != nil {
69 return err
70 }
71
72 select {
73 case <-ctx.Done():
74 return ctx.Err()
75 default:
76 }
77
78 if len(values) == 0 {
79 return nil
80 }
81 var wg sync.WaitGroup
82
83 for _, v := range values {
84 select {
85 case <-ctx.Done():
86 return ctx.Err()
87 default:
88 }
89
90 wg.Add(1)
91 l.scheduler.Take()
92 go func(v T) {
93 defer l.scheduler.Return()
94 defer wg.Done()
95 if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
96 l.logrus.WithError(err).Error("Error processing batch")
97 if failure, ok := l.batchProcessor.(OnFail[T]); ok {
98 failure.OnFail(ctx, v, err)
99 }
100 }
101 }(v)
102 }
103
104 wg.Wait()
105 }
106}
107
108func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
109 c, err := l.chanProcessor.Query(ctx)
110 if err != nil {
111 return err
112 }
113
114 for {
115 select {
116 case <-ctx.Done():
117 return ctx.Err()
118 case v, ok := <-c:
119 if !ok {
120 return nil
121 }
122
123 l.scheduler.Take()
124 go func(v T) {
125 defer l.scheduler.Return()
126 if err := l.chanProcessor.Process(ctx, v); err != nil {
127 l.logrus.WithError(err).Error("Error processing chan")
128 }
129 }(v)
130 }
131 }
132}