lens @ master

  1package worker
  2
  3import (
  4	"context"
  5	"errors"
  6	"sync"
  7
  8	"golang.org/x/exp/slog"
  9)
 10
 11type (
 12
 13	// A simple task to deal with list.
 14	ChanProcessor[T any] interface {
 15		Query(context.Context) (<-chan T, error)
 16		Process(context.Context, T) error
 17	}
 18
 19	OnFail[T any] interface {
 20		OnFail(context.Context, T, error)
 21	}
 22
 23	ListProcessor[T any] interface {
 24		Query(context.Context) ([]T, error)
 25		Process(context.Context, T) error
 26	}
 27
 28	chanProcessorTask[T any] struct {
 29		chanProcessor ChanProcessor[T]
 30		logger        *slog.Logger
 31		scheduler     *Scheduler
 32	}
 33
 34	batchProcessorTask[T any] struct {
 35		batchProcessor ListProcessor[T]
 36		logger         *slog.Logger
 37		scheduler      *Scheduler
 38	}
 39
 40	serialProcessorTask[T any] struct {
 41		batchProcessor ListProcessor[T]
 42		logger         *slog.Logger
 43		scheduler      *Scheduler
 44	}
 45)
 46
 47func NewTaskFromBatchProcessor[T any](
 48	batchProcessor ListProcessor[T],
 49	scheduler *Scheduler,
 50	logger *slog.Logger,
 51) Task {
 52	return &batchProcessorTask[T]{
 53		batchProcessor: batchProcessor,
 54		scheduler:      scheduler,
 55		logger:         logger,
 56	}
 57}
 58
 59func NewTaskFromSerialProcessor[T any](
 60	batchProcessor ListProcessor[T],
 61	scheduler *Scheduler,
 62	logger *slog.Logger,
 63) Task {
 64	return &serialProcessorTask[T]{
 65		batchProcessor: batchProcessor,
 66		scheduler:      scheduler,
 67		logger:         logger,
 68	}
 69}
 70
 71func NewTaskFromChanProcessor[T any](
 72	chanProcessor ChanProcessor[T],
 73	scheduler *Scheduler,
 74	logger *slog.Logger,
 75) Task {
 76	return &chanProcessorTask[T]{
 77		chanProcessor: chanProcessor,
 78		scheduler:     scheduler,
 79		logger:        logger,
 80	}
 81}
 82
 83func (l *batchProcessorTask[T]) Start(ctx context.Context) error {
 84	for {
 85		values, err := l.batchProcessor.Query(ctx)
 86		if err != nil {
 87			return err
 88		}
 89
 90		select {
 91		case <-ctx.Done():
 92			return ctx.Err()
 93		default:
 94		}
 95
 96		if len(values) == 0 {
 97			return nil
 98		}
 99		var wg sync.WaitGroup
100
101		for _, v := range values {
102			select {
103			case <-ctx.Done():
104				return ctx.Err()
105			default:
106			}
107
108			wg.Add(1)
109			l.scheduler.Take()
110			go func(v T) {
111				defer l.scheduler.Return()
112				defer wg.Done()
113				if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
114					l.logger.Error(
115						"Error processing batch",
116						slog.String("error", err.Error()),
117					)
118					if failure, ok := l.batchProcessor.(OnFail[T]); ok {
119						failure.OnFail(ctx, v, err)
120					}
121				}
122			}(v)
123		}
124
125		wg.Wait()
126	}
127}
128
129func (l *serialProcessorTask[T]) Start(ctx context.Context) error {
130	for {
131		values, err := l.batchProcessor.Query(ctx)
132		if err != nil {
133			return err
134		}
135
136		select {
137		case <-ctx.Done():
138			return ctx.Err()
139		default:
140		}
141
142		if len(values) == 0 {
143			return nil
144		}
145		for _, v := range values {
146			select {
147			case <-ctx.Done():
148				return ctx.Err()
149			default:
150			}
151
152			l.scheduler.Take()
153			if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
154				l.logger.Error(
155					"Error processing batch",
156					slog.String("error", err.Error()),
157				)
158				if failure, ok := l.batchProcessor.(OnFail[T]); ok {
159					failure.OnFail(ctx, v, err)
160				}
161			}
162			l.scheduler.Return()
163		}
164	}
165}
166
167func (l *chanProcessorTask[T]) Start(ctx context.Context) error {
168	c, err := l.chanProcessor.Query(ctx)
169	if err != nil {
170		return err
171	}
172
173	for {
174		select {
175		case <-ctx.Done():
176			return ctx.Err()
177		case v, ok := <-c:
178			if !ok {
179				return nil
180			}
181
182			l.scheduler.Take()
183			go func(v T) {
184				defer l.scheduler.Return()
185				if err := l.chanProcessor.Process(ctx, v); err != nil {
186					l.logger.Error(
187						"Error processing batch",
188						slog.String("error", err.Error()),
189					)
190					if failure, ok := l.chanProcessor.(OnFail[T]); ok {
191						failure.OnFail(ctx, v, err)
192					}
193				}
194			}(v)
195		}
196	}
197}