midr @ d06f6e1398c5dedb3359ff929c5aaf7317c73ce1

 1package worker
 2
 3import (
 4	"context"
 5	"time"
 6
 7	"git.sr.ht/~gabrielgio/midr/db"
 8	"git.sr.ht/~gabrielgio/midr/yt"
 9	work "git.sr.ht/~sircmpwn/dowork"
10)
11
12const (
13	statusStoped  = "STOPPED"
14	statusStarted = "STARTED"
15
16	commandStart = "START"
17	commandStop  = "STOP"
18)
19
20type command struct {
21	action string
22	index  uint
23}
24
25type Worker struct {
26	jobs map[uint]string
27	c    chan command
28}
29
30func (w *Worker) SpawnWorker(index uint, link string, output string) {
31
32	if v, found := w.jobs[index]; found && v == statusStarted {
33		return
34	}
35
36	w.c <- command{action: commandStart, index: index}
37	task := work.NewTask(func(ctx context.Context) error {
38		yt.RunYtDlpProcess(link, output)
39		return nil
40	}).After(func(ctx context.Context, task *work.Task) {
41		w.c <- command{action: commandStop, index: index}
42	})
43
44	work.Enqueue(task)
45}
46
47func (w *Worker) startReader() {
48	for true {
49		command := <-w.c
50
51		if command.action == commandStop {
52			w.jobs[command.index] = statusStoped
53		} else if command.action == commandStart {
54			w.jobs[command.index] = statusStarted
55		} else {
56			panic(1)
57		}
58	}
59}
60
61func (w *Worker) startScheduler(model db.EntryModel) {
62	for true {
63		entries := model.All()
64		for _, e := range entries {
65			w.SpawnWorker(e.ID, e.Link, e.OutputFolder)
66		}
67		time.Sleep(30 * time.Minute)
68	}
69}
70
71func (w *Worker) StartWorker(model db.EntryModel) {
72	w.c = make(chan command, 10)
73	w.jobs = make(map[uint]string)
74	go w.startReader()
75	go w.startScheduler(model)
76}