diff --git a/cmd/server/main.go b/cmd/server/main.go
index 473bed9c8f84206f027b3c505fb0260e04d5e627..0fa5fea9d42fe4a4c8ea1c67444e034b938f68e9 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -135,7 +135,6 @@ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
pool.Start(ctx)
- pool.Wait()
}
func OpenDatabase(dbType string, dbConn string) (gorm.Dialector, error) {
diff --git a/pkg/coroutine/coroutine.go b/pkg/coroutine/coroutine.go
new file mode 100644
index 0000000000000000000000000000000000000000..96d149ea9851318b4ff9c0c7a0f07a1a6505eb83
--- /dev/null
+++ b/pkg/coroutine/coroutine.go
@@ -0,0 +1,33 @@
+package coroutine
+
+import (
+ "context"
+)
+
+// WrapProcess wraps process into a go routine and make it cancelable through context
+func WrapProcess[V any](ctx context.Context, fun func() (V, error)) (V, error) {
+ c := make(chan V)
+ e := make(chan error)
+ go func() {
+ defer close(c)
+ defer close(e)
+
+ v, err := fun()
+ if err != nil {
+ e <- err
+ } else {
+ c <- v
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ var zero V
+ return zero, ctx.Err()
+ case m := <-c:
+ return m, nil
+ case err := <-e:
+ var zero V
+ return zero, err
+ }
+}
diff --git a/pkg/coroutine/coroutine_test.go b/pkg/coroutine/coroutine_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..e876ec3bd69b256426e2dad9fb6c2f1b17ab7348
--- /dev/null
+++ b/pkg/coroutine/coroutine_test.go
@@ -0,0 +1,63 @@
+//go:build unit
+
+package coroutine
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+
+ "git.sr.ht/~gabrielgio/img/pkg/testkit"
+)
+
+var rError = errors.New("This is a error")
+
+func imediatReturn() (string, error) {
+ return "A string", nil
+}
+
+func imediatErrorReturn() (string, error) {
+ return "", rError
+}
+
+func haltedReturn() (string, error) {
+ time.Sleep(time.Hour)
+ return "", nil
+}
+
+func TestImediatReturn(t *testing.T) {
+ ctx := context.Background()
+ v, err := WrapProcess(ctx, imediatReturn)
+ testkit.TestError(t, "WrapProcess", nil, err)
+ testkit.TestValue(t, "WrapProcess", "A string", v)
+}
+
+func TestImediatErrorReturn(t *testing.T) {
+ ctx := context.Background()
+ v, err := WrapProcess(ctx, imediatErrorReturn)
+ testkit.TestError(t, "WrapProcess", rError, err)
+ testkit.TestValue(t, "WrapProcess", "", v)
+}
+
+func TestHaltedReturn(t *testing.T) {
+ ctx := context.Background()
+ ctx, cancel := context.WithCancel(ctx)
+
+ var (
+ err error
+ wg sync.WaitGroup
+ )
+
+ wg.Add(1)
+ go func(err *error) {
+ defer wg.Done()
+ _, *err = WrapProcess(ctx, haltedReturn)
+ }(&err)
+
+ cancel()
+ wg.Wait()
+
+ testkit.TestError(t, "WrapProcess", context.Canceled, err)
+}
diff --git a/pkg/coroutines/coroutines.go b/pkg/coroutines/coroutines.go
deleted file mode 100644
index c0f7247cdf3d00e2b1fd4e019f5ec334fa32ea96..0000000000000000000000000000000000000000
--- a/pkg/coroutines/coroutines.go
+++ /dev/null
@@ -1 +0,0 @@
-package coroutines
diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go
index 4aa247d832bb5261d0373c4fbb8bb8d90c73af02..91eed1270268368fa3f43216e087648984296d15 100644
--- a/pkg/worker/exif_scanner.go
+++ b/pkg/worker/exif_scanner.go
@@ -4,6 +4,7 @@ import (
"context"
"git.sr.ht/~gabrielgio/img/pkg/components/media"
+ "git.sr.ht/~gabrielgio/img/pkg/coroutine"
"git.sr.ht/~gabrielgio/img/pkg/fileop"
)
@@ -33,36 +34,11 @@
return medias, nil
}
-func wrapReadExif(ctx context.Context, path string) (*media.MediaEXIF, error) {
- c := make(chan *media.MediaEXIF)
- e := make(chan error)
- go func() {
- defer close(c)
- defer close(e)
-
- newExif, err := fileop.ReadExif(path)
- if err != nil {
- e <- err
- } else {
- c <- newExif
- }
- }()
-
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case m := <-c:
- return m, nil
- case err := <-e:
- return nil, err
- }
-}
-
func (e *EXIFScanner) Process(ctx context.Context, m *media.Media) error {
- newExif, err := wrapReadExif(ctx, m.Path)
+ exif, err := coroutine.WrapProcess(ctx, func() (*media.MediaEXIF, error) { return fileop.ReadExif(m.Path) })
if err != nil {
return err
}
- return e.repository.CreateEXIF(ctx, m.ID, newExif)
+ return e.repository.CreateEXIF(ctx, m.ID, exif)
}
diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go
index 1e4ed2d7793632a996cb6b828fd21677dcedbb55..35672f3b1e9b807fde0698ea752a7763777b7cfa 100644
--- a/pkg/worker/list_processor_test.go
+++ b/pkg/worker/list_processor_test.go
@@ -10,6 +10,7 @@ "sync"
"testing"
"git.sr.ht/~gabrielgio/img/pkg/testkit"
+ "github.com/sirupsen/logrus"
)
type (
@@ -24,10 +25,13 @@ }
)
func TestListProcessorLimit(t *testing.T) {
- mock := &mockCounterListProcessor{
- countTo: 10000,
- }
- worker := NewWorkerFromListProcessor[int](mock, nil)
+ var (
+ log = logrus.New()
+ scheduler = NewScheduler(1)
+ mock = &mockCounterListProcessor{countTo: 10000}
+ )
+
+ worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
err := worker.Start(context.Background())
testkit.TestFatalError(t, "Start", err)
@@ -36,8 +40,13 @@ testkit.TestValue(t, "Start", mock.countTo, mock.counter)
}
func TestListProcessorContextCancelQuery(t *testing.T) {
- mock := &mockContextListProcessor{}
- worker := NewWorkerFromListProcessor[int](mock, nil)
+ var (
+ log = logrus.New()
+ scheduler = NewScheduler(1)
+ mock = &mockContextListProcessor{}
+ )
+
+ worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 18cc0e25c15a57d778a806275f6fa354b637d8c0..359384a260713696d42fbbf02b6be54bcda9f6a6 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -20,7 +20,6 @@ }
WorkerPool struct {
workers []*Work
- wg sync.WaitGroup
}
)
@@ -36,10 +35,13 @@ })
}
func (self *WorkerPool) Start(ctx context.Context) {
- self.wg.Add(len(self.workers))
+ var wg sync.WaitGroup
+
+ wg.Add(len(self.workers))
+
for _, w := range self.workers {
go func(w *Work) {
- defer self.wg.Done()
+ defer wg.Done()
if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
fmt.Println("Processes finished, error", w.Name, err.Error())
} else {
@@ -47,8 +49,6 @@ fmt.Println(w.Name, "done")
}
}(w)
}
-}
-func (self *WorkerPool) Wait() {
- self.wg.Wait()
+ wg.Wait()
}