From a70751bc0c7e8e0c7674b9cf04649b03f2620d01 Mon Sep 17 00:00:00 2001
From: mhuisi
Date: Thu, 30 Nov 2023 10:58:21 +0100
Subject: [PATCH] refactor: enforce column limit in watchdog

---
 src/Lean/Server/Watchdog.lean | 185 +++++++++++++++++++++-------------
 1 file changed, 116 insertions(+), 69 deletions(-)

diff --git a/src/Lean/Server/Watchdog.lean b/src/Lean/Server/Watchdog.lean
index f09f51d3463c..e8e7edb88c7f 100644
--- a/src/Lean/Server/Watchdog.lean
+++ b/src/Lean/Server/Watchdog.lean
@@ -22,42 +22,49 @@ For general server architecture, see `README.md`. This module implements the wat

 ## Watchdog state

-Most LSP clients only send us file diffs, so to facilitate sending entire file contents to freshly restarted
-workers, the watchdog needs to maintain the current state of each file. It can also use this state to detect changes
-to the header and thus restart the corresponding worker, freeing its imports.
+Most LSP clients only send us file diffs, so to facilitate sending entire file contents to freshly
+restarted workers, the watchdog needs to maintain the current state of each file. It can also use
+this state to detect changes to the header and thus restart the corresponding worker, freeing its
+imports.

 TODO(WN):
