midr @ 1e984fc8ced6a5915dbd7b6e17bd942e8438cf27

ref: Move the yt manager to the worker

Simplify the worker/manager relationship. Now the worker is responsible
for the managing the yt-dlp process as well.

Also introduce chan to report back logs. That is an attempt to decouple
things.
  1diff --git a/README.md b/README.md
  2index e19fa4664874be84e39fe1be7740549b2ffa8a9c..7c8977099564782646fec54a964788e201819dd6 100644
  3--- a/README.md
  4+++ b/README.md
  5@@ -2,4 +2,3 @@ # yt downloader simple frontend
  6 
  7 This projects aim to automat a bit youtube-dl/yt-dlp downloads and learn some
  8 go in the process.
  9-
 10diff --git a/controller/controller.go b/controller/controller.go
 11index c7f41454059c3dfad7a8239ab948eb0779ba3f2e..701d34c80d127577f8ee7142e43582e3d4674688 100644
 12--- a/controller/controller.go
 13+++ b/controller/controller.go
 14@@ -1,8 +1,10 @@
 15 package controller
 16 
 17 import (
 18+	"log"
 19 	"net/http"
 20 	"strconv"
 21+	"strings"
 22 	"time"
 23 
 24 	"git.sr.ht/~gabrielgio/midr/db"
 25@@ -13,6 +15,13 @@
 26 type Env struct {
 27 	Entries db.EntryModel
 28 	Worker  worker.Worker
 29+}
 30+
 31+func logBytes(logc <-chan []byte) {
 32+	for l := range logc {
 33+		logs := strings.TrimRight(string(l), "\t \n")
 34+		log.Println(logs)
 35+	}
 36 }
 37 
 38 func (e *Env) GetEntries(c *gin.Context) {
 39@@ -41,7 +50,9 @@ func (e *Env) CreateEntry(c *gin.Context) {
 40 	var entry db.Entry
 41 	c.ShouldBind(&entry)
 42 	e.Entries.Create(&entry)
 43-	e.Worker.SpawnWorker(&entry)
 44+	log := e.Worker.RunYtDlpWorker(&entry)
 45+	go logBytes(log)
 46+
 47 	c.Redirect(http.StatusFound, "/")
 48 }
 49 
 50@@ -66,7 +77,10 @@ 		for {
 51 			entries := e.Entries.All()
 52 
 53 			for _, entry := range entries {
 54-				e.Worker.SpawnWorker(&entry)
 55+				log := e.Worker.RunYtDlpWorker(&entry)
 56+				if log != nil {
 57+					go logBytes(log)
 58+				}
 59 			}
 60 			time.Sleep(30 * time.Second)
 61 		}
 62diff --git a/worker/worker.go b/worker/worker.go
 63index 2444e892278f1547dd820ac6e664107b158acdb0..525a736f254e2fb9a5d2fa4c30d7f2f6de06f923 100644
 64--- a/worker/worker.go
 65+++ b/worker/worker.go
 66@@ -1,10 +1,13 @@
 67 package worker
 68 
 69 import (
 70+	"bufio"
 71+	"bytes"
 72 	"context"
 73+	"fmt"
 74+	"os/exec"
 75 
 76 	"git.sr.ht/~gabrielgio/midr/db"
 77-	"git.sr.ht/~gabrielgio/midr/yt"
 78 	work "git.sr.ht/~sircmpwn/dowork"
 79 )
 80 
 81@@ -40,6 +43,30 @@ 		jobs: make(map[uint]string),
 82 	}
 83 }
 84 
 85+func RunYtDlpProcess(entry *db.Entry) (error, []byte, []byte) {
 86+	args := []string{entry.Link}
 87+	var stdout bytes.Buffer
 88+	var stderr bytes.Buffer
 89+
 90+	output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder)
 91+	args = append(args, "-o", output_template)
 92+
 93+	downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder)
 94+	args = append(args, "--download-archive", downloaded_txt)
 95+
 96+	if len(entry.DateAfter) > 0 {
 97+		args = append(args, "--dateafter", entry.DateAfter)
 98+	}
 99+
100+	cmd := exec.Command("yt-dlp", args...)
101+	cmd.Stdout = bufio.NewWriter(&stdout)
102+	cmd.Stderr = bufio.NewWriter(&stderr)
103+
104+	err := cmd.Run()
105+
106+	return err, stdout.Bytes(), stderr.Bytes()
107+}
108+
109 func (w *Worker) CanEnqueue(index uint) bool {
110 	v, found := w.jobs[index]
111 	return !found || v == statusNotQueued
112@@ -49,23 +76,31 @@ func (w *Worker) RemoveJob(id uint) {
113 	delete(w.jobs, id)
114 }
115 
116-func (w *Worker) SpawnWorker(entry *db.Entry) {
117-
118+func (w *Worker) RunYtDlpWorker(entry *db.Entry) <-chan []byte {
119 	if !w.CanEnqueue(entry.ID) {
120-		return
121+		return nil
122 	}
123+
124+	log := make(chan []byte)
125 
126 	w.c <- command{action: commandEnqueue, index: entry.ID}
127 	task := work.NewTask(func(ctx context.Context) error {
128 
129 		w.c <- command{action: commandStart, index: entry.ID}
130-		yt.RunYtDlpProcess(entry)
131-		return nil
132+		err, stdout, stderr := RunYtDlpProcess(entry)
133+
134+		log <- stdout
135+		log <- stderr
136+
137+		return err
138 	}).After(func(ctx context.Context, task *work.Task) {
139 		w.c <- command{action: commandDequeue, index: entry.ID}
140+		close(log)
141 	})
142 
143 	work.Enqueue(task)
144+
145+	return log
146 }
147 
148 func (w *Worker) startReader() {
149diff --git a/yt/manager.go b/yt/manager.go
150deleted file mode 100644
151index b9dc3336f203538ef073730e34f044a3eccdf409..0000000000000000000000000000000000000000
152--- a/yt/manager.go
153+++ /dev/null
154@@ -1,25 +0,0 @@
155-package yt
156-
157-import (
158-	"fmt"
159-	"os/exec"
160-
161-	"git.sr.ht/~gabrielgio/midr/db"
162-)
163-
164-func RunYtDlpProcess(entry *db.Entry) error {
165-	args := []string{entry.Link}
166-
167-	output_template := fmt.Sprintf("%s/%%(title)s.%%(ext)s", entry.OutputFolder)
168-	args = append(args, "-o", output_template)
169-
170-	downloaded_txt := fmt.Sprintf("%s/downloaded.txt", entry.OutputFolder)
171-	args = append(args, "--download-archive", downloaded_txt)
172-
173-	if len(entry.DateAfter) > 0 {
174-		args = append(args, "--dateafter", entry.DateAfter)
175-	}
176-
177-	cmd := exec.Command("yt-dlp", args...)
178-	return cmd.Run()
179-}