midr @ 40407573e35ff9b617faf84ac715de3d791282e4

  1package worker
  2
  3import (
  4	"bufio"
  5	"bytes"
  6	"context"
  7	"fmt"
  8	"os/exec"
  9
 10	"git.sr.ht/~gabrielgio/midr/db"
 11	work "git.sr.ht/~sircmpwn/dowork"
 12)
 13
 14const (
 15	statusNotQueued = "NOTQUEUED"
 16	statusQueued    = "QUEUED"
 17	statusStarted   = "RUNNING"
 18
 19	commandStart   = "START"
 20	commandEnqueue = "ENQUEUE"
 21	commandDequeue = "DEQUEUE"
 22)
 23
 24type command struct {
 25	action string
 26	index  uint
 27}
 28
 29type Worker struct {
 30	jobs map[uint]string
 31	c    chan command
 32}
 33
 34type Job struct {
 35	Id     uint
 36	Status string
 37}
 38
 39func NewWorkder() Worker {
 40	return Worker{
 41		c:    make(chan command),
 42		jobs: make(map[uint]string),
 43	}
 44}
 45
 46func RunYtDlpProcess(entry *db.Entry) (error, []byte, []byte) {
 47	args := []string{entry.Link}
 48	var stdout bytes.Buffer
 49	var stderr bytes.Buffer
 50
 51	output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder)
 52	args = append(args, "-o", output_template)
 53
 54	downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder)
 55	args = append(args, "--download-archive", downloaded_txt)
 56
 57	if len(entry.DateAfter) > 0 {
 58		args = append(args, "--dateafter", entry.DateAfter)
 59	}
 60
 61	cmd := exec.Command("yt-dlp", args...)
 62	cmd.Stdout = bufio.NewWriter(&stdout)
 63	cmd.Stderr = bufio.NewWriter(&stderr)
 64
 65	err := cmd.Run()
 66
 67	return err, stdout.Bytes(), stderr.Bytes()
 68}
 69
 70func (w *Worker) CanEnqueue(index uint) bool {
 71	v, found := w.jobs[index]
 72	return !found || v == statusNotQueued
 73}
 74
 75func (w *Worker) RemoveJob(id uint) {
 76	delete(w.jobs, id)
 77}
 78
 79func (w *Worker) RunYtDlpWorker(entry *db.Entry) <-chan []byte {
 80	if !w.CanEnqueue(entry.ID) {
 81		return nil
 82	}
 83
 84	log := make(chan []byte)
 85
 86	w.c <- command{action: commandEnqueue, index: entry.ID}
 87	task := work.NewTask(func(ctx context.Context) error {
 88
 89		w.c <- command{action: commandStart, index: entry.ID}
 90		err, stdout, stderr := RunYtDlpProcess(entry)
 91
 92		log <- stdout
 93		log <- stderr
 94
 95		return err
 96	}).After(func(ctx context.Context, task *work.Task) {
 97		w.c <- command{action: commandDequeue, index: entry.ID}
 98		close(log)
 99	})
100
101	work.Enqueue(task)
102
103	return log
104}
105
106func (w *Worker) startReader() {
107	for {
108		command := <-w.c
109
110		if command.action == commandEnqueue {
111			w.jobs[command.index] = statusQueued
112		} else if command.action == commandStart {
113			w.jobs[command.index] = statusStarted
114		} else if command.action == commandDequeue {
115			w.jobs[command.index] = statusNotQueued
116		} else {
117			panic(1)
118		}
119	}
120}
121
122func (w *Worker) StartReader() {
123	go w.startReader()
124}
125
126func (w *Worker) GetJobs() []Job {
127	jobs := make([]Job, len(w.jobs))
128	count := 0
129
130	for k, v := range w.jobs {
131		jobs[count] = Job{Id: k, Status: v}
132		count++
133	}
134
135	return jobs
136}