From b0eabf9697609eab32b14e3e6adcbaf4d089e80f Mon Sep 17 00:00:00 2001 From: Don Jayamanne Date: Wed, 25 Oct 2023 11:31:49 +1100 Subject: [PATCH] change in kernel launcher handshake --- .../raw/launcher/kernelProcess.node.ts | 103 ++++++++++++++---- src/platform/common/utils/async.ts | 16 ++- 2 files changed, 92 insertions(+), 27 deletions(-) diff --git a/src/kernels/raw/launcher/kernelProcess.node.ts b/src/kernels/raw/launcher/kernelProcess.node.ts index 786de815b63..caa1c87326f 100644 --- a/src/kernels/raw/launcher/kernelProcess.node.ts +++ b/src/kernels/raw/launcher/kernelProcess.node.ts @@ -20,7 +20,12 @@ import { import { IKernelConnection, IKernelProcess } from '../types'; import { KernelEnvironmentVariablesService } from './kernelEnvVarsService.node'; import { IPythonExtensionChecker } from '../../../platform/api/types'; -import { Cancellation, isCancellationError, raceCancellationError } from '../../../platform/common/cancellation'; +import { + Cancellation, + isCancellationError, + raceCancellation, + raceCancellationError +} from '../../../platform/common/cancellation'; import { getTelemetrySafeErrorMessageFromPythonTraceback, getErrorMessageFromPythonTraceback @@ -38,7 +43,7 @@ import { import { IFileSystemNode } from '../../../platform/common/platform/types.node'; import { IProcessServiceFactory, ObservableExecutionResult } from '../../../platform/common/process/types.node'; import { Resource, IOutputChannel, IJupyterSettings } from '../../../platform/common/types'; -import { createDeferred, raceTimeout } from '../../../platform/common/utils/async'; +import { createDeferred, raceTimeout, waitForCondition } from '../../../platform/common/utils/async'; import { DataScience } from '../../../platform/common/utils/localize'; import { noop, swallowExceptions } from '../../../platform/common/utils/misc'; import { KernelDiedError } from '../../errors/kernelDiedError'; @@ -257,25 +262,40 @@ export class KernelProcess implements IKernelProcess { if (deferred.rejected) { await deferred.promise; } - const tcpPortUsed = (await import('tcp-port-used')).default; - // Wait on shell port as this is used for communications (hence shell port is guaranteed to be used, where as heart beat isn't). - // Wait for shell & iopub to be used (iopub is where we get a response & this is similar to what Jupyter does today). - // Kernel must be connected to bo Shell & IoPub channels for kernel communication to work. - const portsUsed = Promise.all([ - tcpPortUsed.waitUntilUsed(this.connection.shell_port, 200, timeout), - tcpPortUsed.waitUntilUsed(this.connection.iopub_port, 200, timeout) - ]).catch((ex) => { - if (cancelToken.isCancellationRequested || deferred.rejected) { - return; + + if ( + this.isPythonKernel && + this.extensionChecker.isPythonExtensionInstalled && + this._kernelConnectionMetadata.interpreter + ) { + const success = await this.waitForConnectionFileToGetUpdatedWithPorts(cancelToken, timeout); + if (cancelToken.isCancellationRequested) { + throw new CancellationError(); } - traceError(`waitUntilUsed timed out`, ex); - // Throw an error we recognize. - return Promise.reject(new KernelPortNotUsedTimeoutError(this.kernelConnectionMetadata)); - }); - await raceCancellationError(cancelToken, portsUsed, deferred.promise); + if (!success) { + throw new KernelPortNotUsedTimeoutError(this.kernelConnectionMetadata); + } + } else { + const tcpPortUsed = (await import('tcp-port-used')).default; + // Wait on shell port as this is used for communications (hence shell port is guaranteed to be used, where as heart beat isn't). + // Wait for shell & iopub to be used (iopub is where we get a response & this is similar to what Jupyter does today). + // Kernel must be connected to bo Shell & IoPub channels for kernel communication to work. + const portsUsed = Promise.all([ + tcpPortUsed.waitUntilUsed(this.connection.shell_port, 200, timeout), + tcpPortUsed.waitUntilUsed(this.connection.iopub_port, 200, timeout) + ]).catch((ex) => { + if (cancelToken.isCancellationRequested || deferred.rejected) { + return; + } + traceError(`waitUntilUsed timed out`, ex); + // Throw an error we recognize. + return Promise.reject(new KernelPortNotUsedTimeoutError(this.kernelConnectionMetadata)); + }); + await raceCancellationError(cancelToken, portsUsed, deferred.promise); + } } catch (e) { const stdErrToLog = (stderrProc || stderr || '').trim(); - if (!cancelToken?.isCancellationRequested && !isCancellationError(e)) { + if (!cancelToken.isCancellationRequested && !isCancellationError(e)) { traceError('Disposing kernel process due to an error', e); if (e && e instanceof Error && stdErrToLog.length && e.message.includes(stdErrToLog)) { // No need to log the stderr as it's already part of the error message. @@ -286,11 +306,11 @@ export class KernelProcess implements IKernelProcess { // Make sure to dispose if we never connect. await this.dispose(); - if (!cancelToken?.isCancellationRequested && e instanceof BaseError) { + if (!cancelToken.isCancellationRequested && e instanceof BaseError) { throw e; } else { // Possible this isn't an error we recognize, hence wrap it in a user friendly message. - if (cancelToken?.isCancellationRequested) { + if (cancelToken.isCancellationRequested) { traceVerbose('User cancelled the kernel launch'); } // If we have the python error message in std outputs, display that. @@ -308,6 +328,38 @@ export class KernelProcess implements IKernelProcess { } } + private async waitForConnectionFileToGetUpdatedWithPorts(token: CancellationToken, timeout: number) { + const checkIfPortsAreNonEmpty = async () => { + if (token.isCancellationRequested) { + return true; + } + try { + const json: typeof this._connection = JSON.parse(await this.fileSystem.readFile(this.connectionFile!)); + if (json.control_port > 0 && json.hb_port > 0 && json.iopub_port > 0 && json.shell_port > 0) { + this._connection.control_port = json.control_port; + this._connection.hb_port = json.hb_port; + this._connection.iopub_port = json.iopub_port; + this._connection.shell_port = json.shell_port; + this._connection.stdin_port = json.stdin_port; + return true; + } + } catch { + // File not created. + return false; + } + return false; + }; + + return raceCancellation( + token, + waitForCondition( + () => checkIfPortsAreNonEmpty().then((result) => !token.isCancellationRequested && result), + timeout, + 100 + ) + ); + } + public async dispose(): Promise { if (this._disposingPromise) { return this._disposingPromise; @@ -424,6 +476,14 @@ export class KernelProcess implements IKernelProcess { } this.connectionFile = await this.createConnectionFile(); + const connectionInfo = Object.assign({}, this._connection); + connectionInfo.control_port = 0; + connectionInfo.hb_port = 0; + connectionInfo.iopub_port = 0; + connectionInfo.shell_port = 0; + connectionInfo.stdin_port = 0; + await this.fileSystem.writeFile(this.connectionFile, JSON.stringify(this._connection)); + // Python kernels are special. Handle the extra arguments. if (this.isPythonKernel) { // Slice out -f and the connection file from the args @@ -431,10 +491,7 @@ export class KernelProcess implements IKernelProcess { // Add in our connection command line args this.launchKernelSpec.argv.push(...this.addPythonConnectionArgs(this.connectionFile)); - await this.fileSystem.writeFile(this.connectionFile, JSON.stringify(this._connection)); } else { - await this.fileSystem.writeFile(this.connectionFile, JSON.stringify(this._connection)); - // Replace the connection file argument with this file // Remember, non-python kernels can have argv as `--connection-file={connection_file}`, // hence we should not replace the entire entry, but just replace the text `{connection_file}` diff --git a/src/platform/common/utils/async.ts b/src/platform/common/utils/async.ts index b50ea6d647e..c9abc51859f 100644 --- a/src/platform/common/utils/async.ts +++ b/src/platform/common/utils/async.ts @@ -1,6 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +import { dispose } from '../helpers'; +import { IDisposable } from '../types'; + // eslint-disable-next-line @typescript-eslint/no-explicit-any export type PromiseFunction = (...any: any[]) => Promise; @@ -18,19 +21,24 @@ export async function waitForCondition( // Set a timer that will resolve with null return new Promise((resolve) => { let finish: (result: boolean) => void; + const disposables: IDisposable[] = []; const timer = setTimeout(() => finish(false), timeout); - const intervalId = setInterval(() => { + disposables.push({ dispose: () => clearTimeout(timer) }); + const tryCondition = () => { condition() .then((r) => { if (r) { finish(true); + } else { + const timeout = setTimeout(() => tryCondition(), interval); + disposables.push({ dispose: () => clearTimeout(timeout) }); } }) .catch((_e) => finish(false)); - }, interval); + }; + tryCondition(); finish = (result: boolean) => { - clearTimeout(timer); - clearInterval(intervalId); + dispose(disposables); resolve(result); }; });