midr @ 90d9d819b70f68e10482954cfc461737c0165f8a

  1package worker
  2
  3import (
  4	"context"
  5
  6	"git.sr.ht/~gabrielgio/midr/db"
  7	"git.sr.ht/~gabrielgio/midr/yt"
  8	work "git.sr.ht/~sircmpwn/dowork"
  9)
 10
 11const (
 12	statusNotQueued = "NOTQUEUED"
 13	statusQueued    = "QUEUED"
 14	statusStarted   = "RUNNING"
 15
 16	commandStart   = "START"
 17	commandEnqueue = "ENQUEUE"
 18	commandDequeue = "DEQUEUE"
 19)
 20
 21type command struct {
 22	action string
 23	index  uint
 24}
 25
 26type Worker struct {
 27	jobs map[uint]string
 28	c    chan command
 29}
 30
 31type Job struct {
 32	Id     uint
 33	Status string
 34}
 35
 36func NewWorkder() Worker {
 37	return Worker{
 38		c:    make(chan command),
 39		jobs: make(map[uint]string),
 40	}
 41}
 42
 43func (w *Worker) CanEnqueue(index uint) bool {
 44	v, found := w.jobs[index]
 45	return !found || v == statusNotQueued
 46}
 47
 48func (w *Worker) RemoveJob(id uint) {
 49	delete(w.jobs, id)
 50}
 51
 52func (w *Worker) SpawnWorker(entry *db.Entry) {
 53
 54	if !w.CanEnqueue(entry.ID) {
 55		return
 56	}
 57
 58	w.c <- command{action: commandEnqueue, index: entry.ID}
 59	task := work.NewTask(func(ctx context.Context) error {
 60
 61		w.c <- command{action: commandStart, index: entry.ID}
 62		yt.RunYtDlpProcess(entry)
 63		return nil
 64	}).After(func(ctx context.Context, task *work.Task) {
 65		w.c <- command{action: commandDequeue, index: entry.ID}
 66	})
 67
 68	work.Enqueue(task)
 69}
 70
 71func (w *Worker) startReader() {
 72	for {
 73		command := <-w.c
 74
 75		if command.action == commandEnqueue {
 76			w.jobs[command.index] = statusQueued
 77		} else if command.action == commandStart {
 78			w.jobs[command.index] = statusStarted
 79		} else if command.action == commandDequeue {
 80			w.jobs[command.index] = statusNotQueued
 81		} else {
 82			panic(1)
 83		}
 84	}
 85}
 86
 87func (w *Worker) StartReader() {
 88	go w.startReader()
 89}
 90
 91func (w *Worker) GetJobs() []Job {
 92	jobs := make([]Job, len(w.jobs))
 93	count := 0
 94
 95	for k, v := range w.jobs {
 96		jobs[count] = Job{Id: k, Status: v}
 97		count++
 98	}
 99
100	return jobs
101}