lens @ 7a414da9a802d5eeee911b3536790a061e1d7503

  1package worker
  2
  3import (
  4	"context"
  5	"errors"
  6	"sync"
  7
  8	"github.com/sirupsen/logrus"
  9)
 10
 11type (
 12
 13	// A simple worker to deal with list.
 14	ChanProcessor[T any] interface {
 15		Query(context.Context) (<-chan T, error)
 16		Process(context.Context, T) error
 17	}
 18
 19	BatchProcessor[T any] interface {
 20		Query(context.Context) ([]T, error)
 21		Process(context.Context, T) error
 22	}
 23
 24	chanProcessorWorker[T any] struct {
 25		chanProcessor ChanProcessor[T]
 26		logrus        *logrus.Entry
 27		scheduler     *Scheduler
 28	}
 29
 30	batchProcessorWorker[T any] struct {
 31		batchProcessor BatchProcessor[T]
 32		logrus         *logrus.Entry
 33		scheduler      *Scheduler
 34	}
 35)
 36
 37func NewWorkerFromBatchProcessor[T any](
 38	batchProcessor BatchProcessor[T],
 39	scheduler *Scheduler,
 40	logrus *logrus.Entry,
 41) Worker {
 42	return &batchProcessorWorker[T]{
 43		batchProcessor: batchProcessor,
 44		scheduler:      scheduler,
 45		logrus:         logrus,
 46	}
 47}
 48
 49func NewWorkerFromChanProcessor[T any](
 50	chanProcessor ChanProcessor[T],
 51	scheduler *Scheduler,
 52	logrus *logrus.Entry,
 53) Worker {
 54	return &chanProcessorWorker[T]{
 55		chanProcessor: chanProcessor,
 56		scheduler:     scheduler,
 57		logrus:        logrus,
 58	}
 59}
 60
 61func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
 62	for {
 63		values, err := l.batchProcessor.Query(ctx)
 64		if err != nil {
 65			return err
 66		}
 67
 68		select {
 69		case <-ctx.Done():
 70			return ctx.Err()
 71		default:
 72		}
 73
 74		if len(values) == 0 {
 75			return nil
 76		}
 77		var wg sync.WaitGroup
 78
 79		for _, v := range values {
 80			wg.Add(1)
 81			l.scheduler.Take()
 82			go func(v T) {
 83				defer l.scheduler.Return()
 84				defer wg.Done()
 85				if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
 86					l.logrus.WithError(err).Error("Error processing batch")
 87				}
 88			}(v)
 89		}
 90
 91		wg.Wait()
 92	}
 93}
 94
 95func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
 96	c, err := l.chanProcessor.Query(ctx)
 97	if err != nil {
 98		return err
 99	}
100
101	for {
102		select {
103		case <-ctx.Done():
104			return ctx.Err()
105		case v, ok := <-c:
106			if !ok {
107				return nil
108			}
109
110			l.scheduler.Take()
111			go func(v T) {
112				defer l.scheduler.Return()
113				if err := l.chanProcessor.Process(ctx, v); err != nil {
114					l.logrus.WithError(err).Error("Error processing chan")
115				}
116			}(v)
117		}
118	}
119}