1package worker
2
3import (
4 "context"
5 "time"
6
7 "git.sr.ht/~gabrielgio/midr/db"
8 "git.sr.ht/~gabrielgio/midr/yt"
9 work "git.sr.ht/~sircmpwn/dowork"
10)
11
12const (
13 statusNotQueued = "NOTQUEUED"
14 statusQueued = "QUEUED"
15 statusStarted = "RUNNING"
16
17 commandStart = "START"
18 commandEnqueue = "ENQUEUE"
19 commandDequeue = "DEQUEUE"
20)
21
22type command struct {
23 action string
24 index uint
25}
26
27type Worker struct {
28 jobs map[uint]string
29 c chan command
30}
31
32type Job struct {
33 Id uint
34 Status string
35}
36
37func (w *Worker) CanEnqueue(index uint) bool {
38 v, found := w.jobs[index]
39 return !found || v == statusNotQueued
40}
41
42func (w *Worker) SpawnWorker(index uint, link string, output string) {
43
44 if !w.CanEnqueue(index) {
45 return
46 }
47
48 w.c <- command{action: commandEnqueue, index: index}
49 task := work.NewTask(func(ctx context.Context) error {
50 w.c <- command{action: commandStart, index: index}
51 yt.RunYtDlpProcess(link, output)
52 return nil
53 }).After(func(ctx context.Context, task *work.Task) {
54 w.c <- command{action: commandDequeue, index: index}
55 })
56
57 work.Enqueue(task)
58}
59
60func (w *Worker) startReader() {
61 for true {
62 command := <-w.c
63
64 if command.action == commandEnqueue {
65 w.jobs[command.index] = statusQueued
66 } else if command.action == commandStart {
67 w.jobs[command.index] = statusStarted
68 } else if command.action == commandDequeue {
69 w.jobs[command.index] = statusNotQueued
70 } else {
71 panic(1)
72 }
73 }
74}
75
76func (w *Worker) startScheduler(model db.EntryModel) {
77 for true {
78 entries := model.All()
79 for _, e := range entries {
80 w.SpawnWorker(e.ID, e.Link, e.OutputFolder)
81 }
82 time.Sleep(30 * time.Minute)
83 }
84}
85
86func (w *Worker) StartWorker(model db.EntryModel) {
87 w.c = make(chan command, 10)
88 w.jobs = make(map[uint]string)
89 go w.startReader()
90 go w.startScheduler(model)
91}
92
93func (w *Worker) GetJobs() []Job {
94 jobs := make([]Job, len(w.jobs))
95 count := 0
96
97 for k, v := range w.jobs {
98 jobs[count] = Job{Id: k, Status: v}
99 count++
100 }
101
102 return jobs
103}