-We may eventually want to keep track of approximately (since this isn't knowable exactly) where in the file a worker
-crashed. Then on restart, we tell said worker to only parse up to that point and query the user about how to proceed
-(continue OR allow the user to fix the bug and then continue OR ..). Without this, if the crash is deterministic,
-users may be confused about why the server seemingly stopped working for a single file.
+We may eventually want to keep track of approximately (since this isn't knowable exactly) where in
+the file a worker crashed. Then on restart, we tell said worker to only parse up to that point and
+query the user about how to proceed (continue OR allow the user to fix the bug and then continue OR
+..). Without this, if the crash is deterministic, users may be confused about why the server
+seemingly stopped working for a single file.

 ## Watchdog <-> worker communication

-The watchdog process and its file worker processes communicate via LSP. If the necessity arises,
-we might add non-standard commands similarly based on JSON-RPC. Most requests and notifications
-are forwarded to the corresponding file worker process, with the exception of these notifications:
+The watchdog process and its file worker processes communicate via LSP. If the necessity arises, we
+might add non-standard commands similarly based on JSON-RPC. Most requests and notifications are
+forwarded to the corresponding file worker process, with the exception of these notifications:

-- textDocument/didOpen: Launch the file worker, create the associated watchdog state and launch a task to
-                        asynchronously receive LSP packets from the worker (e.g. request responses).
+- textDocument/didOpen: Launch the file worker, create the associated watchdog state and launch a
+                        task to asynchronously receive LSP packets from the worker (e.g. request
+                        responses).
 - textDocument/didChange: Update the local file state so that it can be resent to restarted workers.
                           Then forward the `didChange` notification.
-- textDocument/didClose: Signal a shutdown to the file worker and remove the associated watchdog state.
+- textDocument/didClose: Signal a shutdown to the file worker and remove the associated watchdog
+                         state.

 Moreover, we don't implement the full protocol at this level:

-- Upon starting, the `initialize` request is forwarded to the worker, but it must not respond with its server
-  capabilities. Consequently, the watchdog will not send an `initialized` notification to the worker.
-- After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full current state of
-  the file. No additional `didOpen` notifications will be forwarded to the worker process.
+- Upon starting, the `initialize` request is forwarded to the worker, but it must not respond with
+  its server capabilities. Consequently, the watchdog will not send an `initialized` notification to
+  the worker.
+- After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full
+  current state of the file. No additional `didOpen` notifications will be forwarded to the worker
+  process.
 - `$/cancelRequest` notifications are forwarded to all file workers.
-- File workers are always terminated with an `exit` notification, without previously receiving a `shutdown` request.
-  Similarly, they never receive a `didClose` notification.
+- File workers are always terminated with an `exit` notification, without previously receiving a
+  `shutdown` request. Similarly, they never receive a `didClose` notification.

 ## Watchdog <-> client communication

-The watchdog itself should implement the LSP standard as closely as possible. However we reserve the right to add
-non-standard extensions in case they're needed, for example to communicate tactic state.
+The watchdog itself should implement the LSP standard as closely as possible. However we reserve the
+right to add non-standard extensions in case they're needed, for example to communicate tactic
+state.
 -/

 namespace Lean.Server.Watchdog
@@ -83,13 +90,13 @@ section Utils
     | ioError (e : IO.Error)

   inductive WorkerState where
-    /-- The watchdog can detect a crashed file worker in two places: When trying to send a message to the file worker
-      and when reading a request reply.
-      In the latter case, the forwarding task terminates and delegates a `crashed` event to the main task.
-      Then, in both cases, the file worker has its state set to `crashed` and requests that are in-flight are errored.
-      Upon receiving the next packet for that file worker, the file worker is restarted and the packet is forwarded
-      to it. If the crash was detected while writing a packet, we queue that packet until the next packet for the file
-      worker arrives. -/
+    /-- The watchdog can detect a crashed file worker in two places: When trying to send a message
+      to the file worker and when reading a request reply.
+      In the latter case, the forwarding task terminates and delegates a `crashed` event to the
+      main task. Then, in both cases, the file worker has its state set to `crashed` and requests
+      that are in-flight are errored. Upon receiving the next packet for that file worker, the file
+      worker is restarted and the packet is forwarded to it. If the crash was detected while writing
+      a packet, we queue that packet until the next packet for the file worker arrives. -/
     | crashed (queuedMsgs : Array JsonRpc.Message)
     | running

@@ -102,11 +109,12 @@ section FileWorker
     proc : Process.Child workerCfg
     commTask : Task WorkerEvent
     state : WorkerState
-    -- This should not be mutated outside of namespace FileWorker, as it is used as shared mutable state
-    /-- The pending requests map contains all requests
-    that have been received from the LSP client, but were not answered yet.
-    We need them for forwarding cancellation requests to the correct worker as well as cleanly aborting
-    requests on worker crashes. -/
+    -- This should not be mutated outside of namespace FileWorker,
+    -- as it is used as shared mutable state
+    /-- The pending requests map contains all requests that have been received from the LSP client,
+    but were not answered yet.
+    We need them for forwarding cancellation requests to the correct worker as well as cleanly
+    aborting requests on worker crashes. -/
     pendingRequestsRef : IO.Ref PendingRequestMap

   namespace FileWorker
@@ -120,8 +128,10 @@ section FileWorker
   def erasePendingRequest (fw : FileWorker) (id : RequestID) : IO Unit :=
     fw.pendingRequestsRef.modify fun pendingRequests => pendingRequests.erase id

-  def errorPendingRequests (fw : FileWorker) (hError : FS.Stream) (code : ErrorCode) (msg : String) : IO Unit := do
-    let pendingRequests ← fw.pendingRequestsRef.modifyGet (fun pendingRequests => (pendingRequests, RBMap.empty))
+  def errorPendingRequests (fw : FileWorker) (hError : FS.Stream) (code : ErrorCode) (msg : String)
+      : IO Unit := do
+    let pendingRequests ← fw.pendingRequestsRef.modifyGet
+      fun pendingRequests => (pendingRequests, RBMap.empty)
     for ⟨id, _⟩ in pendingRequests do
       hError.writeLspResponseError { id := id, code := code, message := msg }

@@ -173,13 +183,15 @@ section ServerM
     let s ← read
     if let some path := fileUriToPath? fw.doc.uri then
       if let some module ← searchModuleNameOfFileName path s.srcSearchPath then
-        s.references.modify fun refs => refs.updateWorkerRefs module params.version params.references
+        s.references.modify fun refs =>
+          refs.updateWorkerRefs module params.version params.references

   def handleIleanInfoFinal (fw : FileWorker) (params : LeanIleanInfoParams) : ServerM Unit := do
     let s ← read
     if let some path := fileUriToPath? fw.doc.uri then
       if let some module ← searchModuleNameOfFileName path s.srcSearchPath then
-        s.references.modify fun refs => refs.finalizeWorkerRefs module params.version params.references
+        s.references.modify fun refs =>
+          refs.finalizeWorkerRefs module params.version params.references

   /-- Creates a Task which forwards a worker's messages into the output stream until an event
       which must be handled in the main watchdog thread (e.g. an I/O error) happens. -/
@@ -217,8 +229,9 @@ section ServerM
         | 0 =>
           -- Worker was terminated
           fw.errorPendingRequests o ErrorCode.contentModified
-            (s!"The file worker for {fw.doc.uri} has been terminated. Either the header has changed,"
-            ++ " or the file was closed, or the server is shutting down.")
+            (s!"The file worker for {fw.doc.uri} has been terminated. "
+            ++ "Either the header has changed, or the file was closed, "
+            ++ " or the server is shutting down.")
           -- one last message to clear the diagnostics for this file so that stale errors
           -- do not remain in the editor forever.
           publishDiagnostics fw.doc #[] o
@@ -227,8 +240,13 @@ section ServerM
           return .importsChanged
         | _ =>
           -- Worker crashed
-          fw.errorPendingRequests o (if exitCode = 1 then ErrorCode.workerExited else ErrorCode.workerCrashed)
-            s!"Server process for {fw.doc.uri} crashed, {if exitCode = 1 then "see stderr for exception" else "likely due to a stack overflow or a bug"}."
+          let (errorCode, errorCausePointer) :=
+            if exitCode = 1 then
+              (ErrorCode.workerExited, "see stderr for exception")
+            else
+              (ErrorCode.workerCrashed, "likely due to a stack overflow or a bug")
+          fw.errorPendingRequests o errorCode
+            s!"Server process for {fw.doc.uri} crashed, {errorCausePointer}."
           publishProgressAtPos fw.doc 0 o (kind := LeanFileProgressKind.fatalError)
           return WorkerEvent.crashed err
     loop
@@ -300,15 +318,22 @@ section ServerM
     updateFileWorkers { ←findFileWorker! uri with state := WorkerState.crashed queuedMsgs }

   /-- Tries to write a message, sets the state of the FileWorker to `crashed` if it does not succeed
-      and restarts the file worker if the `crashed` flag was already set. Just logs an error if there
-      is no FileWorker at this `uri`.
+      and restarts the file worker if the `crashed` flag was already set. Just logs an error if
+      there is no FileWorker at this `uri`.
       Messages that couldn't be sent can be queued up via the queueFailedMessage flag and
       will be discharged after the FileWorker is restarted. -/
-  def tryWriteMessage (uri : DocumentUri) (msg : JsonRpc.Message) (queueFailedMessage := true) (restartCrashedWorker := false) :
-      ServerM Unit := do
+  def tryWriteMessage
+      (uri : DocumentUri)
+      (msg : JsonRpc.Message)
+      (queueFailedMessage := true)
+      (restartCrashedWorker := false)
+      : ServerM Unit := do
     let some fw ← findFileWorker? uri
       | do
-        (←read).hLog.putStrLn s!"Cannot send message to unknown document '{uri}':\n{(toJson msg).compress}"
+        let errorMsg :=
+          s!"Cannot send message to unknown document '{uri}':\n"
+          ++ s!"{(toJson msg).compress}"
+        (←read).hLog.putStrLn errorMsg
         return
     match fw.state with
     | WorkerState.crashed queuedMsgs =>
@@ -364,7 +389,8 @@ def handleReference (p : ReferenceParams) : ServerM (Array Location) := do
     if let some module ← searchModuleNameOfFileName path srcSearchPath then
       let references ← (← read).references.get
       for ident in references.findAt module p.position (includeStop := true) do
-        let identRefs ← references.referringTo module ident srcSearchPath p.context.includeDeclaration
+        let identRefs ← references.referringTo module ident srcSearchPath
+          p.context.includeDeclaration
         result := result.append identRefs
   return result

@@ -436,7 +462,8 @@ section NotificationHandling
     let newDocText := foldDocumentChanges changes oldDoc.text
     let newDoc : DocumentMeta := ⟨doc.uri, newVersion, newDocText, oldDoc.dependencyBuildMode⟩
     updateFileWorkers { fw with doc := newDoc }
-    tryWriteMessage doc.uri (Notification.mk "textDocument/didChange" p) (restartCrashedWorker := true)
+    let notification := Notification.mk "textDocument/didChange" p
+    tryWriteMessage doc.uri notification (restartCrashedWorker := true)

   def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit :=
     terminateFileWorker p.textDocument.uri
@@ -469,15 +496,18 @@ section NotificationHandling
       if (← fw.pendingRequestsRef.get).contains p.id then
         tryWriteMessage uri (Notification.mk "$/cancelRequest" p) (queueFailedMessage := false)

-  def forwardNotification {α : Type} [ToJson α] [FileSource α] (method : String) (params : α) : ServerM Unit :=
+  def forwardNotification {α : Type} [ToJson α] [FileSource α] (method : String) (params : α)
+      : ServerM Unit :=
     tryWriteMessage (fileSource params) (Notification.mk method params) (queueFailedMessage := true)
 end NotificationHandling

 section MessageHandling
   def parseParams (paramType : Type) [FromJson paramType] (params : Json) : ServerM paramType :=
     match fromJson? params with
-    | Except.ok parsed => pure parsed
-    | Except.error inner => throwServerError s!"Got param with wrong structure: {params.compress}\n{inner}"
+    | Except.ok parsed =>
+      pure parsed
+    | Except.error inner =>
+      throwServerError s!"Got param with wrong structure: {params.compress}\n{inner}"

   def forwardRequestToWorker (id : RequestID) (method : String) (params : Json) : ServerM Unit := do
     let uri: DocumentUri ←
@@ -532,25 +562,40 @@ section MessageHandling
       (← read).hOut.writeLspResponse ⟨id, definitions⟩
       return
     match method with
-    | "textDocument/references" => handle ReferenceParams (Array Location) handleReference
-    | "workspace/symbol" => handle WorkspaceSymbolParams (Array SymbolInformation) handleWorkspaceSymbol
-    | "textDocument/prepareRename" => handle PrepareRenameParams (Option Range) handlePrepareRename
-    | "textDocument/rename" => handle RenameParams WorkspaceEdit handleRename
-    | _ => forwardRequestToWorker id method params
+    | "textDocument/references" =>
+      handle ReferenceParams (Array Location) handleReference
+    | "workspace/symbol" =>
+      handle WorkspaceSymbolParams (Array SymbolInformation) handleWorkspaceSymbol
+    | "textDocument/prepareRename" =>
+      handle PrepareRenameParams (Option Range) handlePrepareRename
+    | "textDocument/rename" =>
+      handle RenameParams WorkspaceEdit handleRename
+    | _ =>
+      forwardRequestToWorker id method params

   def handleNotification (method : String) (params : Json) : ServerM Unit := do
-    let handle := (fun α [FromJson α] (handler : α → ServerM Unit) => parseParams α params >>= handler)
+    let handle := fun α [FromJson α] (handler : α → ServerM Unit) =>
+      parseParams α params >>= handler
     match method with
-    | "textDocument/didOpen" => handle _ handleDidOpen
-    | "textDocument/didChange" => handle DidChangeTextDocumentParams handleDidChange
-    | "textDocument/didClose" => handle DidCloseTextDocumentParams handleDidClose
-    | "workspace/didChangeWatchedFiles" => handle DidChangeWatchedFilesParams handleDidChangeWatchedFiles
-    | "$/cancelRequest" => handle CancelParams handleCancelRequest
-    | "$/lean/rpc/connect" => handle RpcConnectParams (forwardNotification method)
-    | "$/lean/rpc/release" => handle RpcReleaseParams (forwardNotification method)
-    | "$/lean/rpc/keepAlive" => handle RpcKeepAliveParams (forwardNotification method)
-    | _ =>
-      if !"$/".isPrefixOf method then -- implementation-dependent notifications can be safely ignored
+    | "textDocument/didOpen" =>
+      handle _ handleDidOpen
+    | "textDocument/didChange" =>
+      handle DidChangeTextDocumentParams handleDidChange
+    | "textDocument/didClose" =>
+      handle DidCloseTextDocumentParams handleDidClose
+    | "workspace/didChangeWatchedFiles" =>
+      handle DidChangeWatchedFilesParams handleDidChangeWatchedFiles
+    | "$/cancelRequest" =>
+      handle CancelParams handleCancelRequest
+    | "$/lean/rpc/connect" =>
+      handle RpcConnectParams (forwardNotification method)
+    | "$/lean/rpc/release" =>
+      handle RpcReleaseParams (forwardNotification method)
+    | "$/lean/rpc/keepAlive" =>
+      handle RpcKeepAliveParams (forwardNotification method)
+    | _ =>
+      -- implementation-dependent notifications can be safely ignored
+      if !"$/".isPrefixOf method then
         (←read).hLog.putStrLn s!"Got unsupported notification: {method}"
 end MessageHandling

@@ -614,7 +659,8 @@ section MainLoop
         handleCrash fw.doc.uri #[]
         mainLoop clientTask
       | WorkerEvent.terminated =>
-        throwServerError "Internal server error: got termination event for worker that should have been removed"
+        throwServerError <| "Internal server error: got termination event for worker that "
+          ++ "should have been removed"
       | .importsChanged =>
         startFileWorker fw.doc
         mainLoop clientTask
@@ -677,7 +723,8 @@ def initAndRunWatchdogAux : ServerM Unit := do
 def findWorkerPath : IO System.FilePath := do
   let mut workerPath ← IO.appPath
   if let some path := (←IO.getEnv "LEAN_SYSROOT") then
-    workerPath := System.FilePath.mk path / "bin" / "lean" |>.withExtension System.FilePath.exeExtension
+    workerPath := System.FilePath.mk path / "bin" / "lean"
+      |>.withExtension System.FilePath.exeExtension
   if let some path := (←IO.getEnv "LEAN_WORKER_PATH") then
     workerPath := System.FilePath.mk path
   return workerPath