lens @ d6cf67b3d7747b6274d92e394d75d348060fa5f5

  1package worker
  2
  3import (
  4	"context"
  5	"errors"
  6	"sync"
  7
  8	"github.com/sirupsen/logrus"
  9)
 10
 11type (
 12
 13	// A simple worker to deal with list.
 14	ChanProcessor[T any] interface {
 15		Query(context.Context) (<-chan T, error)
 16		Process(context.Context, T) error
 17	}
 18
 19	BatchProcessor[T any] interface {
 20		Query(context.Context) ([]T, error)
 21		Process(context.Context, T) error
 22	}
 23
 24	chanProcessorWorker[T any] struct {
 25		chanProcessor ChanProcessor[T]
 26		scheduler     *Scheduler
 27	}
 28
 29	batchProcessorWorker[T any] struct {
 30		batchProcessor BatchProcessor[T]
 31		logrus         *logrus.Entry
 32		scheduler      *Scheduler
 33	}
 34)
 35
 36func NewWorkerFromBatchProcessor[T any](
 37	batchProcessor BatchProcessor[T],
 38	scheduler *Scheduler,
 39	logrus *logrus.Entry,
 40) Worker {
 41	return &batchProcessorWorker[T]{
 42		batchProcessor: batchProcessor,
 43		scheduler:      scheduler,
 44		logrus:         logrus,
 45	}
 46}
 47
 48func NewWorkerFromChanProcessor[T any](
 49	chanProcessor ChanProcessor[T],
 50	scheduler *Scheduler,
 51) Worker {
 52	return &chanProcessorWorker[T]{
 53		chanProcessor: chanProcessor,
 54		scheduler:     scheduler,
 55	}
 56}
 57
 58func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
 59	for {
 60		values, err := l.batchProcessor.Query(ctx)
 61		if err != nil {
 62			return err
 63		}
 64
 65		select {
 66		case <-ctx.Done():
 67			return ctx.Err()
 68		default:
 69		}
 70
 71		if len(values) == 0 {
 72			return nil
 73		}
 74		var wg sync.WaitGroup
 75
 76		for _, v := range values {
 77			wg.Add(1)
 78			l.scheduler.Take()
 79			go func(v T) {
 80				defer l.scheduler.Return()
 81				defer wg.Done()
 82				if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
 83					l.logrus.WithError(err).Error("Error processing batch")
 84				}
 85			}(v)
 86		}
 87
 88		wg.Wait()
 89	}
 90}
 91
 92func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
 93	c, err := l.chanProcessor.Query(ctx)
 94	if err != nil {
 95		return err
 96	}
 97
 98	for {
 99		select {
100		case <-ctx.Done():
101			return ctx.Err()
102		case v, ok := <-c:
103			if !ok {
104				return nil
105			}
106
107			if err := l.chanProcessor.Process(ctx, v); err != nil {
108				return err
109			}
110		}
111	}
112}