1package worker
2
3import (
4 "context"
5 "time"
6
7 "git.sr.ht/~gabrielgio/midr/db"
8 "git.sr.ht/~gabrielgio/midr/yt"
9 work "git.sr.ht/~sircmpwn/dowork"
10)
11
// Job statuses stored as values in Worker.jobs.
const (
	statusStoped  = "STOPPED"
	statusStarted = "STARTED"
)

// Actions carried in command.action over the Worker channel.
const (
	commandStart = "START"
	commandStop  = "STOP"
)
19
20type command struct {
21 action string
22 index uint
23}
24
25type Worker struct {
26 jobs map[uint]string
27 c chan command
28}
29
30func (w *Worker) SpawnWorker(index uint, link string, output string) {
31
32 if v, found := w.jobs[index]; found && v == statusStarted {
33 return
34 }
35
36 w.c <- command{action: commandStart, index: index}
37 task := work.NewTask(func(ctx context.Context) error {
38 yt.RunYtDlpProcess(link, output)
39 return nil
40 }).After(func(ctx context.Context, task *work.Task) {
41 w.c <- command{action: commandStop, index: index}
42 })
43
44 work.Enqueue(task)
45}
46
47func (w *Worker) startReader() {
48 for true {
49 command := <-w.c
50
51 if command.action == commandStop {
52 w.jobs[command.index] = statusStoped
53 } else if command.action == commandStart {
54 w.jobs[command.index] = statusStarted
55 } else {
56 panic(1)
57 }
58 }
59}
60
61func (w *Worker) startScheduler(model db.EntryModel) {
62 for true {
63 entries := model.All()
64 for _, e := range entries {
65 w.SpawnWorker(e.ID, e.Link, e.OutputFolder)
66 }
67 time.Sleep(30 * time.Minute)
68 }
69}
70
71func (w *Worker) StartWorker(model db.EntryModel) {
72 w.c = make(chan command, 10)
73 w.jobs = make(map[uint]string)
74 go w.startReader()
75 go w.startScheduler(model)
76}