From 21d36045a02d92ca0cd999812d66520400b4e855 Mon Sep 17 00:00:00 2001
From: colindickson
Date: Fri, 19 Jan 2024 08:02:01 -0500
Subject: [PATCH] logging: adjust log levels

---
 orchestrator/scheduler/scheduler.go |  3 ---
 orchestrator/work/worker.go         | 10 +++++-----
 orchestrator/work/workerpool.go     |  2 +-
 pipeline/pipeline.go                |  4 ++--
 pipeline/process_block.go           |  1 -
 service/tier1.go                    |  8 ++++----
 service/tier2.go                    |  4 ++--
 sink-server/server.go               |  8 ++++----
 storage/store/full_kv.go            |  2 +-
 storage/store/merge.go              |  2 +-
 storage/store/partial_kv.go         |  2 +-
 11 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/orchestrator/scheduler/scheduler.go b/orchestrator/scheduler/scheduler.go
index 0deb892b1..f5fc506c1 100644
--- a/orchestrator/scheduler/scheduler.go
+++ b/orchestrator/scheduler/scheduler.go
@@ -157,9 +157,6 @@ func (s *Scheduler) Update(msg loop.Msg) loop.Cmd {
 	}
 
-	//if len(cmds) != 0 {
-	//	fmt.Printf("Schedule: %T %+v\n", cmds, cmds)
-	//}
 	return loop.Batch(cmds...)
 }
 
diff --git a/orchestrator/work/worker.go b/orchestrator/work/worker.go
index 3387f539e..eee27b9f8 100644
--- a/orchestrator/work/worker.go
+++ b/orchestrator/work/worker.go
@@ -121,11 +121,11 @@ func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *blo
 		if errors.Is(err, context.Canceled) {
 			logger.Debug("job canceled", zap.Object("unit", unit), zap.Error(err))
 		} else {
-			logger.Info("job failed", zap.Object("unit", unit), zap.Error(err))
+			logger.Warn("job failed", zap.Object("unit", unit), zap.Error(err))
 		}
 
 		timeTook := time.Since(startTime)
