Skip to content

Commit

Permalink
logging: adjust log levels
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Jan 19, 2024
1 parent 7de579a commit 21d3604
Show file tree
Hide file tree
Showing 11 changed files with 21 additions and 25 deletions.
3 changes: 0 additions & 3 deletions orchestrator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,6 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {

}

//if len(cmds) != 0 {
// fmt.Printf("Schedule: %T %+v\n", cmds, cmds)
//}
return loop.Batch(cmds...)
}

Expand Down
10 changes: 5 additions & 5 deletions orchestrator/work/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *blo
if errors.Is(err, context.Canceled) {
logger.Debug("job canceled", zap.Object("unit", unit), zap.Error(err))
} else {
logger.Info("job failed", zap.Object("unit", unit), zap.Error(err))
logger.Warn("job failed", zap.Object("unit", unit), zap.Error(err))
}

timeTook := time.Since(startTime)
logger.Info(
logger.Warn(
"incomplete job",
zap.Object("unit", unit),
zap.Int("number_of_tries", retryIdx),
Expand All @@ -137,12 +137,12 @@ func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *blo
}

if err := ctx.Err(); err != nil {
logger.Info("job not completed", zap.Object("unit", unit), zap.Error(err))
logger.Warn("job not completed", zap.Object("unit", unit), zap.Error(err))
return MsgJobFailed{Unit: unit, Error: err}
}

timeTook := time.Since(startTime)
logger.Info(
logger.Debug(
"job completed",
zap.Object("unit", unit),
zap.Int("number_of_tries", retryIdx),
Expand Down Expand Up @@ -249,7 +249,7 @@ func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRa
return &Result{Error: err}

case *pbssinternal.ProcessRangeResponse_Completed:
logger.Info("worker done")
logger.Debug("worker done")
return &Result{
PartialFilesWritten: toRPCPartialFiles(r.Completed),
}
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/work/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type WorkerStatus struct {
func NewWorkerPool(ctx context.Context, workerCount int, workerFactory WorkerFactory) *WorkerPool {
logger := reqctx.Logger(ctx)

logger.Info("initializing worker pool", zap.Int("worker_count", workerCount))
logger.Debug("initializing worker pool", zap.Int("worker_count", workerCount))

workers := make([]*WorkerStatus, workerCount)
for i := 0; i < workerCount; i++ {
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) {
p.stores.SetStoreMap(storeMap)

logger := reqctx.Logger(ctx)
logger.Info("stores loaded", zap.Object("stores", p.stores.StoreMap), zap.Int("stage", reqctx.Details(ctx).Tier2Stage))
logger.Debug("stores loaded", zap.Object("stores", p.stores.StoreMap), zap.Int("stage", reqctx.Details(ctx).Tier2Stage))

return nil
}
Expand Down Expand Up @@ -290,7 +290,7 @@ func (p *Pipeline) runParallelProcess(ctx context.Context, reqPlan *plan.Request
}
}()

logger.Info("starting parallel processing")
logger.Debug("starting parallel processing")

storeMap, err = parallelProcessor.Run(ctx)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion pipeline/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err er
if r := recover(); r != nil {
if err, ok := r.(error); ok {
if errors.Is(err, context.Canceled) {
logger.Info("context canceled")
return
}
}
Expand Down
8 changes: 4 additions & 4 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,19 +270,19 @@ func (s *Tier1Service) Blocks(
if connectError := toConnectError(runningContext, err); connectError != nil {
switch connect.CodeOf(connectError) {
case connect.CodeInternal:
logger.Info("unexpected termination of stream of blocks", zap.String("stream_processor", "tier1"), zap.Error(err))
logger.Warn("unexpected termination of stream of blocks", zap.String("stream_processor", "tier1"), zap.Error(err))
case connect.CodeInvalidArgument:
logger.Debug("recording failure on request", zap.String("request_id", requestID))
s.recordFailure(requestID, connectError)
case connect.CodeCanceled:
logger.Info("Blocks request canceled by user", zap.Error(connectError))
default:
logger.Info("Blocks request completed with error", zap.Error(connectError))
logger.Warn("Blocks request completed with error", zap.Error(connectError))
}
return connectError
}

logger.Info("Blocks request completed witout error")
logger.Debug("Blocks request completed witout error")
return nil
}

Expand Down Expand Up @@ -452,7 +452,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
return fmt.Errorf("error building request plan: %w", err)
}

logger.Info("initializing tier1 pipeline",
logger.Debug("initializing tier1 pipeline",
zap.Stringer("plan", reqPlan),
zap.Int64("request_start_block", request.StartBlockNum),
zap.Uint64("resolved_start_block", requestDetails.ResolvedStartBlockNum),
Expand Down
4 changes: 2 additions & 2 deletions service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewTier2(
return nil, fmt.Errorf("getting block type from merged-blocks-store: %w", err)
}

logger.Info("launching tier2 service", zap.String("block_type", blockType))
logger.Debug("launching tier2 service", zap.String("block_type", blockType))
s := &Tier2Service{
runtimeConfig: runtimeConfig,
blockType: blockType,
Expand Down Expand Up @@ -278,7 +278,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
opts...,
)

logger.Info("initializing tier2 pipeline",
logger.Debug("initializing tier2 pipeline",
zap.Uint64("request_start_block", requestDetails.ResolvedStartBlockNum),
zap.Uint64("request_stop_block", request.StopBlockNum),
zap.String("output_module", request.OutputModule),
Expand Down
8 changes: 4 additions & 4 deletions sink-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func New(

// this is a blocking call
func (s *server) Run(ctx context.Context) {
s.logger.Info("starting server server")
s.logger.Debug("starting server")

options := []dgrpcserver.Option{
dgrpcserver.WithLogger(s.logger),
Expand All @@ -71,7 +71,7 @@ func (s *server) Run(ctx context.Context) {
dgrpcserver.WithCORS(s.corsOption()),
}
if strings.Contains(s.httpListenAddr, "*") {
s.logger.Info("grpc server with insecure server")
s.logger.Warn("grpc server with insecure server")
options = append(options, dgrpcserver.WithInsecureServer())
} else {
s.logger.Info("grpc server with plain text server")
Expand All @@ -87,7 +87,7 @@ func (s *server) Run(ctx context.Context) {

s.OnTerminating(func(err error) {
s.shutdownLock.Lock()
s.logger.Info("shutting down connect web server")
s.logger.Warn("shutting down connect web server")

shutdownErr := s.engine.Shutdown(ctx, err, s.logger)
if shutdownErr != nil {
Expand All @@ -97,7 +97,7 @@ func (s *server) Run(ctx context.Context) {
time.Sleep(1 * time.Second)

srv.Shutdown(nil)
s.logger.Info("connect web server shutdown")
s.logger.Warn("connect web server shutdown")
})

s.OnTerminated(func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion storage/store/full_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *FullKV) Save(endBoundaryBlock uint64) (*FileInfo, *fileWriter, error) {

file := NewCompleteFileInfo(s.name, s.moduleInitialBlock, endBoundaryBlock)

s.logger.Info("saving store",
s.logger.Debug("saving store",
zap.String("file_name", file.Filename),
zap.Object("block_range", file.Range),
)
Expand Down
2 changes: 1 addition & 1 deletion storage/store/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (b *baseStore) Merge(kvPartialStore *PartialKV) error {
b.DeletePrefix(kvPartialStore.lastOrdinal, prefix)
}
if len(kvPartialStore.DeletedPrefixes) > 0 {
b.logger.Info("merging: applied delete prefixes", zap.Duration("duration", time.Since(partialKvTime)))
b.logger.Debug("merging: applied delete prefixes", zap.Duration("duration", time.Since(partialKvTime)))
}

intoValueTypeLower := strings.ToLower(b.valueType)
Expand Down
2 changes: 1 addition & 1 deletion storage/store/partial_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *PartialKV) Save(endBoundaryBlock uint64) (*FileInfo, *fileWriter, error
}

file := NewPartialFileInfo(p.name, p.initialBlock, endBoundaryBlock, p.traceID)
p.logger.Info("partial store save written", zap.String("file_name", file.Filename), zap.Stringer("block_range", file.Range))
p.logger.Debug("partial store save written", zap.String("file_name", file.Filename), zap.Stringer("block_range", file.Range))

fw := &fileWriter{
store: p.objStore,
Expand Down

0 comments on commit 21d3604

Please sign in to comment.