lens @ 14e5580efd51c7b9e70d304715e512a2ea2a1b21

  1package worker
  2
  3import (
  4	"context"
  5	"errors"
  6	"log/slog"
  7	"sync"
  8)
  9
 10type (
 11
 12	// A simple task to deal with list.
 13	ChanProcessor[T any] interface {
 14		Query(context.Context) (<-chan T, error)
 15		Process(context.Context, T) error
 16	}
 17
 18	OnFail[T any] interface {
 19		OnFail(context.Context, T, error)
 20	}
 21
 22	ListProcessor[T any] interface {
 23		Query(context.Context) ([]T, error)
 24		Process(context.Context, T) error
 25	}
 26
 27	chanProcessorTask[T any] struct {
 28		chanProcessor ChanProcessor[T]
 29		logger        *slog.Logger
 30		scheduler     *Scheduler
 31	}
 32
 33	batchProcessorTask[T any] struct {
 34		batchProcessor ListProcessor[T]
 35		logger         *slog.Logger
 36		scheduler      *Scheduler
 37	}
 38
 39	serialProcessorTask[T any] struct {
 40		batchProcessor ListProcessor[T]
 41		logger         *slog.Logger
 42		scheduler      *Scheduler
 43	}
 44)
 45
 46func NewTaskFromBatchProcessor[T any](
 47	batchProcessor ListProcessor[T],
 48	scheduler *Scheduler,
 49	logger *slog.Logger,
 50) Task {
 51	return &batchProcessorTask[T]{
 52		batchProcessor: batchProcessor,
 53		scheduler:      scheduler,
 54		logger:         logger,
 55	}
 56}
 57
 58func NewTaskFromSerialProcessor[T any](
 59	batchProcessor ListProcessor[T],
 60	scheduler *Scheduler,
 61	logger *slog.Logger,
 62) Task {
 63	return &serialProcessorTask[T]{
 64		batchProcessor: batchProcessor,
 65		scheduler:      scheduler,
 66		logger:         logger,
 67	}
 68}
 69
 70func NewTaskFromChanProcessor[T any](
 71	chanProcessor ChanProcessor[T],
 72	scheduler *Scheduler,
 73	logger *slog.Logger,
 74) Task {
 75	return &chanProcessorTask[T]{
 76		chanProcessor: chanProcessor,
 77		scheduler:     scheduler,
 78		logger:        logger,
 79	}
 80}
 81
 82func (l *batchProcessorTask[T]) Start(ctx context.Context) error {
 83	for {
 84		values, err := l.batchProcessor.Query(ctx)
 85		if err != nil {
 86			return err
 87		}
 88
 89		select {
 90		case <-ctx.Done():
 91			return ctx.Err()
 92		default:
 93		}
 94
 95		if len(values) == 0 {
 96			return nil
 97		}
 98		var wg sync.WaitGroup
 99
100		for _, v := range values {
101			select {
102			case <-ctx.Done():
103				return ctx.Err()
104			default:
105			}
106
107			wg.Add(1)
108			l.scheduler.Take()
109			go func(v T) {
110				defer l.scheduler.Return()
111				defer wg.Done()
112				if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
113					l.logger.Error(
114						"Error processing batch",
115						slog.String("error", err.Error()),
116					)
117					if failure, ok := l.batchProcessor.(OnFail[T]); ok {
118						failure.OnFail(ctx, v, err)
119					}
120				}
121			}(v)
122		}
123
124		wg.Wait()
125	}
126}
127
128func (l *serialProcessorTask[T]) Start(ctx context.Context) error {
129	for {
130		values, err := l.batchProcessor.Query(ctx)
131		if err != nil {
132			return err
133		}
134
135		select {
136		case <-ctx.Done():
137			return ctx.Err()
138		default:
139		}
140
141		if len(values) == 0 {
142			return nil
143		}
144		for _, v := range values {
145			select {
146			case <-ctx.Done():
147				return ctx.Err()
148			default:
149			}
150
151			l.scheduler.Take()
152			if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
153				l.logger.Error(
154					"Error processing batch",
155					slog.String("error", err.Error()),
156				)
157				if failure, ok := l.batchProcessor.(OnFail[T]); ok {
158					failure.OnFail(ctx, v, err)
159				}
160			}
161			l.scheduler.Return()
162		}
163	}
164}
165
166func (l *chanProcessorTask[T]) Start(ctx context.Context) error {
167	c, err := l.chanProcessor.Query(ctx)
168	if err != nil {
169		return err
170	}
171
172	for {
173		select {
174		case <-ctx.Done():
175			return ctx.Err()
176		case v, ok := <-c:
177			if !ok {
178				return nil
179			}
180
181			l.scheduler.Take()
182			go func(v T) {
183				defer l.scheduler.Return()
184				if err := l.chanProcessor.Process(ctx, v); err != nil {
185					l.logger.Error(
186						"Error processing batch",
187						slog.String("error", err.Error()),
188					)
189					if failure, ok := l.chanProcessor.(OnFail[T]); ok {
190						failure.OnFail(ctx, v, err)
191					}
192				}
193			}(v)
194		}
195	}
196}