lens @ cff5600c8abebd1ce988b2185c07e998c4a1d483

  1package worker
  2
  3import (
  4	"context"
  5)
  6
  7type (
  8
  9	// A simple worker to deal with list.
 10	ChanProcessor[T any] interface {
 11		Query(context.Context) (<-chan T, error)
 12		Process(context.Context, T) error
 13	}
 14
 15	ListProcessor[T any] interface {
 16		Query(context.Context) ([]T, error)
 17		Process(context.Context, T) error
 18	}
 19
 20	chanProcessorWorker[T any] struct {
 21		chanProcessor ChanProcessor[T]
 22		scheduler     *Scheduler
 23	}
 24
 25	listProcessorWorker[T any] struct {
 26		listProcessor ListProcessor[T]
 27		scheduler     *Scheduler
 28	}
 29)
 30
 31func NewWorkerFromListProcessor[T any](
 32	listProcessor ListProcessor[T],
 33	scheduler *Scheduler,
 34) Worker {
 35	return &listProcessorWorker[T]{
 36		listProcessor: listProcessor,
 37		scheduler:     scheduler,
 38	}
 39}
 40
 41func NewWorkerFromChanProcessor[T any](
 42	listProcessor ChanProcessor[T],
 43	scheduler *Scheduler,
 44) Worker {
 45	return &chanProcessorWorker[T]{
 46		chanProcessor: listProcessor,
 47		scheduler:     scheduler,
 48	}
 49}
 50
 51func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
 52	for {
 53		values, err := l.listProcessor.Query(ctx)
 54		if err != nil {
 55			return err
 56		}
 57
 58		select {
 59		case <-ctx.Done():
 60			return ctx.Err()
 61		default:
 62		}
 63
 64		if len(values) == 0 {
 65			return nil
 66		}
 67
 68		for _, v := range values {
 69			select {
 70			case <-ctx.Done():
 71				return ctx.Err()
 72			default:
 73			}
 74
 75			if err := l.listProcessor.Process(ctx, v); err != nil {
 76				return err
 77			}
 78		}
 79	}
 80}
 81
 82func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
 83	c, err := l.chanProcessor.Query(ctx)
 84	if err != nil {
 85		return err
 86	}
 87
 88	for {
 89		select {
 90		case <-ctx.Done():
 91			return ctx.Err()
 92		case v, ok := <-c:
 93			if !ok {
 94				return nil
 95			}
 96
 97			if err := l.chanProcessor.Process(ctx, v); err != nil {
 98				return err
 99			}
100		}
101	}
102}