Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change in kernel launcher handshake #14559

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 80 additions & 23 deletions src/kernels/raw/launcher/kernelProcess.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import {
import { IKernelConnection, IKernelProcess } from '../types';
import { KernelEnvironmentVariablesService } from './kernelEnvVarsService.node';
import { IPythonExtensionChecker } from '../../../platform/api/types';
import { Cancellation, isCancellationError, raceCancellationError } from '../../../platform/common/cancellation';
import {
Cancellation,
isCancellationError,
raceCancellation,
raceCancellationError
} from '../../../platform/common/cancellation';
import {
getTelemetrySafeErrorMessageFromPythonTraceback,
getErrorMessageFromPythonTraceback
Expand All @@ -38,7 +43,7 @@ import {
import { IFileSystemNode } from '../../../platform/common/platform/types.node';
import { IProcessServiceFactory, ObservableExecutionResult } from '../../../platform/common/process/types.node';
import { Resource, IOutputChannel, IJupyterSettings } from '../../../platform/common/types';
import { createDeferred, raceTimeout } from '../../../platform/common/utils/async';
import { createDeferred, raceTimeout, waitForCondition } from '../../../platform/common/utils/async';
import { DataScience } from '../../../platform/common/utils/localize';
import { noop, swallowExceptions } from '../../../platform/common/utils/misc';
import { KernelDiedError } from '../../errors/kernelDiedError';
Expand Down Expand Up @@ -257,25 +262,40 @@ export class KernelProcess implements IKernelProcess {
if (deferred.rejected) {
await deferred.promise;
}
const tcpPortUsed = (await import('tcp-port-used')).default;
// Wait on shell port as this is used for communications (hence shell port is guaranteed to be used, where as heart beat isn't).
// Wait for shell & iopub to be used (iopub is where we get a response & this is similar to what Jupyter does today).
// Kernel must be connected to bo Shell & IoPub channels for kernel communication to work.
const portsUsed = Promise.all([
tcpPortUsed.waitUntilUsed(this.connection.shell_port, 200, timeout),
tcpPortUsed.waitUntilUsed(this.connection.iopub_port, 200, timeout)
]).catch((ex) => {
if (cancelToken.isCancellationRequested || deferred.rejected) {
return;

if (
this.isPythonKernel &&
this.extensionChecker.isPythonExtensionInstalled &&
this._kernelConnectionMetadata.interpreter
) {
const success = await this.waitForConnectionFileToGetUpdatedWithPorts(cancelToken, timeout);
if (cancelToken.isCancellationRequested) {
throw new CancellationError();
}
traceError(`waitUntilUsed timed out`, ex);
// Throw an error we recognize.
return Promise.reject(new KernelPortNotUsedTimeoutError(this.kernelConnectionMetadata));
});
await raceCancellationError(cancelToken, portsUsed, deferred.promise);
if (!success) {
throw new KernelPortNotUsedTimeoutError(this.kernelConnectionMetadata);
}
} else {
const tcpPortUsed = (await import('tcp-port-used')).default;
// Wait on shell port as this is used for communications (hence shell port is guaranteed to be used, where as heart beat isn't).
// Wait for shell & iopub to be used (iopub is where we get a response & this is similar to what Jupyter does today).
// Kernel must be connected to bo Shell & IoPub channels for kernel communication to work.
const portsUsed = Promise.all([
tcpPortUsed.waitUntilUsed(this.connection.shell_port, 200, timeout),
tcpPortUsed.waitUntilUsed(this.connection.iopub_port, 200, timeout)
]).catch((ex) => {
if (cancelToken.isCancellationRequested || deferred.rejected) {
return;
}
traceError(`waitUntilUsed timed out`, ex);
// Throw an error we recognize.
return Promise.reject(new KernelPortNotUsedTimeoutError(this.kernelConnectionMetadata));
});
await raceCancellationError(cancelToken, portsUsed, deferred.promise);
}
} catch (e) {
const stdErrToLog = (stderrProc || stderr || '').trim();
if (!cancelToken?.isCancellationRequested && !isCancellationError(e)) {
if (!cancelToken.isCancellationRequested && !isCancellationError(e)) {
traceError('Disposing kernel process due to an error', e);
if (e && e instanceof Error && stdErrToLog.length && e.message.includes(stdErrToLog)) {
// No need to log the stderr as it's already part of the error message.
Expand All @@ -286,11 +306,11 @@ export class KernelProcess implements IKernelProcess {
// Make sure to dispose if we never connect.
await this.dispose();

if (!cancelToken?.isCancellationRequested && e instanceof BaseError) {
if (!cancelToken.isCancellationRequested && e instanceof BaseError) {
throw e;
} else {
// Possible this isn't an error we recognize, hence wrap it in a user friendly message.
if (cancelToken?.isCancellationRequested) {
if (cancelToken.isCancellationRequested) {
traceVerbose('User cancelled the kernel launch');
}
// If we have the python error message in std outputs, display that.
Expand All @@ -308,6 +328,38 @@ export class KernelProcess implements IKernelProcess {
}
}

private async waitForConnectionFileToGetUpdatedWithPorts(token: CancellationToken, timeout: number) {
const checkIfPortsAreNonEmpty = async () => {
if (token.isCancellationRequested) {
return true;
}
try {
const json: typeof this._connection = JSON.parse(await this.fileSystem.readFile(this.connectionFile!));
if (json.control_port > 0 && json.hb_port > 0 && json.iopub_port > 0 && json.shell_port > 0) {
this._connection.control_port = json.control_port;
this._connection.hb_port = json.hb_port;
this._connection.iopub_port = json.iopub_port;
this._connection.shell_port = json.shell_port;
this._connection.stdin_port = json.stdin_port;
return true;
}
} catch {
// File not created.
return false;
}
return false;
};

return raceCancellation(
token,
waitForCondition(
() => checkIfPortsAreNonEmpty().then((result) => !token.isCancellationRequested && result),
timeout,
100
)
);
}

public async dispose(): Promise<void> {
if (this._disposingPromise) {
return this._disposingPromise;
Expand Down Expand Up @@ -424,17 +476,22 @@ export class KernelProcess implements IKernelProcess {
}

this.connectionFile = await this.createConnectionFile();
const connectionInfo = Object.assign({}, this._connection);
connectionInfo.control_port = 0;
connectionInfo.hb_port = 0;
connectionInfo.iopub_port = 0;
connectionInfo.shell_port = 0;
connectionInfo.stdin_port = 0;
await this.fileSystem.writeFile(this.connectionFile, JSON.stringify(this._connection));

// Python kernels are special. Handle the extra arguments.
if (this.isPythonKernel) {
// Slice out -f and the connection file from the args
this.launchKernelSpec.argv.splice(indexOfConnectionFile - 1, 2);

// Add in our connection command line args
this.launchKernelSpec.argv.push(...this.addPythonConnectionArgs(this.connectionFile));
await this.fileSystem.writeFile(this.connectionFile, JSON.stringify(this._connection));
} else {
await this.fileSystem.writeFile(this.connectionFile, JSON.stringify(this._connection));

// Replace the connection file argument with this file
// Remember, non-python kernels can have argv as `--connection-file={connection_file}`,
// hence we should not replace the entire entry, but just replace the text `{connection_file}`
Expand Down
16 changes: 12 additions & 4 deletions src/platform/common/utils/async.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import { dispose } from '../helpers';
import { IDisposable } from '../types';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type PromiseFunction = (...any: any[]) => Promise<any>;

Expand All @@ -18,19 +21,24 @@ export async function waitForCondition(
// Set a timer that will resolve with null
return new Promise<boolean>((resolve) => {
let finish: (result: boolean) => void;
const disposables: IDisposable[] = [];
const timer = setTimeout(() => finish(false), timeout);
const intervalId = setInterval(() => {
disposables.push({ dispose: () => clearTimeout(timer) });
const tryCondition = () => {
condition()
.then((r) => {
if (r) {
finish(true);
} else {
const timeout = setTimeout(() => tryCondition(), interval);
disposables.push({ dispose: () => clearTimeout(timeout) });
}
})
.catch((_e) => finish(false));
}, interval);
};
tryCondition();
finish = (result: boolean) => {
clearTimeout(timer);
clearInterval(intervalId);
dispose(disposables);
resolve(result);
};
});
Expand Down
Loading