diff --git a/pkg/worker/http.go b/pkg/worker/http.go
index 1d56f86e2587ccf59a047e278e1a7a45f11cb165..973775e14ea0374073c0c59af15a908bdf1254e0 100644
--- a/pkg/worker/http.go
+++ b/pkg/worker/http.go
@@ -16,11 +16,23 @@ }
}
func (self *ServerTask) Start(ctx context.Context) error {
+ done := make(chan error)
+
go func() {
- // nolint: errcheck
- self.server.ListenAndServe()
+ done <- self.server.ListenAndServe()
}()
- <-ctx.Done()
- return self.server.Shutdown(ctx)
+ select {
+ // if ListenAndServe error for something other than context.Canceled
+ //(e.g.: address already in use) it trigger done to return sonner with
+ // the return error
+ case err := <-done:
+ return err
+
+ // in case of context canceled it will manually trigger the server to
+ // shutdown, and return its error, which is most cases, but not limited, is
+ // context.Canceled.
+ case <-ctx.Done():
+ return self.server.Shutdown(ctx)
+ }
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 6b5c21cd67bc557b2b8bd1fa048229dc4b098efb..fc97c97a78aa42a5453eef9b5451eeb7bb648956 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -10,31 +10,42 @@ "golang.org/x/sync/errgroup"
)
type (
+
+ // Task defines a long running core component of the application.
Task interface {
+ // Start defines when the component will be started.
+ // The task MUST watch for the context and return when the context is
+ // canceled.
+ //
+ // Start MUST only error on unhandled catastrophic errors, since
+ // returning an error will cause the application to halt.
+ //
+ // Context canceled error is ignored and not reported as error as they
+ // will be trigger by OS signals or some other component erring. But in
+ // any case the task SHOULD handle and return context cancellation
+ // error.
Start(context.Context) error
}
- Work struct {
+ // TaskPool manages the life-cycle of a pool of tasks.
+ TaskPool struct {
+ tasks []*work
+ }
+
+ // work is wrapper around task to add metadata to it.
+ work struct {
Name string
Task Task
wait time.Duration
}
-
- TaskPool struct {
- tasks []*Work
- }
-)
-
-const (
- format = "2006.01.02 15:04:05"
)
func NewTaskPool() *TaskPool {
return &TaskPool{}
}
-func (w *Work) run(ctx context.Context) error {
- // first time fire from the get go
+func (w *work) run(ctx context.Context) error {
+ // first time fire the task from the get go
timer := time.NewTimer(time.Nanosecond)
for {
@@ -42,7 +53,7 @@ select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
- if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
+ if err := w.Task.Start(ctx); err != nil {
return err
}
}
@@ -51,7 +62,7 @@ }
}
func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) {
- self.tasks = append(self.tasks, &Work{
+ self.tasks = append(self.tasks, &work{
Name: name,
Task: task,
wait: wait,
@@ -60,27 +71,28 @@ }
func (self *TaskPool) Start(ctx context.Context) error {
var g errgroup.Group
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
for _, w := range self.tasks {
- g.Go(func(w *Work) func() error {
+ g.Go(func(w *work) func() error {
return func() error {
- slog.Info("Process starting", "time", time.Now().Format(format), "name", w.Name)
+ slog.Info("Process starting", "name", w.Name)
now := time.Now()
if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) {
since := time.Since(now)
slog.Error(
"Process erred",
- "time", time.Now().Format(format),
"name", w.Name,
"error", err,
"duration", since,
)
+ cancel()
return err
} else {
since := time.Since(now)
slog.Info(
"Process ended",
- "time", time.Now().Format(format),
"name", w.Name,
"duration", since,
)