1package worker
2
3import (
4 "context"
5 "errors"
6 "sync"
7
8 "golang.org/x/exp/slog"
9)
10
11type (
12
13 // A simple task to deal with list.
14 ChanProcessor[T any] interface {
15 Query(context.Context) (<-chan T, error)
16 Process(context.Context, T) error
17 }
18
19 OnFail[T any] interface {
20 OnFail(context.Context, T, error)
21 }
22
23 ListProcessor[T any] interface {
24 Query(context.Context) ([]T, error)
25 Process(context.Context, T) error
26 }
27
28 chanProcessorTask[T any] struct {
29 chanProcessor ChanProcessor[T]
30 logger *slog.Logger
31 scheduler *Scheduler
32 }
33
34 batchProcessorTask[T any] struct {
35 batchProcessor ListProcessor[T]
36 logger *slog.Logger
37 scheduler *Scheduler
38 }
39
40 serialProcessorTask[T any] struct {
41 batchProcessor ListProcessor[T]
42 logger *slog.Logger
43 scheduler *Scheduler
44 }
45)
46
47func NewTaskFromBatchProcessor[T any](
48 batchProcessor ListProcessor[T],
49 scheduler *Scheduler,
50 logger *slog.Logger,
51) Task {
52 return &batchProcessorTask[T]{
53 batchProcessor: batchProcessor,
54 scheduler: scheduler,
55 logger: logger,
56 }
57}
58
59func NewTaskFromSerialProcessor[T any](
60 batchProcessor ListProcessor[T],
61 scheduler *Scheduler,
62 logger *slog.Logger,
63) Task {
64 return &serialProcessorTask[T]{
65 batchProcessor: batchProcessor,
66 scheduler: scheduler,
67 logger: logger,
68 }
69}
70
71func NewTaskFromChanProcessor[T any](
72 chanProcessor ChanProcessor[T],
73 scheduler *Scheduler,
74 logger *slog.Logger,
75) Task {
76 return &chanProcessorTask[T]{
77 chanProcessor: chanProcessor,
78 scheduler: scheduler,
79 logger: logger,
80 }
81}
82
83func (l *batchProcessorTask[T]) Start(ctx context.Context) error {
84 for {
85 values, err := l.batchProcessor.Query(ctx)
86 if err != nil {
87 return err
88 }
89
90 select {
91 case <-ctx.Done():
92 return ctx.Err()
93 default:
94 }
95
96 if len(values) == 0 {
97 return nil
98 }
99 var wg sync.WaitGroup
100
101 for _, v := range values {
102 select {
103 case <-ctx.Done():
104 return ctx.Err()
105 default:
106 }
107
108 wg.Add(1)
109 l.scheduler.Take()
110 go func(v T) {
111 defer l.scheduler.Return()
112 defer wg.Done()
113 if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
114 l.logger.Error(
115 "Error processing batch",
116 slog.String("error", err.Error()),
117 )
118 if failure, ok := l.batchProcessor.(OnFail[T]); ok {
119 failure.OnFail(ctx, v, err)
120 }
121 }
122 }(v)
123 }
124
125 wg.Wait()
126 }
127}
128
129func (l *serialProcessorTask[T]) Start(ctx context.Context) error {
130 for {
131 values, err := l.batchProcessor.Query(ctx)
132 if err != nil {
133 return err
134 }
135
136 select {
137 case <-ctx.Done():
138 return ctx.Err()
139 default:
140 }
141
142 if len(values) == 0 {
143 return nil
144 }
145 for _, v := range values {
146 select {
147 case <-ctx.Done():
148 return ctx.Err()
149 default:
150 }
151
152 l.scheduler.Take()
153 if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
154 l.logger.Error(
155 "Error processing batch",
156 slog.String("error", err.Error()),
157 )
158 if failure, ok := l.batchProcessor.(OnFail[T]); ok {
159 failure.OnFail(ctx, v, err)
160 }
161 }
162 l.scheduler.Return()
163 }
164 }
165}
166
167func (l *chanProcessorTask[T]) Start(ctx context.Context) error {
168 c, err := l.chanProcessor.Query(ctx)
169 if err != nil {
170 return err
171 }
172
173 for {
174 select {
175 case <-ctx.Done():
176 return ctx.Err()
177 case v, ok := <-c:
178 if !ok {
179 return nil
180 }
181
182 l.scheduler.Take()
183 go func(v T) {
184 defer l.scheduler.Return()
185 if err := l.chanProcessor.Process(ctx, v); err != nil {
186 l.logger.Error(
187 "Error processing batch",
188 slog.String("error", err.Error()),
189 )
190 if failure, ok := l.chanProcessor.(OnFail[T]); ok {
191 failure.OnFail(ctx, v, err)
192 }
193 }
194 }(v)
195 }
196 }
197}