1package worker
2
3import (
4 "context"
5
6 "git.sr.ht/~gabrielgio/midr/db"
7 "git.sr.ht/~gabrielgio/midr/yt"
8 work "git.sr.ht/~sircmpwn/dowork"
9)
10
// Status values stored per job ID in Worker.jobs and exposed via Job.Status.
const (
	// statusNotQueued marks a job that is tracked but neither waiting nor running.
	statusNotQueued = "NOTQUEUED"
	// statusQueued marks a job that was announced to the reader but has not started.
	statusQueued = "QUEUED"
	// statusStarted marks a job whose task function is executing.
	statusStarted = "RUNNING"
)

// Actions carried by a command over the Worker's channel.
const (
	// commandStart asks the reader to mark the job as statusStarted.
	commandStart = "START"
	// commandEnqueue asks the reader to mark the job as statusQueued.
	commandEnqueue = "ENQUEUE"
	// commandDequeue asks the reader to mark the job as statusNotQueued.
	commandDequeue = "DEQUEUE"
)
20
// command is the message sent over Worker.c to request a status change for
// a single job.
type command struct {
	// action is one of commandStart, commandEnqueue or commandDequeue.
	action string
	// index is the key of the affected job in Worker.jobs.
	index uint
}
25
26type Worker struct {
27 jobs map[uint]string
28 c chan command
29}
30
// Job is the exported view of one tracked job, as returned by
// Worker.GetJobs.
type Job struct {
	// Id is the job's key in the worker's status map. The spelling is kept
	// as-is because it is part of the exported API.
	Id uint
	// Status is the status string recorded for the job.
	Status string
}
35
36func NewWorkder() Worker {
37 return Worker{
38 c: make(chan command),
39 jobs: make(map[uint]string),
40 }
41}
42
43func (w *Worker) CanEnqueue(index uint) bool {
44 v, found := w.jobs[index]
45 return !found || v == statusNotQueued
46}
47
48func (w *Worker) RemoveJob(id uint) {
49 delete(w.jobs, id)
50}
51
52func (w *Worker) SpawnWorker(entry *db.Entry) {
53
54 if !w.CanEnqueue(entry.ID) {
55 return
56 }
57
58 w.c <- command{action: commandEnqueue, index: entry.ID}
59 task := work.NewTask(func(ctx context.Context) error {
60
61 w.c <- command{action: commandStart, index: entry.ID}
62 yt.RunYtDlpProcess(entry)
63 return nil
64 }).After(func(ctx context.Context, task *work.Task) {
65 w.c <- command{action: commandDequeue, index: entry.ID}
66 })
67
68 work.Enqueue(task)
69}
70
71func (w *Worker) startReader() {
72 for {
73 command := <-w.c
74
75 if command.action == commandEnqueue {
76 w.jobs[command.index] = statusQueued
77 } else if command.action == commandStart {
78 w.jobs[command.index] = statusStarted
79 } else if command.action == commandDequeue {
80 w.jobs[command.index] = statusNotQueued
81 } else {
82 panic(1)
83 }
84 }
85}
86
87func (w *Worker) StartReader() {
88 go w.startReader()
89}
90
91func (w *Worker) GetJobs() []Job {
92 jobs := make([]Job, len(w.jobs))
93 count := 0
94
95 for k, v := range w.jobs {
96 jobs[count] = Job{Id: k, Status: v}
97 count++
98 }
99
100 return jobs
101}