1package worker
2
3import (
4 "context"
5 "errors"
6 "log/slog"
7 "sync"
8)
9
10type (
11
12 // A simple task to deal with list.
13 ChanProcessor[T any] interface {
14 Query(context.Context) (<-chan T, error)
15 Process(context.Context, T) error
16 }
17
18 OnFail[T any] interface {
19 OnFail(context.Context, T, error)
20 }
21
22 ListProcessor[T any] interface {
23 Query(context.Context) ([]T, error)
24 Process(context.Context, T) error
25 }
26
27 chanProcessorTask[T any] struct {
28 chanProcessor ChanProcessor[T]
29 logger *slog.Logger
30 scheduler *Scheduler
31 }
32
33 batchProcessorTask[T any] struct {
34 batchProcessor ListProcessor[T]
35 logger *slog.Logger
36 scheduler *Scheduler
37 }
38
39 serialProcessorTask[T any] struct {
40 batchProcessor ListProcessor[T]
41 logger *slog.Logger
42 scheduler *Scheduler
43 }
44)
45
46func NewTaskFromBatchProcessor[T any](
47 batchProcessor ListProcessor[T],
48 scheduler *Scheduler,
49 logger *slog.Logger,
50) Task {
51 return &batchProcessorTask[T]{
52 batchProcessor: batchProcessor,
53 scheduler: scheduler,
54 logger: logger,
55 }
56}
57
58func NewTaskFromSerialProcessor[T any](
59 batchProcessor ListProcessor[T],
60 scheduler *Scheduler,
61 logger *slog.Logger,
62) Task {
63 return &serialProcessorTask[T]{
64 batchProcessor: batchProcessor,
65 scheduler: scheduler,
66 logger: logger,
67 }
68}
69
70func NewTaskFromChanProcessor[T any](
71 chanProcessor ChanProcessor[T],
72 scheduler *Scheduler,
73 logger *slog.Logger,
74) Task {
75 return &chanProcessorTask[T]{
76 chanProcessor: chanProcessor,
77 scheduler: scheduler,
78 logger: logger,
79 }
80}
81
82func (l *batchProcessorTask[T]) Start(ctx context.Context) error {
83 for {
84 values, err := l.batchProcessor.Query(ctx)
85 if err != nil {
86 return err
87 }
88
89 select {
90 case <-ctx.Done():
91 return ctx.Err()
92 default:
93 }
94
95 if len(values) == 0 {
96 return nil
97 }
98 var wg sync.WaitGroup
99
100 for _, v := range values {
101 select {
102 case <-ctx.Done():
103 return ctx.Err()
104 default:
105 }
106
107 wg.Add(1)
108 l.scheduler.Take()
109 go func(v T) {
110 defer l.scheduler.Return()
111 defer wg.Done()
112 if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
113 l.logger.Error(
114 "Error processing batch",
115 slog.String("error", err.Error()),
116 )
117 if failure, ok := l.batchProcessor.(OnFail[T]); ok {
118 failure.OnFail(ctx, v, err)
119 }
120 }
121 }(v)
122 }
123
124 wg.Wait()
125 }
126}
127
128func (l *serialProcessorTask[T]) Start(ctx context.Context) error {
129 for {
130 values, err := l.batchProcessor.Query(ctx)
131 if err != nil {
132 return err
133 }
134
135 select {
136 case <-ctx.Done():
137 return ctx.Err()
138 default:
139 }
140
141 if len(values) == 0 {
142 return nil
143 }
144 for _, v := range values {
145 select {
146 case <-ctx.Done():
147 return ctx.Err()
148 default:
149 }
150
151 l.scheduler.Take()
152 if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
153 l.logger.Error(
154 "Error processing batch",
155 slog.String("error", err.Error()),
156 )
157 if failure, ok := l.batchProcessor.(OnFail[T]); ok {
158 failure.OnFail(ctx, v, err)
159 }
160 }
161 l.scheduler.Return()
162 }
163 }
164}
165
166func (l *chanProcessorTask[T]) Start(ctx context.Context) error {
167 c, err := l.chanProcessor.Query(ctx)
168 if err != nil {
169 return err
170 }
171
172 for {
173 select {
174 case <-ctx.Done():
175 return ctx.Err()
176 case v, ok := <-c:
177 if !ok {
178 return nil
179 }
180
181 l.scheduler.Take()
182 go func(v T) {
183 defer l.scheduler.Return()
184 if err := l.chanProcessor.Process(ctx, v); err != nil {
185 l.logger.Error(
186 "Error processing batch",
187 slog.String("error", err.Error()),
188 )
189 if failure, ok := l.chanProcessor.(OnFail[T]); ok {
190 failure.OnFail(ctx, v, err)
191 }
192 }
193 }(v)
194 }
195 }
196}