1package worker
2
import (
	"context"
	"sync"

	"git.sr.ht/~gabrielgio/midr/yt"
	work "git.sr.ht/~sircmpwn/dowork"
)
9
// Job statuses stored in the Worker's job table.
const (
	// statusNotQueued is recorded when a dequeue command is applied; a job
	// in this state may be enqueued again.
	statusNotQueued = "NOTQUEUED"
	// statusQueued is recorded when an enqueue command is applied.
	statusQueued = "QUEUED"
	// statusStarted is recorded when a start command is applied.
	statusStarted = "RUNNING"
)

// Actions carried by a command to the Worker's reader loop.
const (
	// commandStart moves a job to statusStarted.
	commandStart = "START"
	// commandEnqueue moves a job to statusQueued.
	commandEnqueue = "ENQUEUE"
	// commandDequeue moves a job to statusNotQueued.
	commandDequeue = "DEQUEUE"
)
19
// command is a status-transition message sent over the Worker's channel and
// applied to the job table by the reader loop.
type command struct {
	// action is one of the command* constants and selects the new status.
	action string

	// index is the key of the job the action applies to.
	index uint
}
24
25type Worker struct {
26 jobs map[uint]string
27 c chan command
28}
29
30type Job struct {
31 Id uint
32 Status string
33}
34
35func NewWorkder() Worker {
36 return Worker{
37 c: make(chan command, 10),
38 jobs: make(map[uint]string),
39 }
40}
41
42func (w *Worker) CanEnqueue(index uint) bool {
43 v, found := w.jobs[index]
44 return !found || v == statusNotQueued
45}
46
47func (w *Worker) SpawnWorker(index uint, link string, output string) {
48
49 if !w.CanEnqueue(index) {
50 return
51 }
52
53 w.c <- command{action: commandEnqueue, index: index}
54 task := work.NewTask(func(ctx context.Context) error {
55 w.c <- command{action: commandStart, index: index}
56 yt.RunYtDlpProcess(link, output)
57 return nil
58 }).After(func(ctx context.Context, task *work.Task) {
59 w.c <- command{action: commandDequeue, index: index}
60 })
61
62 work.Enqueue(task)
63}
64
65func (w *Worker) startReader() {
66 for true {
67 command := <-w.c
68
69 if command.action == commandEnqueue {
70 w.jobs[command.index] = statusQueued
71 } else if command.action == commandStart {
72 w.jobs[command.index] = statusStarted
73 } else if command.action == commandDequeue {
74 w.jobs[command.index] = statusNotQueued
75 } else {
76 panic(1)
77 }
78 }
79}
80
81func (w *Worker) StartReader() {
82 go w.startReader()
83}
84
85func (w *Worker) GetJobs() []Job {
86 jobs := make([]Job, len(w.jobs))
87 count := 0
88
89 for k, v := range w.jobs {
90 jobs[count] = Job{Id: k, Status: v}
91 count++
92 }
93
94 return jobs
95}