lens @ b242ed3c44f4dde7c4b452312b78a3b02f42ea65

feat: Add task loop

Now the tasks will loop every given time. With this it will be able to
pick up new photos after the application was started.

I added 2h for file because my personal photo gallery is quite big and
quite IO bottled necked so it tasks a lot of time to go through.
  1diff --git a/cmd/server/main.go b/cmd/server/main.go
  2index daf5356263701e47f844e450c6a9d5d491bc7808..2a93946964173e1b7d9babb1529a2ca5b1fc5e11 100644
  3--- a/cmd/server/main.go
  4+++ b/cmd/server/main.go
  5@@ -7,6 +7,7 @@ 	"errors"
  6 	"net/http"
  7 	"os"
  8 	"os/signal"
  9+	"time"
 10 
 11 	"github.com/gorilla/mux"
 12 	"github.com/sirupsen/logrus"
 13@@ -122,37 +123,37 @@ 		thumbnailScanner = scanner.NewThumbnailScanner(*cachePath, mediaRepository)
 14 		albumScanner     = scanner.NewAlbumScanner(mediaRepository)
 15 	)
 16 
 17-	// worker
 18+	// tasks
 19 	var (
 20-		serverWorker = worker.NewServerWorker(&http.Server{Handler: r, Addr: "0.0.0.0:8080"})
 21-		fileWorker   = worker.NewWorkerFromChanProcessor[string](
 22+		serverTask = worker.NewServerTask(&http.Server{Handler: r, Addr: "0.0.0.0:8080"})
 23+		fileTask   = worker.NewTaskFromChanProcessor[string](
 24 			fileScanner,
 25 			scheduler,
 26 			logrus.WithField("context", "file scanner"),
 27 		)
 28-		exifWorker = worker.NewWorkerFromBatchProcessor[*repository.Media](
 29+		exifTask = worker.NewTaskFromBatchProcessor[*repository.Media](
 30 			exifScanner,
 31 			scheduler,
 32 			logrus.WithField("context", "exif scanner"),
 33 		)
 34-		thumbnailWorker = worker.NewWorkerFromBatchProcessor[*repository.Media](
 35+		thumbnailTask = worker.NewTaskFromBatchProcessor[*repository.Media](
 36 			thumbnailScanner,
 37 			scheduler,
 38 			logrus.WithField("context", "thumbnail scanner"),
 39 		)
 40-		albumWorker = worker.NewWorkerFromSerialProcessor[*repository.Media](
 41+		albumTask = worker.NewTaskFromSerialProcessor[*repository.Media](
 42 			albumScanner,
 43 			scheduler,
 44 			logrus.WithField("context", "thumbnail scanner"),
 45 		)
 46 	)
 47 
 48-	pool := worker.NewWorkerPool()
 49-	pool.AddWorker("http server", serverWorker)
 50-	pool.AddWorker("exif scanner", exifWorker)
 51-	pool.AddWorker("file scanner", fileWorker)
 52-	pool.AddWorker("thumbnail scanner", thumbnailWorker)
 53-	pool.AddWorker("album scanner", albumWorker)
 54+	pool := worker.NewTaskPool()
 55+	pool.AddTask("http server", time.Minute, serverTask)
 56+	pool.AddTask("exif scanner", 15*time.Minute, exifTask)
 57+	pool.AddTask("file scanner", 2*time.Hour, fileTask)
 58+	pool.AddTask("thumbnail scanner", 15*time.Minute, thumbnailTask)
 59+	pool.AddTask("album scanner", 15*time.Minute, albumTask)
 60 
 61 	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
 62 	defer stop()
 63diff --git a/pkg/worker/httpserver.go b/pkg/worker/httpserver.go
 64index dc8f255f1953dfec7011e4611dd54ddfb3d22cfa..f0ec3ba4ce3a4befe97250e14c64b0d986366308 100644
 65--- a/pkg/worker/httpserver.go
 66+++ b/pkg/worker/httpserver.go
 67@@ -5,11 +5,11 @@ 	"context"
 68 	"net/http"
 69 )
 70 
 71-type ServerWorker struct {
 72+type ServerTask struct {
 73 	server *http.Server
 74 }
 75 
 76-func (self *ServerWorker) Start(ctx context.Context) error {
 77+func (self *ServerTask) Start(ctx context.Context) error {
 78 	go func() {
 79 		// nolint: errcheck
 80 		self.server.ListenAndServe()
 81@@ -19,8 +19,8 @@ 	<-ctx.Done()
 82 	return self.server.Shutdown(ctx)
 83 }
 84 
 85-func NewServerWorker(server *http.Server) *ServerWorker {
 86-	return &ServerWorker{
 87+func NewServerTask(server *http.Server) *ServerTask {
 88+	return &ServerTask{
 89 		server: server,
 90 	}
 91 }
 92diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
 93index 02817c94e8074b392580606b842fb25c0c679d2d..ea6b45372e607368892aeb2d038c25a09d65e437 100644
 94--- a/pkg/worker/list_processor.go
 95+++ b/pkg/worker/list_processor.go
 96@@ -10,7 +10,7 @@ )
 97 
 98 type (
 99 
100-	// A simple worker to deal with list.
101+	// A simple task to deal with list.
102 	ChanProcessor[T any] interface {
103 		Query(context.Context) (<-chan T, error)
104 		Process(context.Context, T) error
105@@ -25,62 +25,62 @@ 		Query(context.Context) ([]T, error)
106 		Process(context.Context, T) error
107 	}
108 
109-	chanProcessorWorker[T any] struct {
110+	chanProcessorTask[T any] struct {
111 		chanProcessor ChanProcessor[T]
112 		logrus        *logrus.Entry
113 		scheduler     *Scheduler
114 	}
115 
116-	batchProcessorWorker[T any] struct {
117+	batchProcessorTask[T any] struct {
118 		batchProcessor ListProcessor[T]
119 		logrus         *logrus.Entry
120 		scheduler      *Scheduler
121 	}
122 
123-	serialProcessorWorker[T any] struct {
124+	serialProcessorTask[T any] struct {
125 		batchProcessor ListProcessor[T]
126 		logrus         *logrus.Entry
127 		scheduler      *Scheduler
128 	}
129 )
130 
131-func NewWorkerFromBatchProcessor[T any](
132+func NewTaskFromBatchProcessor[T any](
133 	batchProcessor ListProcessor[T],
134 	scheduler *Scheduler,
135 	logrus *logrus.Entry,
136-) Worker {
137-	return &batchProcessorWorker[T]{
138+) Task {
139+	return &batchProcessorTask[T]{
140 		batchProcessor: batchProcessor,
141 		scheduler:      scheduler,
142 		logrus:         logrus,
143 	}
144 }
145 
146-func NewWorkerFromSerialProcessor[T any](
147+func NewTaskFromSerialProcessor[T any](
148 	batchProcessor ListProcessor[T],
149 	scheduler *Scheduler,
150 	logrus *logrus.Entry,
151-) Worker {
152-	return &serialProcessorWorker[T]{
153+) Task {
154+	return &serialProcessorTask[T]{
155 		batchProcessor: batchProcessor,
156 		scheduler:      scheduler,
157 		logrus:         logrus,
158 	}
159 }
160 
161-func NewWorkerFromChanProcessor[T any](
162+func NewTaskFromChanProcessor[T any](
163 	chanProcessor ChanProcessor[T],
164 	scheduler *Scheduler,
165 	logrus *logrus.Entry,
166-) Worker {
167-	return &chanProcessorWorker[T]{
168+) Task {
169+	return &chanProcessorTask[T]{
170 		chanProcessor: chanProcessor,
171 		scheduler:     scheduler,
172 		logrus:        logrus,
173 	}
174 }
175 
176-func (l *batchProcessorWorker[T]) Start(ctx context.Context) error {
177+func (l *batchProcessorTask[T]) Start(ctx context.Context) error {
178 	for {
179 		values, err := l.batchProcessor.Query(ctx)
180 		if err != nil {
181@@ -123,7 +123,7 @@ 		wg.Wait()
182 	}
183 }
184 
185-func (l *serialProcessorWorker[T]) Start(ctx context.Context) error {
186+func (l *serialProcessorTask[T]) Start(ctx context.Context) error {
187 	for {
188 		values, err := l.batchProcessor.Query(ctx)
189 		if err != nil {
190@@ -158,7 +158,7 @@ 		}
191 	}
192 }
193 
194-func (l *chanProcessorWorker[T]) Start(ctx context.Context) error {
195+func (l *chanProcessorTask[T]) Start(ctx context.Context) error {
196 	c, err := l.chanProcessor.Query(ctx)
197 	if err != nil {
198 		return err
199diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go
200index ce3ff0a3d531d278ac4afa5157d7fd8c3149acf8..abdb90792f9713c640180c778f8db69e9abfe388 100644
201--- a/pkg/worker/list_processor_test.go
202+++ b/pkg/worker/list_processor_test.go
203@@ -32,7 +32,7 @@ 		scheduler = NewScheduler(1)
204 		mock      = &mockCounterListProcessor{countTo: 10000}
205 	)
206 
207-	worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
208+	worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
209 
210 	err := worker.Start(context.Background())
211 	testkit.TestFatalError(t, "Start", err)
212@@ -47,7 +47,7 @@ 		scheduler = NewScheduler(1)
213 		mock      = &mockContextListProcessor{}
214 	)
215 
216-	worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
217+	worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
218 
219 	ctx, cancel := context.WithCancel(context.Background())
220 	var wg sync.WaitGroup
221diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
222index 359384a260713696d42fbbf02b6be54bcda9f6a6..b768320465871193d203467657b48372e7d2a0ba 100644
223--- a/pkg/worker/worker.go
224+++ b/pkg/worker/worker.go
225@@ -5,48 +5,68 @@ 	"context"
226 	"errors"
227 	"fmt"
228 	"sync"
229+	"time"
230 )
231 
232 type (
233-	// Worker should watch for context
234-	Worker interface {
235+	// Task should watch for context
236+	Task interface {
237 		Start(context.Context) error
238 	}
239 
240 	Work struct {
241-		Name   string
242-		Worker Worker
243+		Name string
244+		Task Task
245+		wait time.Duration
246 	}
247 
248-	WorkerPool struct {
249-		workers []*Work
250+	TaskPool struct {
251+		tasks []*Work
252 	}
253 )
254 
255-func NewWorkerPool() *WorkerPool {
256-	return &WorkerPool{}
257+func NewTaskPool() *TaskPool {
258+	return &TaskPool{}
259 }
260 
261-func (self *WorkerPool) AddWorker(name string, worker Worker) {
262-	self.workers = append(self.workers, &Work{
263-		Name:   name,
264-		Worker: worker,
265+func (w *Work) run(ctx context.Context) error {
266+	// first time fire from the get go
267+	timer := time.NewTimer(time.Nanosecond)
268+
269+	for {
270+		select {
271+		case <-ctx.Done():
272+			return ctx.Err()
273+		case <-timer.C:
274+			fmt.Println("Process starting: ", w.Name)
275+			if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
276+				fmt.Println("Process errored: ", w.Name, err.Error())
277+				return err
278+			} else {
279+				fmt.Println("Process done: ", w.Name)
280+			}
281+		}
282+		timer.Reset(w.wait)
283+	}
284+}
285+
286+func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
287+	self.tasks = append(self.tasks, &Work{
288+		Name: name,
289+		Task: task,
290+		wait: wait,
291 	})
292 }
293 
294-func (self *WorkerPool) Start(ctx context.Context) {
295+func (self *TaskPool) Start(ctx context.Context) {
296 	var wg sync.WaitGroup
297 
298-	wg.Add(len(self.workers))
299+	wg.Add(len(self.tasks))
300 
301-	for _, w := range self.workers {
302+	for _, w := range self.tasks {
303 		go func(w *Work) {
304-			defer wg.Done()
305-			if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
306-				fmt.Println("Processes finished, error", w.Name, err.Error())
307-			} else {
308-				fmt.Println(w.Name, "done")
309-			}
310+			_ = w.run(ctx)
311+			wg.Done()
312 		}(w)
313 	}
314 
315diff --git a/templates/base.qtpl b/templates/base.qtpl
316index b1878ba8f436c065ca17b9e31cebe36f7dcee0be..a80803a50b720b01160069c224084f74ea86fe8b 100644
317--- a/templates/base.qtpl
318+++ b/templates/base.qtpl
319@@ -47,7 +47,7 @@                     settings
320                 </a>
321             </div>
322         </nav>
323-        <div class="container">
324+        <div class="container is-fullhd">
325             {%= p.Content() %}
326         </div>
327     </body>