lens @ 398ae343e6a33fb81f82380b17081b8f9bae328a

  1package worker
  2
  3import (
  4	"context"
  5	"errors"
  6	"sync"
  7
  8	"github.com/sirupsen/logrus"
  9)
 10
 11type (
 12
 13	// A simple worker to deal with list.
 14	ChanProcessor[T any] interface {
 15		Query(context.Context) (<-chan T, error)
 16		Process(context.Context, T) error
 17	}
 18
 19	OnFail[T any] interface {
 20		OnFail(context.Context, T, error)
 21	}
 22
 23	BatchProcessor[T any] interface {
 24		Query(context.Context) ([]T, error)
 25		Process(context.Context, T) error
 26	}
 27
 28	chanProcessorWorker[T any] struct {
 29		chanProcessor ChanProcessor[T]
 30		logrus        *logrus.Entry
 31		scheduler     *Scheduler
 32	}
 33
 34	batchProcessorWorker[T any] struct {
 35		batchProcessor BatchProcessor[T]
 36		logrus         *logrus.Entry
 37		scheduler      *Scheduler
 38	}
 39)
 40
 41func NewWorkerFromBatchProcessor[T any](
 42	batchProcessor BatchProcessor[T],
 43	scheduler *Scheduler,
 44	logrus *logrus.Entry,
 45) Worker {
 46	return &batchProcessorWorker[T]{
 47		batchProcessor: batchProcessor,
 48		scheduler:      scheduler,
 49		logrus:         logrus,
 50	}
 51}
 52
 53func NewWorkerFromChanProcessor[T any](
 54	chanProcessor ChanProcessor[T],
 55	scheduler *Scheduler,
 56	logrus *logrus.Entry,
 57) Worker {
 58	return &chanProcessorWorker[T]{
 59		chanProcessor: chanProcessor,
 60		scheduler:     scheduler,
 61		logrus:        logrus,
 62	}
 63}
 64
 65func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
 66	for {
 67		values, err := l.batchProcessor.Query(ctx)
 68		if err != nil {
 69			return err
 70		}
 71
 72		select {
 73		case <-ctx.Done():
 74			return ctx.Err()
 75		default:
 76		}
 77
 78		if len(values) == 0 {
 79			return nil
 80		}
 81		var wg sync.WaitGroup
 82
 83		for _, v := range values {
 84			select {
 85			case <-ctx.Done():
 86				return ctx.Err()
 87			default:
 88			}
 89
 90			wg.Add(1)
 91			l.scheduler.Take()
 92			go func(v T) {
 93				defer l.scheduler.Return()
 94				defer wg.Done()
 95				if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
 96					l.logrus.WithError(err).Error("Error processing batch")
 97					if failure, ok := l.batchProcessor.(OnFail[T]); ok {
 98						failure.OnFail(ctx, v, err)
 99					}
100				}
101			}(v)
102		}
103
104		wg.Wait()
105	}
106}
107
108func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
109	c, err := l.chanProcessor.Query(ctx)
110	if err != nil {
111		return err
112	}
113
114	for {
115		select {
116		case <-ctx.Done():
117			return ctx.Err()
118		case v, ok := <-c:
119			if !ok {
120				return nil
121			}
122
123			l.scheduler.Take()
124			go func(v T) {
125				defer l.scheduler.Return()
126				if err := l.chanProcessor.Process(ctx, v); err != nil {
127					l.logrus.WithError(err).Error("Error processing chan")
128				}
129			}(v)
130		}
131	}
132}