lens @ 1d6fa60b3c60d068d12b19f10f5ad73e836b5a70

  1package worker
  2
  3import (
  4	"context"
  5	"errors"
  6	"sync"
  7)
  8
  9type (
 10
 11	// A simple worker to deal with list.
 12	ChanProcessor[T any] interface {
 13		Query(context.Context) (<-chan T, error)
 14		Process(context.Context, T) error
 15	}
 16
 17	ListProcessor[T any] interface {
 18		Query(context.Context) ([]T, error)
 19		Process(context.Context, T) error
 20	}
 21
 22	chanProcessorWorker[T any] struct {
 23		chanProcessor ChanProcessor[T]
 24		scheduler     *Scheduler
 25	}
 26
 27	listProcessorWorker[T any] struct {
 28		listProcessor ListProcessor[T]
 29		scheduler     *Scheduler
 30	}
 31)
 32
 33func NewWorkerFromListProcessor[T any](
 34	listProcessor ListProcessor[T],
 35	scheduler *Scheduler,
 36) Worker {
 37	return &listProcessorWorker[T]{
 38		listProcessor: listProcessor,
 39		scheduler:     scheduler,
 40	}
 41}
 42
 43func NewWorkerFromChanProcessor[T any](
 44	listProcessor ChanProcessor[T],
 45	scheduler *Scheduler,
 46) Worker {
 47	return &chanProcessorWorker[T]{
 48		chanProcessor: listProcessor,
 49		scheduler:     scheduler,
 50	}
 51}
 52
 53func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
 54	for {
 55		values, err := l.listProcessor.Query(ctx)
 56		if err != nil {
 57			return err
 58		}
 59
 60		select {
 61		case <-ctx.Done():
 62			return ctx.Err()
 63		default:
 64		}
 65
 66		if len(values) == 0 {
 67			return nil
 68		}
 69		var wg sync.WaitGroup
 70
 71		for _, v := range values {
 72			wg.Add(1)
 73			l.scheduler.Take()
 74			go func(v T) {
 75				defer l.scheduler.Return()
 76				defer wg.Done()
 77				if err := l.listProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
 78					println("Err", err.Error())
 79				}
 80			}(v)
 81		}
 82
 83		wg.Wait()
 84	}
 85}
 86
 87func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
 88	c, err := l.chanProcessor.Query(ctx)
 89	if err != nil {
 90		return err
 91	}
 92
 93	for {
 94		select {
 95		case <-ctx.Done():
 96			return ctx.Err()
 97		case v, ok := <-c:
 98			if !ok {
 99				return nil
100			}
101
102			if err := l.chanProcessor.Process(ctx, v); err != nil {
103				return err
104			}
105		}
106	}
107}