1package worker
2
3import (
4 "context"
5)
6
7type (
8
9 // A simple worker to deal with list.
10 ChanProcessor[T any] interface {
11 Query(context.Context) (<-chan T, error)
12 Process(context.Context, T) error
13 }
14
15 ListProcessor[T any] interface {
16 Query(context.Context) ([]T, error)
17 Process(context.Context, T) error
18 }
19
20 chanProcessorWorker[T any] struct {
21 chanProcessor ChanProcessor[T]
22 scheduler *Scheduler
23 }
24
25 listProcessorWorker[T any] struct {
26 listProcessor ListProcessor[T]
27 scheduler *Scheduler
28 }
29)
30
31func NewWorkerFromListProcessor[T any](
32 listProcessor ListProcessor[T],
33 scheduler *Scheduler,
34) Worker {
35 return &listProcessorWorker[T]{
36 listProcessor: listProcessor,
37 scheduler: scheduler,
38 }
39}
40
41func NewWorkerFromChanProcessor[T any](
42 listProcessor ChanProcessor[T],
43 scheduler *Scheduler,
44) Worker {
45 return &chanProcessorWorker[T]{
46 chanProcessor: listProcessor,
47 scheduler: scheduler,
48 }
49}
50
51func (l *listProcessorWorker[T]) Start(ctx context.Context) error {
52 for {
53 values, err := l.listProcessor.Query(ctx)
54 if err != nil {
55 return err
56 }
57
58 select {
59 case <-ctx.Done():
60 return ctx.Err()
61 default:
62 }
63
64 if len(values) == 0 {
65 return nil
66 }
67
68 for _, v := range values {
69 select {
70 case <-ctx.Done():
71 return ctx.Err()
72 default:
73 }
74
75 if err := l.listProcessor.Process(ctx, v); err != nil {
76 return err
77 }
78 }
79 }
80}
81
82func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
83 c, err := l.chanProcessor.Query(ctx)
84 if err != nil {
85 return err
86 }
87
88 for {
89 select {
90 case <-ctx.Done():
91 return ctx.Err()
92 case v, ok := <-c:
93 if !ok {
94 return nil
95 }
96
97 if err := l.chanProcessor.Process(ctx, v); err != nil {
98 return err
99 }
100 }
101 }
102}