lens @ 72ec551e6cb422531e543e3fb431324aed5ac025

  1package worker
  2
  3import (
  4	"context"
  5	"errors"
  6	"sync"
  7
  8	"github.com/sirupsen/logrus"
  9)
 10
type (

	// ChanProcessor is a source of work items delivered over a channel.
	// Query returns the receive-only channel the items arrive on, and
	// Process handles a single item taken from that channel.
	ChanProcessor[T any] interface {
		Query(context.Context) (<-chan T, error)
		Process(context.Context, T) error
	}

	// OnFail is an optional hook a processor may implement in addition to
	// its processor interface. Workers that detect it with a type assertion
	// call it with the item and the error returned by Process, skipping
	// errors that match context.Canceled.
	OnFail[T any] interface {
		OnFail(context.Context, T, error)
	}

	// ListProcessor is a source of work items delivered as slices.
	// Query returns the next batch of items (the workers in this file stop
	// when the batch is empty), and Process handles a single item.
	ListProcessor[T any] interface {
		Query(context.Context) ([]T, error)
		Process(context.Context, T) error
	}

	// chanProcessorWorker adapts a ChanProcessor to the Worker interface.
	chanProcessorWorker[T any] struct {
		chanProcessor ChanProcessor[T]
		logrus        *logrus.Entry
		scheduler     *Scheduler
	}

	// batchProcessorWorker adapts a ListProcessor to the Worker interface,
	// processing the items of each batch in separate goroutines.
	batchProcessorWorker[T any] struct {
		batchProcessor ListProcessor[T]
		logrus         *logrus.Entry
		scheduler      *Scheduler
	}

	// serialProcessorWorker adapts a ListProcessor to the Worker interface,
	// processing the items of each batch one after another.
	serialProcessorWorker[T any] struct {
		batchProcessor ListProcessor[T]
		logrus         *logrus.Entry
		scheduler      *Scheduler
	}
)
 46
 47func NewWorkerFromBatchProcessor[T any](
 48	batchProcessor ListProcessor[T],
 49	scheduler *Scheduler,
 50	logrus *logrus.Entry,
 51) Worker {
 52	return &batchProcessorWorker[T]{
 53		batchProcessor: batchProcessor,
 54		scheduler:      scheduler,
 55		logrus:         logrus,
 56	}
 57}
 58
 59func NewWorkerFromSerialProcessor[T any](
 60	batchProcessor ListProcessor[T],
 61	scheduler *Scheduler,
 62	logrus *logrus.Entry,
 63) Worker {
 64	return &serialProcessorWorker[T]{
 65		batchProcessor: batchProcessor,
 66		scheduler:      scheduler,
 67		logrus:         logrus,
 68	}
 69}
 70
 71func NewWorkerFromChanProcessor[T any](
 72	chanProcessor ChanProcessor[T],
 73	scheduler *Scheduler,
 74	logrus *logrus.Entry,
 75) Worker {
 76	return &chanProcessorWorker[T]{
 77		chanProcessor: chanProcessor,
 78		scheduler:     scheduler,
 79		logrus:        logrus,
 80	}
 81}
 82
 83func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
 84	for {
 85		values, err := l.batchProcessor.Query(ctx)
 86		if err != nil {
 87			return err
 88		}
 89
 90		select {
 91		case <-ctx.Done():
 92			return ctx.Err()
 93		default:
 94		}
 95
 96		if len(values) == 0 {
 97			return nil
 98		}
 99		var wg sync.WaitGroup
100
101		for _, v := range values {
102			select {
103			case <-ctx.Done():
104				return ctx.Err()
105			default:
106			}
107
108			wg.Add(1)
109			l.scheduler.Take()
110			go func(v T) {
111				defer l.scheduler.Return()
112				defer wg.Done()
113				if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
114					l.logrus.WithError(err).Error("Error processing batch")
115					if failure, ok := l.batchProcessor.(OnFail[T]); ok {
116						failure.OnFail(ctx, v, err)
117					}
118				}
119			}(v)
120		}
121
122		wg.Wait()
123	}
124}
125
126func (l *serialProcessorWorker[T]) Start(ctx context.Context) error {
127	for {
128		values, err := l.batchProcessor.Query(ctx)
129		if err != nil {
130			return err
131		}
132
133		select {
134		case <-ctx.Done():
135			return ctx.Err()
136		default:
137		}
138
139		if len(values) == 0 {
140			return nil
141		}
142		for _, v := range values {
143			select {
144			case <-ctx.Done():
145				return ctx.Err()
146			default:
147			}
148
149			l.scheduler.Take()
150			if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
151				l.logrus.WithError(err).Error("Error processing batch")
152				if failure, ok := l.batchProcessor.(OnFail[T]); ok {
153					failure.OnFail(ctx, v, err)
154				}
155			}
156			l.scheduler.Return()
157		}
158	}
159}
160
161func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
162	c, err := l.chanProcessor.Query(ctx)
163	if err != nil {
164		return err
165	}
166
167	for {
168		select {
169		case <-ctx.Done():
170			return ctx.Err()
171		case v, ok := <-c:
172			if !ok {
173				return nil
174			}
175
176			l.scheduler.Take()
177			go func(v T) {
178				defer l.scheduler.Return()
179				if err := l.chanProcessor.Process(ctx, v); err != nil {
180					l.logrus.WithError(err).Error("Error processing chan")
181				}
182			}(v)
183		}
184	}
185}