-		logger.Info(
+		logger.Warn(
 			"incomplete job",
 			zap.Object("unit", unit),
 			zap.Int("number_of_tries", retryIdx),
@@ -137,12 +137,12 @@ func (w *RemoteWorker) Work(ctx context.Context, unit stage.Unit, workRange *blo
 	}
 
 	if err := ctx.Err(); err != nil {
-		logger.Info("job not completed", zap.Object("unit", unit), zap.Error(err))
+		logger.Warn("job not completed", zap.Object("unit", unit), zap.Error(err))
 		return MsgJobFailed{Unit: unit, Error: err}
 	}
 
 	timeTook := time.Since(startTime)
-	logger.Info(
+	logger.Debug(
 		"job completed",
 		zap.Object("unit", unit),
 		zap.Int("number_of_tries", retryIdx),
@@ -249,7 +249,7 @@ func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRa
 			return &Result{Error: err}
 
 		case *pbssinternal.ProcessRangeResponse_Completed:
-			logger.Info("worker done")
+			logger.Debug("worker done")
 			return &Result{
 				PartialFilesWritten: toRPCPartialFiles(r.Completed),
 			}
diff --git a/orchestrator/work/workerpool.go b/orchestrator/work/workerpool.go
index 2575fc94d..1afd8887d 100644
--- a/orchestrator/work/workerpool.go
+++ b/orchestrator/work/workerpool.go
@@ -30,7 +30,7 @@ type WorkerStatus struct {
 
 func NewWorkerPool(ctx context.Context, workerCount int, workerFactory WorkerFactory) *WorkerPool {
 	logger := reqctx.Logger(ctx)
 
-	logger.Info("initializing worker pool", zap.Int("worker_count", workerCount))
+	logger.Debug("initializing worker pool", zap.Int("worker_count", workerCount))
 	workers := make([]*WorkerStatus, workerCount)
 	for i := 0; i < workerCount; i++ {
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index f4d25abcc..18d8ddf9d 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -148,7 +148,7 @@ func (p *Pipeline) InitTier2Stores(ctx context.Context) (err error) {
 	p.stores.SetStoreMap(storeMap)
 
 	logger := reqctx.Logger(ctx)
-	logger.Info("stores loaded", zap.Object("stores", p.stores.StoreMap), zap.Int("stage", reqctx.Details(ctx).Tier2Stage))
+	logger.Debug("stores loaded", zap.Object("stores", p.stores.StoreMap), zap.Int("stage", reqctx.Details(ctx).Tier2Stage))
 	return nil
 }
 
@@ -290,7 +290,7 @@ func (p *Pipeline) runParallelProcess(ctx context.Context, reqPlan *plan.Request
 		}
 	}()
 
-	logger.Info("starting parallel processing")
+	logger.Debug("starting parallel processing")
 
 	storeMap, err = parallelProcessor.Run(ctx)
 	if err != nil {
diff --git a/pipeline/process_block.go b/pipeline/process_block.go
index 10ce8e483..b382bd5f6 100644
--- a/pipeline/process_block.go
+++ b/pipeline/process_block.go
@@ -32,7 +32,6 @@ func (p *Pipeline) ProcessBlock(block *pbbstream.Block, obj interface{}) (err er
 		if r := recover(); r != nil {
 			if err, ok := r.(error); ok {
 				if errors.Is(err, context.Canceled) {
-					logger.Info("context canceled")
 					return
 				}
 			}
diff --git a/service/tier1.go b/service/tier1.go
index 352ba859f..c72994aac 100644
--- a/service/tier1.go
+++ b/service/tier1.go
@@ -270,19 +270,19 @@ func (s *Tier1Service) Blocks(
 
 		if connectError := toConnectError(runningContext, err); connectError != nil {
 			switch connect.CodeOf(connectError) {
 			case connect.CodeInternal:
-				logger.Info("unexpected termination of stream of blocks", zap.String("stream_processor", "tier1"), zap.Error(err))
+				logger.Warn("unexpected termination of stream of blocks", zap.String("stream_processor", "tier1"), zap.Error(err))
 			case connect.CodeInvalidArgument:
 				logger.Debug("recording failure on request", zap.String("request_id", requestID))
 				s.recordFailure(requestID, connectError)
 			case connect.CodeCanceled:
 				logger.Info("Blocks request canceled by user", zap.Error(connectError))
 			default:
-				logger.Info("Blocks request completed with error", zap.Error(connectError))
+				logger.Warn("Blocks request completed with error", zap.Error(connectError))
 			}
 			return connectError
 		}
 
-		logger.Info("Blocks request completed witout error")
+		logger.Debug("Blocks request completed without error")
 		return nil
 	}
@@ -452,7 +452,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
 		return fmt.Errorf("error building request plan: %w", err)
 	}
 
-	logger.Info("initializing tier1 pipeline",
+	logger.Debug("initializing tier1 pipeline",
 		zap.Stringer("plan", reqPlan),
 		zap.Int64("request_start_block", request.StartBlockNum),
 		zap.Uint64("resolved_start_block", requestDetails.ResolvedStartBlockNum),
diff --git a/service/tier2.go b/service/tier2.go
index 82c6df637..c5529bf65 100644
--- a/service/tier2.go
+++ b/service/tier2.go
@@ -92,7 +92,7 @@ func NewTier2(
 		return nil, fmt.Errorf("getting block type from merged-blocks-store: %w", err)
 	}
 
-	logger.Info("launching tier2 service", zap.String("block_type", blockType))
+	logger.Debug("launching tier2 service", zap.String("block_type", blockType))
 	s := &Tier2Service{
 		runtimeConfig: runtimeConfig,
 		blockType:     blockType,
@@ -278,7 +278,7 @@ func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.P
 		opts...,
 	)
 
-	logger.Info("initializing tier2 pipeline",
+	logger.Debug("initializing tier2 pipeline",
 		zap.Uint64("request_start_block", requestDetails.ResolvedStartBlockNum),
 		zap.Uint64("request_stop_block", request.StopBlockNum),
 		zap.String("output_module", request.OutputModule),
diff --git a/sink-server/server.go b/sink-server/server.go
index 3b34761fc..b44a5cb0d 100644
--- a/sink-server/server.go
+++ b/sink-server/server.go
@@ -61,7 +61,7 @@ func New(
 
 // this is a blocking call
 func (s *server) Run(ctx context.Context) {
-	s.logger.Info("starting server server")
+	s.logger.Debug("starting server")
 
 	options := []dgrpcserver.Option{
 		dgrpcserver.WithLogger(s.logger),
@@ -71,7 +71,7 @@ func (s *server) Run(ctx context.Context) {
 		dgrpcserver.WithCORS(s.corsOption()),
 	}
 	if strings.Contains(s.httpListenAddr, "*") {
-		s.logger.Info("grpc server with insecure server")
+		s.logger.Warn("grpc server with insecure server")
 		options = append(options, dgrpcserver.WithInsecureServer())
 	} else {
 		s.logger.Info("grpc server with plain text server")
@@ -87,7 +87,7 @@ func (s *server) Run(ctx context.Context) {
 
 	s.OnTerminating(func(err error) {
 		s.shutdownLock.Lock()
-		s.logger.Info("shutting down connect web server")
+		s.logger.Warn("shutting down connect web server")
 		shutdownErr := s.engine.Shutdown(ctx, err, s.logger)
 		if shutdownErr != nil {
@@ -97,7 +97,7 @@ func (s *server) Run(ctx context.Context) {
 
 		time.Sleep(1 * time.Second)
 		srv.Shutdown(nil)
-		s.logger.Info("connect web server shutdown")
+		s.logger.Warn("connect web server shutdown")
 	})
 
 	s.OnTerminated(func(err error) {
diff --git a/storage/store/full_kv.go b/storage/store/full_kv.go
index 6235a93f0..ce0014b80 100644
--- a/storage/store/full_kv.go
+++ b/storage/store/full_kv.go
@@ -76,7 +76,7 @@ func (s *FullKV) Save(endBoundaryBlock uint64) (*FileInfo, *fileWriter, error) {
 
 	file := NewCompleteFileInfo(s.name, s.moduleInitialBlock, endBoundaryBlock)
 
-	s.logger.Info("saving store",
+	s.logger.Debug("saving store",
 		zap.String("file_name", file.Filename),
 		zap.Object("block_range", file.Range),
 	)
diff --git a/storage/store/merge.go b/storage/store/merge.go
index 4d5031037..f87880937 100644
--- a/storage/store/merge.go
+++ b/storage/store/merge.go
@@ -46,7 +46,7 @@ func (b *baseStore) Merge(kvPartialStore *PartialKV) error {
 		b.DeletePrefix(kvPartialStore.lastOrdinal, prefix)
 	}
 	if len(kvPartialStore.DeletedPrefixes) > 0 {
-		b.logger.Info("merging: applied delete prefixes", zap.Duration("duration", time.Since(partialKvTime)))
+		b.logger.Debug("merging: applied delete prefixes", zap.Duration("duration", time.Since(partialKvTime)))
 	}
 
 	intoValueTypeLower := strings.ToLower(b.valueType)
diff --git a/storage/store/partial_kv.go b/storage/store/partial_kv.go
index c4ecf1a64..758ae3433 100644
--- a/storage/store/partial_kv.go
+++ b/storage/store/partial_kv.go
@@ -66,7 +66,7 @@ func (p *PartialKV) Save(endBoundaryBlock uint64) (*FileInfo, *fileWriter, error
 	}
 
 	file := NewPartialFileInfo(p.name, p.initialBlock, endBoundaryBlock, p.traceID)
-	p.logger.Info("partial store save written", zap.String("file_name", file.Filename), zap.Stringer("block_range", file.Range))
+	p.logger.Debug("partial store save written", zap.String("file_name", file.Filename), zap.Stringer("block_range", file.Range))
 
 	fw := &fileWriter{
 		store: p.objStore,