Skip to content

Commit

Permalink
refactor: enforce column limit in watchdog
Browse files Browse the repository at this point in the history
  • Loading branch information
mhuisi committed Jan 24, 2024
1 parent ec39de8 commit 7a228e9
Showing 1 changed file with 116 additions and 69 deletions.
185 changes: 116 additions & 69 deletions src/Lean/Server/Watchdog.lean
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,49 @@ For general server architecture, see `README.md`. This module implements the wat
## Watchdog state
Most LSP clients only send us file diffs, so to facilitate sending entire file contents to freshly restarted
workers, the watchdog needs to maintain the current state of each file. It can also use this state to detect changes
to the header and thus restart the corresponding worker, freeing its imports.
Most LSP clients only send us file diffs, so to facilitate sending entire file contents to freshly
restarted workers, the watchdog needs to maintain the current state of each file. It can also use
this state to detect changes to the header and thus restart the corresponding worker, freeing its
imports.
TODO(WN):
We may eventually want to keep track of approximately (since this isn't knowable exactly) where in the file a worker
crashed. Then on restart, we tell said worker to only parse up to that point and query the user about how to proceed
(continue OR allow the user to fix the bug and then continue OR ..). Without this, if the crash is deterministic,
users may be confused about why the server seemingly stopped working for a single file.
We may eventually want to keep track of approximately (since this isn't knowable exactly) where in
the file a worker crashed. Then on restart, we tell said worker to only parse up to that point and
query the user about how to proceed (continue OR allow the user to fix the bug and then continue OR
..). Without this, if the crash is deterministic, users may be confused about why the server
seemingly stopped working for a single file.
## Watchdog <-> worker communication
The watchdog process and its file worker processes communicate via LSP. If the necessity arises,
we might add non-standard commands similarly based on JSON-RPC. Most requests and notifications
are forwarded to the corresponding file worker process, with the exception of these notifications:
The watchdog process and its file worker processes communicate via LSP. If the necessity arises, we
might add non-standard commands similarly based on JSON-RPC. Most requests and notifications are
forwarded to the corresponding file worker process, with the exception of these notifications:
- textDocument/didOpen: Launch the file worker, create the associated watchdog state and launch a task to
asynchronously receive LSP packets from the worker (e.g. request responses).
- textDocument/didOpen: Launch the file worker, create the associated watchdog state and launch a
task to asynchronously receive LSP packets from the worker (e.g. request
responses).
- textDocument/didChange: Update the local file state so that it can be resent to restarted workers.
Then forward the `didChange` notification.
- textDocument/didClose: Signal a shutdown to the file worker and remove the associated watchdog state.
- textDocument/didClose: Signal a shutdown to the file worker and remove the associated watchdog
state.
Moreover, we don't implement the full protocol at this level:
- Upon starting, the `initialize` request is forwarded to the worker, but it must not respond with its server
capabilities. Consequently, the watchdog will not send an `initialized` notification to the worker.
- After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full current state of
the file. No additional `didOpen` notifications will be forwarded to the worker process.
- Upon starting, the `initialize` request is forwarded to the worker, but it must not respond with
its server capabilities. Consequently, the watchdog will not send an `initialized` notification to
the worker.
- After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full
current state of the file. No additional `didOpen` notifications will be forwarded to the worker
process.
- `$/cancelRequest` notifications are forwarded to all file workers.
- File workers are always terminated with an `exit` notification, without previously receiving a `shutdown` request.
Similarly, they never receive a `didClose` notification.
- File workers are always terminated with an `exit` notification, without previously receiving a
`shutdown` request. Similarly, they never receive a `didClose` notification.
## Watchdog <-> client communication
The watchdog itself should implement the LSP standard as closely as possible. However we reserve the right to add
non-standard extensions in case they're needed, for example to communicate tactic state.
The watchdog itself should implement the LSP standard as closely as possible. However we reserve the
right to add non-standard extensions in case they're needed, for example to communicate tactic
state.
-/

namespace Lean.Server.Watchdog
Expand All @@ -83,13 +90,13 @@ section Utils
| ioError (e : IO.Error)

inductive WorkerState where
/-- The watchdog can detect a crashed file worker in two places: When trying to send a message to the file worker
and when reading a request reply.
In the latter case, the forwarding task terminates and delegates a `crashed` event to the main task.
Then, in both cases, the file worker has its state set to `crashed` and requests that are in-flight are errored.
Upon receiving the next packet for that file worker, the file worker is restarted and the packet is forwarded
to it. If the crash was detected while writing a packet, we queue that packet until the next packet for the file
worker arrives. -/
/-- The watchdog can detect a crashed file worker in two places: When trying to send a message
to the file worker and when reading a request reply.
In the latter case, the forwarding task terminates and delegates a `crashed` event to the
main task. Then, in both cases, the file worker has its state set to `crashed` and requests
that are in-flight are errored. Upon receiving the next packet for that file worker, the file
worker is restarted and the packet is forwarded to it. If the crash was detected while writing
a packet, we queue that packet until the next packet for the file worker arrives. -/
| crashed (queuedMsgs : Array JsonRpc.Message)
| running

Expand All @@ -102,11 +109,12 @@ section FileWorker
proc : Process.Child workerCfg
commTask : Task WorkerEvent
state : WorkerState
-- This should not be mutated outside of namespace FileWorker, as it is used as shared mutable state
/-- The pending requests map contains all requests
that have been received from the LSP client, but were not answered yet.
We need them for forwarding cancellation requests to the correct worker as well as cleanly aborting
requests on worker crashes. -/
-- This should not be mutated outside of namespace FileWorker,
-- as it is used as shared mutable state
/-- The pending requests map contains all requests that have been received from the LSP client,
but were not answered yet.
We need them for forwarding cancellation requests to the correct worker as well as cleanly
aborting requests on worker crashes. -/
pendingRequestsRef : IO.Ref PendingRequestMap

namespace FileWorker
Expand All @@ -120,8 +128,10 @@ section FileWorker
def erasePendingRequest (fw : FileWorker) (id : RequestID) : IO Unit :=
fw.pendingRequestsRef.modify fun pendingRequests => pendingRequests.erase id

def errorPendingRequests (fw : FileWorker) (hError : FS.Stream) (code : ErrorCode) (msg : String) : IO Unit := do
let pendingRequests ← fw.pendingRequestsRef.modifyGet (fun pendingRequests => (pendingRequests, RBMap.empty))
def errorPendingRequests (fw : FileWorker) (hError : FS.Stream) (code : ErrorCode) (msg : String)
: IO Unit := do
let pendingRequests ← fw.pendingRequestsRef.modifyGet
fun pendingRequests => (pendingRequests, RBMap.empty)
for ⟨id, _⟩ in pendingRequests do
hError.writeLspResponseError { id := id, code := code, message := msg }

Expand Down Expand Up @@ -173,13 +183,15 @@ section ServerM
let s ← read
if let some path := fileUriToPath? fw.doc.uri then
if let some module ← searchModuleNameOfFileName path s.srcSearchPath then
s.references.modify fun refs => refs.updateWorkerRefs module params.version params.references
s.references.modify fun refs =>
refs.updateWorkerRefs module params.version params.references

def handleIleanInfoFinal (fw : FileWorker) (params : LeanIleanInfoParams) : ServerM Unit := do
let s ← read
if let some path := fileUriToPath? fw.doc.uri then
if let some module ← searchModuleNameOfFileName path s.srcSearchPath then
s.references.modify fun refs => refs.finalizeWorkerRefs module params.version params.references
s.references.modify fun refs =>
refs.finalizeWorkerRefs module params.version params.references

/-- Creates a Task which forwards a worker's messages into the output stream until an event
which must be handled in the main watchdog thread (e.g. an I/O error) happens. -/
Expand Down Expand Up @@ -217,8 +229,9 @@ section ServerM
| 0 =>
-- Worker was terminated
fw.errorPendingRequests o ErrorCode.contentModified
(s!"The file worker for {fw.doc.uri} has been terminated. Either the header has changed,"
++ " or the file was closed, or the server is shutting down.")
(s!"The file worker for {fw.doc.uri} has been terminated. "
++ "Either the header has changed, or the file was closed, "
++ " or the server is shutting down.")
-- one last message to clear the diagnostics for this file so that stale errors
-- do not remain in the editor forever.
publishDiagnostics fw.doc #[] o
Expand All @@ -227,8 +240,13 @@ section ServerM
return .importsChanged
| _ =>
-- Worker crashed
fw.errorPendingRequests o (if exitCode = 1 then ErrorCode.workerExited else ErrorCode.workerCrashed)
s!"Server process for {fw.doc.uri} crashed, {if exitCode = 1 then "see stderr for exception" else "likely due to a stack overflow or a bug"}."
let (errorCode, errorCausePointer) :=
if exitCode = 1 then
(ErrorCode.workerExited, "see stderr for exception")
else
(ErrorCode.workerCrashed, "likely due to a stack overflow or a bug")
fw.errorPendingRequests o errorCode
s!"Server process for {fw.doc.uri} crashed, {errorCausePointer}."
publishProgressAtPos fw.doc 0 o (kind := LeanFileProgressKind.fatalError)
return WorkerEvent.crashed err
loop
Expand Down Expand Up @@ -300,15 +318,22 @@ section ServerM
updateFileWorkers { ←findFileWorker! uri with state := WorkerState.crashed queuedMsgs }

/-- Tries to write a message, sets the state of the FileWorker to `crashed` if it does not succeed
and restarts the file worker if the `crashed` flag was already set. Just logs an error if there
is no FileWorker at this `uri`.
and restarts the file worker if the `crashed` flag was already set. Just logs an error if
there is no FileWorker at this `uri`.
Messages that couldn't be sent can be queued up via the queueFailedMessage flag and
will be discharged after the FileWorker is restarted. -/
def tryWriteMessage (uri : DocumentUri) (msg : JsonRpc.Message) (queueFailedMessage := true) (restartCrashedWorker := false) :
ServerM Unit := do
def tryWriteMessage
(uri : DocumentUri)
(msg : JsonRpc.Message)
(queueFailedMessage := true)
(restartCrashedWorker := false)
: ServerM Unit := do
let some fw ← findFileWorker? uri
| do
(←read).hLog.putStrLn s!"Cannot send message to unknown document '{uri}':\n{(toJson msg).compress}"
let errorMsg :=
s!"Cannot send message to unknown document '{uri}':\n"
++ s!"{(toJson msg).compress}"
(←read).hLog.putStrLn errorMsg
return
match fw.state with
| WorkerState.crashed queuedMsgs =>
Expand Down Expand Up @@ -364,7 +389,8 @@ def handleReference (p : ReferenceParams) : ServerM (Array Location) := do
if let some module ← searchModuleNameOfFileName path srcSearchPath then
let references ← (← read).references.get
for ident in references.findAt module p.position (includeStop := true) do
let identRefs ← references.referringTo module ident srcSearchPath p.context.includeDeclaration
let identRefs ← references.referringTo module ident srcSearchPath
p.context.includeDeclaration
result := result.append identRefs
return result

Expand Down Expand Up @@ -436,7 +462,8 @@ section NotificationHandling
let newDocText := foldDocumentChanges changes oldDoc.text
let newDoc : DocumentMeta := ⟨doc.uri, newVersion, newDocText, oldDoc.dependencyBuildMode⟩
updateFileWorkers { fw with doc := newDoc }
tryWriteMessage doc.uri (Notification.mk "textDocument/didChange" p) (restartCrashedWorker := true)
let notification := Notification.mk "textDocument/didChange" p
tryWriteMessage doc.uri notification (restartCrashedWorker := true)

def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit :=
terminateFileWorker p.textDocument.uri
Expand Down Expand Up @@ -469,15 +496,18 @@ section NotificationHandling
if (← fw.pendingRequestsRef.get).contains p.id then
tryWriteMessage uri (Notification.mk "$/cancelRequest" p) (queueFailedMessage := false)

def forwardNotification {α : Type} [ToJson α] [FileSource α] (method : String) (params : α) : ServerM Unit :=
def forwardNotification {α : Type} [ToJson α] [FileSource α] (method : String) (params : α)
: ServerM Unit :=
tryWriteMessage (fileSource params) (Notification.mk method params) (queueFailedMessage := true)
end NotificationHandling

section MessageHandling
def parseParams (paramType : Type) [FromJson paramType] (params : Json) : ServerM paramType :=
match fromJson? params with
| Except.ok parsed => pure parsed
| Except.error inner => throwServerError s!"Got param with wrong structure: {params.compress}\n{inner}"
| Except.ok parsed =>
pure parsed
| Except.error inner =>
throwServerError s!"Got param with wrong structure: {params.compress}\n{inner}"

def forwardRequestToWorker (id : RequestID) (method : String) (params : Json) : ServerM Unit := do
let uri: DocumentUri ←
Expand Down Expand Up @@ -532,25 +562,40 @@ section MessageHandling
(← read).hOut.writeLspResponse ⟨id, definitions⟩
return
match method with
| "textDocument/references" => handle ReferenceParams (Array Location) handleReference
| "workspace/symbol" => handle WorkspaceSymbolParams (Array SymbolInformation) handleWorkspaceSymbol
| "textDocument/prepareRename" => handle PrepareRenameParams (Option Range) handlePrepareRename
| "textDocument/rename" => handle RenameParams WorkspaceEdit handleRename
| _ => forwardRequestToWorker id method params
| "textDocument/references" =>
handle ReferenceParams (Array Location) handleReference
| "workspace/symbol" =>
handle WorkspaceSymbolParams (Array SymbolInformation) handleWorkspaceSymbol
| "textDocument/prepareRename" =>
handle PrepareRenameParams (Option Range) handlePrepareRename
| "textDocument/rename" =>
handle RenameParams WorkspaceEdit handleRename
| _ =>
forwardRequestToWorker id method params

def handleNotification (method : String) (params : Json) : ServerM Unit := do
let handle := (fun α [FromJson α] (handler : α → ServerM Unit) => parseParams α params >>= handler)
let handle := fun α [FromJson α] (handler : α → ServerM Unit) =>
parseParams α params >>= handler
match method with
| "textDocument/didOpen" => handle _ handleDidOpen
| "textDocument/didChange" => handle DidChangeTextDocumentParams handleDidChange
| "textDocument/didClose" => handle DidCloseTextDocumentParams handleDidClose
| "workspace/didChangeWatchedFiles" => handle DidChangeWatchedFilesParams handleDidChangeWatchedFiles
| "$/cancelRequest" => handle CancelParams handleCancelRequest
| "$/lean/rpc/connect" => handle RpcConnectParams (forwardNotification method)
| "$/lean/rpc/release" => handle RpcReleaseParams (forwardNotification method)
| "$/lean/rpc/keepAlive" => handle RpcKeepAliveParams (forwardNotification method)
| _ =>
if !"$/".isPrefixOf method then -- implementation-dependent notifications can be safely ignored
| "textDocument/didOpen" =>
handle _ handleDidOpen
| "textDocument/didChange" =>
handle DidChangeTextDocumentParams handleDidChange
| "textDocument/didClose" =>
handle DidCloseTextDocumentParams handleDidClose
| "workspace/didChangeWatchedFiles" =>
handle DidChangeWatchedFilesParams handleDidChangeWatchedFiles
| "$/cancelRequest" =>
handle CancelParams handleCancelRequest
| "$/lean/rpc/connect" =>
handle RpcConnectParams (forwardNotification method)
| "$/lean/rpc/release" =>
handle RpcReleaseParams (forwardNotification method)
| "$/lean/rpc/keepAlive" =>
handle RpcKeepAliveParams (forwardNotification method)
| _ =>
-- implementation-dependent notifications can be safely ignored
if !"$/".isPrefixOf method then
(←read).hLog.putStrLn s!"Got unsupported notification: {method}"
end MessageHandling

Expand Down Expand Up @@ -614,7 +659,8 @@ section MainLoop
handleCrash fw.doc.uri #[]
mainLoop clientTask
| WorkerEvent.terminated =>
throwServerError "Internal server error: got termination event for worker that should have been removed"
throwServerError <| "Internal server error: got termination event for worker that "
++ "should have been removed"
| .importsChanged =>
startFileWorker fw.doc
mainLoop clientTask
Expand Down Expand Up @@ -677,7 +723,8 @@ def initAndRunWatchdogAux : ServerM Unit := do
def findWorkerPath : IO System.FilePath := do
let mut workerPath ← IO.appPath
if let some path := (←IO.getEnv "LEAN_SYSROOT") then
workerPath := System.FilePath.mk path / "bin" / "lean" |>.addExtension System.FilePath.exeExtension
workerPath := System.FilePath.mk path / "bin" / "lean"
|>.addExtension System.FilePath.exeExtension
if let some path := (←IO.getEnv "LEAN_WORKER_PATH") then
workerPath := System.FilePath.mk path
return workerPath
Expand Down

0 comments on commit 7a228e9

Please sign in to comment.