midr @ 4e5b2d9dfd9413ce084e64e048a57ad6e23356d3

 1package worker
 2
import (
	"context"
	"sort"
	"sync"

	"git.sr.ht/~gabrielgio/midr/yt"
	work "git.sr.ht/~sircmpwn/dowork"
)
 9
// Job statuses, as stored per job id in Worker.jobs and exposed through
// Job.Status.
const (
	// statusNotQueued marks a job that is known but currently neither waiting
	// nor running; such a job may be enqueued again.
	statusNotQueued = "NOTQUEUED"
	// statusQueued marks a job that has been accepted and is waiting to start.
	statusQueued = "QUEUED"
	// statusStarted marks a job whose task has begun executing.
	statusStarted = "RUNNING"
)

// Command actions carried by command.action; each one selects the status the
// reader loop records for the referenced job.
const (
	// commandStart switches the job to statusStarted.
	commandStart = "START"
	// commandEnqueue switches the job to statusQueued.
	commandEnqueue = "ENQUEUE"
	// commandDequeue switches the job back to statusNotQueued.
	commandDequeue = "DEQUEUE"
)
19
// command is one status-transition request delivered to the Worker's reader
// loop over its channel.
type command struct {
	// action is one of the command* constants and decides which status the
	// job is moved to.
	action string
	// index is the id of the job the transition applies to.
	index uint
}
24
25type Worker struct {
26	jobs map[uint]string
27	c    chan command
28}
29
30type Job struct {
31	Id     uint
32	Status string
33}
34
35func NewWorkder() Worker {
36	return Worker{
37		c:    make(chan command, 10),
38		jobs: make(map[uint]string),
39	}
40}
41
42func (w *Worker) CanEnqueue(index uint) bool {
43	v, found := w.jobs[index]
44	return !found || v == statusNotQueued
45}
46
47func (w *Worker) SpawnWorker(index uint, link string, output string) {
48
49	if !w.CanEnqueue(index) {
50		return
51	}
52
53	w.c <- command{action: commandEnqueue, index: index}
54	task := work.NewTask(func(ctx context.Context) error {
55		w.c <- command{action: commandStart, index: index}
56		yt.RunYtDlpProcess(link, output)
57		return nil
58	}).After(func(ctx context.Context, task *work.Task) {
59		w.c <- command{action: commandDequeue, index: index}
60	})
61
62	work.Enqueue(task)
63}
64
65func (w *Worker) startReader() {
66	for true {
67		command := <-w.c
68
69		if command.action == commandEnqueue {
70			w.jobs[command.index] = statusQueued
71		} else if command.action == commandStart {
72			w.jobs[command.index] = statusStarted
73		} else if command.action == commandDequeue {
74			w.jobs[command.index] = statusNotQueued
75		} else {
76			panic(1)
77		}
78	}
79}
80
81func (w *Worker) StartReader() {
82	go w.startReader()
83}
84
85func (w *Worker) GetJobs() []Job {
86	jobs := make([]Job, len(w.jobs))
87	count := 0
88
89	for k, v := range w.jobs {
90		jobs[count] = Job{Id: k, Status: v}
91		count++
92	}
93
94	return jobs
95}