1package worker
2
3import (
4 "context"
5 "errors"
6 "sync"
7
8 "github.com/sirupsen/logrus"
9)
10
11type (
12
13 // A simple task to deal with list.
14 ChanProcessor[T any] interface {
15 Query(context.Context) (<-chan T, error)
16 Process(context.Context, T) error
17 }
18
19 OnFail[T any] interface {
20 OnFail(context.Context, T, error)
21 }
22
23 ListProcessor[T any] interface {
24 Query(context.Context) ([]T, error)
25 Process(context.Context, T) error
26 }
27
28 chanProcessorTask[T any] struct {
29 chanProcessor ChanProcessor[T]
30 logrus *logrus.Entry
31 scheduler *Scheduler
32 }
33
34 batchProcessorTask[T any] struct {
35 batchProcessor ListProcessor[T]
36 logrus *logrus.Entry
37 scheduler *Scheduler
38 }
39
40 serialProcessorTask[T any] struct {
41 batchProcessor ListProcessor[T]
42 logrus *logrus.Entry
43 scheduler *Scheduler
44 }
45)
46
47func NewTaskFromBatchProcessor[T any](
48 batchProcessor ListProcessor[T],
49 scheduler *Scheduler,
50 logrus *logrus.Entry,
51) Task {
52 return &batchProcessorTask[T]{
53 batchProcessor: batchProcessor,
54 scheduler: scheduler,
55 logrus: logrus,
56 }
57}
58
59func NewTaskFromSerialProcessor[T any](
60 batchProcessor ListProcessor[T],
61 scheduler *Scheduler,
62 logrus *logrus.Entry,
63) Task {
64 return &serialProcessorTask[T]{
65 batchProcessor: batchProcessor,
66 scheduler: scheduler,
67 logrus: logrus,
68 }
69}
70
71func NewTaskFromChanProcessor[T any](
72 chanProcessor ChanProcessor[T],
73 scheduler *Scheduler,
74 logrus *logrus.Entry,
75) Task {
76 return &chanProcessorTask[T]{
77 chanProcessor: chanProcessor,
78 scheduler: scheduler,
79 logrus: logrus,
80 }
81}
82
83func (l *batchProcessorTask[T]) Start(ctx context.Context) error {
84 for {
85 values, err := l.batchProcessor.Query(ctx)
86 if err != nil {
87 return err
88 }
89
90 select {
91 case <-ctx.Done():
92 return ctx.Err()
93 default:
94 }
95
96 if len(values) == 0 {
97 return nil
98 }
99 var wg sync.WaitGroup
100
101 for _, v := range values {
102 select {
103 case <-ctx.Done():
104 return ctx.Err()
105 default:
106 }
107
108 wg.Add(1)
109 l.scheduler.Take()
110 go func(v T) {
111 defer l.scheduler.Return()
112 defer wg.Done()
113 if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
114 l.logrus.WithError(err).Error("Error processing batch")
115 if failure, ok := l.batchProcessor.(OnFail[T]); ok {
116 failure.OnFail(ctx, v, err)
117 }
118 }
119 }(v)
120 }
121
122 wg.Wait()
123 }
124}
125
126func (l *serialProcessorTask[T]) Start(ctx context.Context) error {
127 for {
128 values, err := l.batchProcessor.Query(ctx)
129 if err != nil {
130 return err
131 }
132
133 select {
134 case <-ctx.Done():
135 return ctx.Err()
136 default:
137 }
138
139 if len(values) == 0 {
140 return nil
141 }
142 for _, v := range values {
143 select {
144 case <-ctx.Done():
145 return ctx.Err()
146 default:
147 }
148
149 l.scheduler.Take()
150 if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
151 l.logrus.WithError(err).Error("Error processing batch")
152 if failure, ok := l.batchProcessor.(OnFail[T]); ok {
153 failure.OnFail(ctx, v, err)
154 }
155 }
156 l.scheduler.Return()
157 }
158 }
159}
160
161func (l *chanProcessorTask[T]) Start(ctx context.Context) error {
162 c, err := l.chanProcessor.Query(ctx)
163 if err != nil {
164 return err
165 }
166
167 for {
168 select {
169 case <-ctx.Done():
170 return ctx.Err()
171 case v, ok := <-c:
172 if !ok {
173 return nil
174 }
175
176 l.scheduler.Take()
177 go func(v T) {
178 defer l.scheduler.Return()
179 if err := l.chanProcessor.Process(ctx, v); err != nil {
180 l.logrus.WithError(err).Error("Error processing chan")
181 }
182 }(v)
183 }
184 }
185}