diff --git a/splitio/producer/task/pipelined.go b/splitio/producer/task/pipelined.go index 3a54f5ac..cdfc2fbd 100644 --- a/splitio/producer/task/pipelined.go +++ b/splitio/producer/task/pipelined.go @@ -255,23 +255,20 @@ func (p *PipelinedSyncTask) sinker() { defer asRecyblable.recycle() } - common.WithAttempts(3, func() error { + err := common.WithAttempts(3, func() error { p.logger.Debug(fmt.Sprintf("[pipelined/%s] - impressions post ready. making request", p.name)) req, err := p.worker.BuildRequest(bulk) if err != nil { - p.logger.Error(fmt.Sprintf("[pipelined/%s] error building request: %s", p.name, err)) - return err + return fmt.Errorf(fmt.Sprintf("[pipelined/%s] error building request: %s", p.name, err)) } resp, err := p.httpClient.Do(req) if err != nil { - p.logger.Error(fmt.Sprintf("[pipelined/%s] error posting: %s", p.name, err)) - return err + return fmt.Errorf(fmt.Sprintf("[pipelined/%s] error posting: %s", p.name, err)) } if resp.StatusCode < 200 || resp.StatusCode >= 300 { - p.logger.Error(fmt.Sprintf("[pipelined/%s] bad status code when sinking data: %d", p.name, resp.StatusCode)) - return errHTTP + return fmt.Errorf(fmt.Sprintf("[pipelined/%s] bad status code when sinking data: %d", p.name, resp.StatusCode)) } if resp.Body != nil { @@ -280,6 +277,9 @@ func (p *PipelinedSyncTask) sinker() { p.logger.Debug(fmt.Sprintf("[pipelined/%s] - impressions posted successfully", p.name)) return nil }) + if err != nil { + p.logger.Error(err) + } }() } }