midr @ 73b1e5746a3f074aa103b5914a02769ff057c56e

  1package worker
  2
import (
	"context"
	"sync"
	"time"

	"git.sr.ht/~gabrielgio/midr/db"
	"git.sr.ht/~gabrielgio/midr/yt"
	work "git.sr.ht/~sircmpwn/dowork"
)
 11
 12const (
 13	statusNotQueued = "NOTQUEUED"
 14	statusQueued    = "QUEUED"
 15	statusStarted   = "RUNNING"
 16
 17	commandStart   = "START"
 18	commandEnqueue = "ENQUEUE"
 19	commandDequeue = "DEQUEUE"
 20)
 21
 22type command struct {
 23	action string
 24	index  uint
 25}
 26
 27type Worker struct {
 28	jobs map[uint]string
 29	c    chan command
 30}
 31
 32type Job struct {
 33	Id     uint
 34	Status string
 35}
 36
 37func (w *Worker) CanEnqueue(index uint) bool {
 38	v, found := w.jobs[index]
 39	return !found || v == statusNotQueued
 40}
 41
 42func (w *Worker) SpawnWorker(index uint, link string, output string) {
 43
 44	if !w.CanEnqueue(index) {
 45		return
 46	}
 47
 48	w.c <- command{action: commandEnqueue, index: index}
 49	task := work.NewTask(func(ctx context.Context) error {
 50		w.c <- command{action: commandStart, index: index}
 51		yt.RunYtDlpProcess(link, output)
 52		return nil
 53	}).After(func(ctx context.Context, task *work.Task) {
 54		w.c <- command{action: commandDequeue, index: index}
 55	})
 56
 57	work.Enqueue(task)
 58}
 59
 60func (w *Worker) startReader() {
 61	for true {
 62		command := <-w.c
 63
 64		if command.action == commandEnqueue {
 65			w.jobs[command.index] = statusQueued
 66		} else if command.action == commandStart {
 67			w.jobs[command.index] = statusStarted
 68		} else if command.action == commandDequeue {
 69			w.jobs[command.index] = statusNotQueued
 70		} else {
 71			panic(1)
 72		}
 73	}
 74}
 75
 76func (w *Worker) startScheduler(model db.EntryModel) {
 77	for true {
 78		entries := model.All()
 79		for _, e := range entries {
 80			w.SpawnWorker(e.ID, e.Link, e.OutputFolder)
 81		}
 82		time.Sleep(30 * time.Minute)
 83	}
 84}
 85
 86func (w *Worker) StartWorker(model db.EntryModel) {
 87	w.c = make(chan command, 10)
 88	w.jobs = make(map[uint]string)
 89	go w.startReader()
 90	go w.startScheduler(model)
 91}
 92
 93func (w *Worker) GetJobs() []Job {
 94	jobs := []Job{}
 95
 96	for k, v := range w.jobs {
 97		jobs = append(jobs, Job{Id: k, Status: v})
 98	}
 99
100	return jobs
101}