lens @ d4e1ca3a48e74573df6965ceee217e119ff899ae

feat: Add scheduler to chan processor
  1diff --git a/cmd/server/main.go b/cmd/server/main.go
  2index d7c2fd64ce0eaf97d3afc4e3d3d6337ac5995109..f58366ff418564305b29238e67f5b6314b10e249 100644
  3--- a/cmd/server/main.go
  4+++ b/cmd/server/main.go
  5@@ -118,8 +118,12 @@
  6 	// worker
  7 	var (
  8 		serverWorker = worker.NewServerWorker(&fasthttp.Server{Handler: r.Handler})
  9-		fileWorker   = worker.NewWorkerFromChanProcessor[string](fileScanner, scheduler)
 10-		exifWorker   = worker.NewWorkerFromBatchProcessor[*media.Media](
 11+		fileWorker   = worker.NewWorkerFromChanProcessor[string](
 12+			fileScanner,
 13+			scheduler,
 14+			logrus.WithField("context", "file scanner"),
 15+		)
 16+		exifWorker = worker.NewWorkerFromBatchProcessor[*media.Media](
 17 			exifScanner,
 18 			scheduler,
 19 			logrus.WithField("context", "exif scanner"),
 20diff --git a/go.mod b/go.mod
 21index 4778b578de239d2f00372a222cb31347b74462d4..abd738844b4ff89064efa2533e7dd67cb6fffac9 100644
 22--- a/go.mod
 23+++ b/go.mod
 24@@ -5,7 +5,6 @@
 25 require (
 26 	github.com/barasher/go-exiftool v1.10.0
 27 	github.com/fasthttp/router v1.4.19
 28-	github.com/gabriel-vasile/mimetype v1.4.2
 29 	github.com/google/go-cmp v0.5.9
 30 	github.com/samber/lo v1.38.1
 31 	github.com/sirupsen/logrus v1.9.2
 32@@ -33,7 +32,6 @@ 	github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
 33 	github.com/valyala/bytebufferpool v1.0.0 // indirect
 34 	golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
 35 	golang.org/x/mod v0.10.0 // indirect
 36-	golang.org/x/net v0.10.0 // indirect
 37 	golang.org/x/sys v0.8.0 // indirect
 38 	golang.org/x/text v0.9.0 // indirect
 39 	golang.org/x/tools v0.9.3 // indirect
 40diff --git a/go.sum b/go.sum
 41index d0e670b90e662ccf832a0df939d7c15094452b93..ac79e0eafbc1fed60c5403a124dddc4329c49d58 100644
 42--- a/go.sum
 43+++ b/go.sum
 44@@ -7,8 +7,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 45 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 46 github.com/fasthttp/router v1.4.19 h1:RLE539IU/S4kfb4MP56zgP0TIBU9kEg0ID9GpWO0vqk=
 47 github.com/fasthttp/router v1.4.19/go.mod h1:+Fh3YOd8x1+he6ZS+d2iUDBH9MGGZ1xQFUor0DE9rKE=
 48-github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
 49-github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
 50 github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 51 github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
 52 github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
 53@@ -58,8 +56,6 @@ golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
 54 golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
 55 golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
 56 golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 57-golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
 58-golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
 59 golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
 60 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 61 golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
 62diff --git a/pkg/worker/file_scanner.go b/pkg/worker/file_scanner.go
 63index 0dc2eb2d4ec01556a88a96ac284a0cd695b68f93..fda869c156fa47a69f6166faebb6f17a3df20f2c 100644
 64--- a/pkg/worker/file_scanner.go
 65+++ b/pkg/worker/file_scanner.go
 66@@ -5,9 +5,9 @@ 	"context"
 67 	"crypto/md5"
 68 	"encoding/hex"
 69 	"io/fs"
 70+	"mime"
 71 	"path/filepath"
 72-
 73-	"github.com/gabriel-vasile/mimetype"
 74+	"strings"
 75 
 76 	"git.sr.ht/~gabrielgio/img/pkg/components/media"
 77 )
 78@@ -39,6 +39,10 @@ 				return filepath.SkipAll
 79 			default:
 80 			}
 81 
 82+			if info == nil {
 83+				return nil
 84+			}
 85+
 86 			if info.IsDir() && filepath.Base(info.Name())[0] == '.' {
 87 				return filepath.SkipDir
 88 			}
 89@@ -47,11 +51,6 @@ 			if info.IsDir() {
 90 				return nil
 91 			}
 92 
 93-			if filepath.Ext(info.Name()) != ".jpg" &&
 94-				filepath.Ext(info.Name()) != ".jpeg" &&
 95-				filepath.Ext(info.Name()) != ".png" {
 96-				return nil
 97-			}
 98 			c <- path
 99 			return nil
100 		})
101@@ -60,6 +59,11 @@ 	return c, nil
102 }
103 
104 func (f *FileScanner) Process(ctx context.Context, path string) error {
105+	m := mime.TypeByExtension(filepath.Ext(path))
106+	if !strings.HasPrefix(m, "video") && !strings.HasPrefix(m, "image") {
107+		return nil
108+	}
109+
110 	hash := md5.Sum([]byte(path))
111 	str := hex.EncodeToString(hash[:])
112 	name := filepath.Base(path)
113@@ -73,7 +77,6 @@ 	if exists {
114 		return nil
115 	}
116 
117-	mime, errResp := mimetype.DetectFile(path)
118 	if errResp != nil {
119 		return errResp
120 	}
121@@ -82,6 +85,6 @@ 	return f.repository.Create(ctx, &media.CreateMedia{
122 		Name:     name,
123 		Path:     path,
124 		PathHash: str,
125-		MIMEType: mime.String(),
126+		MIMEType: m,
127 	})
128 }
129diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
130index bbc9fb747c6192a137c133bb457b6a5771cb54c1..0a07085d58ddb936e87789eafd6929f3f433f893 100644
131--- a/pkg/worker/list_processor.go
132+++ b/pkg/worker/list_processor.go
133@@ -23,6 +23,7 @@ 	}
134 
135 	chanProcessorWorker[T any] struct {
136 		chanProcessor ChanProcessor[T]
137+		logrus        *logrus.Entry
138 		scheduler     *Scheduler
139 	}
140 
141@@ -48,10 +49,12 @@
142 func NewWorkerFromChanProcessor[T any](
143 	chanProcessor ChanProcessor[T],
144 	scheduler *Scheduler,
145+	logrus *logrus.Entry,
146 ) Worker {
147 	return &chanProcessorWorker[T]{
148 		chanProcessor: chanProcessor,
149 		scheduler:     scheduler,
150+		logrus:        logrus,
151 	}
152 }
153 
154@@ -104,9 +107,13 @@ 			if !ok {
155 				return nil
156 			}
157 
158-			if err := l.chanProcessor.Process(ctx, v); err != nil {
159-				return err
160-			}
161+			l.scheduler.Take()
162+			go func(v T) {
163+				defer l.scheduler.Return()
164+				if err := l.chanProcessor.Process(ctx, v); err != nil {
165+					l.logrus.WithError(err).Error("Error processing chan")
166+				}
167+			}(v)
168 		}
169 	}
170 }