1package worker
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "fmt"
8 "os/exec"
9
10 "git.sr.ht/~gabrielgio/midr/db"
11 work "git.sr.ht/~sircmpwn/dowork"
12)
13
// Job states stored as values in Worker.jobs.
const (
	statusNotQueued = "NOTQUEUED"
	statusQueued    = "QUEUED"
	statusStarted   = "RUNNING"
)

// Actions carried by a command over Worker.c. startReader translates each
// action into one of the job states above.
const (
	commandStart   = "START"
	commandEnqueue = "ENQUEUE"
	commandDequeue = "DEQUEUE"
)
23
// command is a state-change request sent over Worker.c and applied to the
// job table by startReader.
type command struct {
	// action is one of commandStart, commandEnqueue or commandDequeue;
	// startReader panics on any other value.
	action string
	// index is the key in Worker.jobs whose status should change.
	index uint
}
28
// Worker tracks the queue state of yt-dlp download jobs.
//
// Status transitions are sent as commands on c and applied to jobs by the
// goroutine launched with StartReader.
//
// NOTE(review): jobs is also read by CanEnqueue and GetJobs and modified by
// RemoveJob without any lock, while the reader goroutine writes to it.
// Confirm these are never called concurrently with the reader.
type Worker struct {
	// jobs maps a job ID to its status string (statusNotQueued, statusQueued
	// or statusStarted).
	jobs map[uint]string
	// c carries state-change commands to startReader.
	c chan command
}
33
// Job is a copy of one entry of Worker.jobs, as returned by GetJobs.
type Job struct {
	// Id is the key of the job in Worker.jobs.
	Id uint
	// Status is the status string stored for that key.
	Status string
}
38
39func NewWorkder() Worker {
40 return Worker{
41 c: make(chan command),
42 jobs: make(map[uint]string),
43 }
44}
45
46func RunYtDlpProcess(entry *db.Entry) (error, []byte, []byte) {
47 args := []string{entry.Link}
48 var stdout bytes.Buffer
49 var stderr bytes.Buffer
50
51 output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder)
52 args = append(args, "-o", output_template)
53
54 downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder)
55 args = append(args, "--download-archive", downloaded_txt)
56
57 if len(entry.DateAfter) > 0 {
58 args = append(args, "--dateafter", entry.DateAfter)
59 }
60
61 cmd := exec.Command("yt-dlp", args...)
62 cmd.Stdout = bufio.NewWriter(&stdout)
63 cmd.Stderr = bufio.NewWriter(&stderr)
64
65 err := cmd.Run()
66
67 return err, stdout.Bytes(), stderr.Bytes()
68}
69
70func (w *Worker) CanEnqueue(index uint) bool {
71 v, found := w.jobs[index]
72 return !found || v == statusNotQueued
73}
74
// RemoveJob deletes the status entry for id from the job table. Removing an
// id that has no entry does nothing.
//
// NOTE(review): unlike the other status changes, this modifies w.jobs
// directly instead of sending a command on w.c. Confirm it cannot run
// concurrently with the reader goroutine.
func (w *Worker) RemoveJob(id uint) {
	delete(w.jobs, id)
}
78
79func (w *Worker) RunYtDlpWorker(entry *db.Entry) <-chan []byte {
80 if !w.CanEnqueue(entry.ID) {
81 return nil
82 }
83
84 log := make(chan []byte)
85
86 w.c <- command{action: commandEnqueue, index: entry.ID}
87 task := work.NewTask(func(ctx context.Context) error {
88
89 w.c <- command{action: commandStart, index: entry.ID}
90 err, stdout, stderr := RunYtDlpProcess(entry)
91
92 log <- stdout
93 log <- stderr
94
95 return err
96 }).After(func(ctx context.Context, task *work.Task) {
97 w.c <- command{action: commandDequeue, index: entry.ID}
98 close(log)
99 })
100
101 work.Enqueue(task)
102
103 return log
104}
105
106func (w *Worker) startReader() {
107 for {
108 command := <-w.c
109
110 if command.action == commandEnqueue {
111 w.jobs[command.index] = statusQueued
112 } else if command.action == commandStart {
113 w.jobs[command.index] = statusStarted
114 } else if command.action == commandDequeue {
115 w.jobs[command.index] = statusNotQueued
116 } else {
117 panic(1)
118 }
119 }
120}
121
// StartReader launches startReader in a new goroutine and returns
// immediately. The goroutine loops forever; nothing in this file stops it.
func (w *Worker) StartReader() {
	go w.startReader()
}
125
126func (w *Worker) GetJobs() []Job {
127 jobs := make([]Job, len(w.jobs))
128 count := 0
129
130 for k, v := range w.jobs {
131 jobs[count] = Job{Id: k, Status: v}
132 count++
133 }
134
135 return jobs
136}